// NOTE(review): This copy of the file appears to have been damaged by a tool
// that deleted spans of text running from a '<' to the following '>' (as if
// they were HTML tags).  Symptoms visible below:
//   * the first #include has lost its <header> argument;
//   * many "<<" stream expressions are truncated -- mostly in commented-out
//     debug output, but also in two live TclList log messages in the alert
//     search code;
//   * several declarations were swallowed, along with the end of
//     updateStrategyList() and the beginning of searchForAlerts().
// Each damaged spot is marked "NOTE(review): text lost".  The code itself has
// been left exactly as found; restore the missing text from version control
// before building.
#include
#include "../shared/ContainerThread.h"
#include "../shared/CommandDispatcher.h"
#include "../shared/ThreadSafeWrapper.h"
#include "../shared/ReplyToClient.h"
#include "../shared/ThreadMonitor.h"
#include "../shared/GlobalConfigFile.h"
#include "../shared/LogFile.h"
#include "../shared/DatabaseForThread.h"
#include "../oddsmaker/UserInfo.h"
#include "RecordDispatcher.h"
#include "DelayedAlerts.h"
#include "FieldLists.h"
#include "Strategy.h"
#include "../shared/NewWorkerCluster.h"
#include "AlertMicroService.h"

// TODO if there are no alerts, we should report something every few seconds.
// Not every time, but not a long time between reports.

// TODO The delayed alerts should work differently.  We should remember the
// (time,id) because that's the sort key.  Both internally and when we send the
// restart info to the user.

// AlertMicroService -- sends alert-window results to connected clients.
//
// How the pieces below fit together:
//  * A client sends "ms_alert_listen" once per connection, then any number
//    of "ms_alert_start" / "ms_alert_stop" commands, each naming a
//    "strategy_id".  Main (the dispatcher) receives all three.
//  * Start/stop commands are queued per user and handed in batches to the
//    _databaseWorkers cluster, which rebuilds that user's StrategyList in
//    updateStrategyList().
//  * Roughly once per second per user, the dispatcher queues a job on the
//    _alertWorkers cluster that runs searchForAlerts() for that user.
//  * Each alert worker thread owns its own live and delayed AlertProvider,
//    each holding the most recent alerts keyed by alert id.
//  * Nothing is created unless the config item "alert_micro_service" is "1"
//    (see initAlertMicroService() at the bottom of the file).
namespace AlertMicroService
{

// Names of the ThreadMonitor states and counters used in this file.  They are
// built once at startup, presumably so the hot paths below don't construct a
// new std::string on every call.
static const std::string s_mtAlert_inspect = "mtAlert inspect";
static const std::string s_mtAlert_add = "mtAlert add";
static const std::string s_mtAlert_clean = "mtAlert clean";
static const std::string s_updateStrategyList = "updateStrategyList";
static const std::string s_updateStrategyList_metadata = "updateStrategyList metadata";
static const std::string s_updateStrategyList_parse = "updateStrategyList parse";
static const std::string s_updateStrategyList_compile = "updateStrategyList compile";
static const std::string s_searchForAlerts = "searchForAlerts";
static const std::string s_searchForAlerts_match = "searchForAlerts match";
static const std::string s_searchForAlerts_columns = "searchForAlerts columns";
static const std::string s_searchForAlerts_user = "searchForAlerts user";
static const std::string s_searchForAlerts_strategy = "searchForAlerts strategy";
static const std::string s_searchForAlerts_alerts_seen = "searchForAlerts alerts seen";
static const std::string s_searchForAlerts_alerts_tested = "searchForAlerts alerts tested";
static const std::string s_searchForAlerts_alerts_matched = "searchForAlerts alerts matched";
static const std::string s_searchForAlerts_report = "searchForAlerts report";
static const std::string s_handleRequestInThread = "handleRequestInThread";

// When we start searching for alerts for a user, we compare the requested
// wake up time to the actual time we are working.  We record the difference
// in microseconds.  We add the time for every user we process.  Divide this
// by s_searchForAlerts_user to get the average late time.
//
// Note: The plan is to check each user's alerts 1 time per second.
// However, we set userInfo->nextStartTime to the time when we finish a
// request plus one second.  So if we are delayed for any reason, we don't
// try to make up that time in the future.
static const std::string s_searchForAlerts_late = "searchForAlerts late µs";

// The quota is the amount of CPU we have divided by the number of customers.
// If we make the quota a hard limit, everyone is guaranteed to get their
// quota.  But we usually have a lot of spare capacity, even at busy times.
// So it may be okay if a few users go over their quota.
// (quotaHardLimitFactor * quota) is where we will actually cut a user off.
// Before we added quotaHardLimitFactor, the value was effectively 1.0.
// (quotaWarnFactor * quota) is where we send something to the log but we
// don't do anything else.  This will help us understand the usage better.
//
// Both factors are set once, in Main's constructor, from the config items
// "ms_alert_quota_hard" (default 5.0) and "ms_alert_quota_warn" (default
// 1.0).  The code that applies them was in the part of searchForAlerts()
// that is missing from this copy; presumably it turns them into the `stop`
// and `warn` deadlines used there.
static double quotaHardLimitFactor;
static double quotaWarnFactor;

// A batch of client requests (start/stop commands) for one user.  Holds raw
// pointers; updateStrategyList() deletes each request after handling it.
typedef std::vector< ExternalRequest * > RequestList;

// Keeps a bounded, id-ordered history of alerts.
//
// The constructor subscribes this object to either the delayed alert feed
// (delayedAlertsListenForRecords()) or the live one
// (IRecordDispatcher::getAlerts()) and then calls start().  Every record that
// arrives is stored under its MainFields::id; once more than historyCount
// records are held, the entries with the smallest ids are discarded.
//
// Main creates one live and one delayed instance inside each alert worker
// thread, passing that worker's own container thread, so presumably the map
// is only ever touched from that one thread and needs no locking.
class AlertProvider : ForeverThreadUser
{
private:
  enum { mtAlert };

  // Maximum number of alerts kept in _alerts.
  const uint64_t _historyCount;

  // Alert id -> alert record.  std::map keeps this sorted by id, so
  // begin() is the oldest id kept and rbegin() the newest.
  std::map< int64_t, Record::Ref > _alerts;

  // Which feed we subscribed to.  Not read anywhere else in this file.
  bool _delayed;

  // Called for each incoming alert record (callbackId == mtAlert).
  // Records without a valid id are ignored.
  virtual void handleRequestInThread(Request *original)
  {
    ThreadMonitor::SetState tm(s_mtAlert_inspect);
    assert(original->callbackId == mtAlert);
    NewRecord *current = dynamic_cast< NewRecord * >(original);
    const ValueBox idBox =
      current->record->lookUpValue((FieldId)MainFields::id);
    bool valid;
    int64_t id;
    idBox.getInt(valid, id);
    if (!valid)
      return;

    // This next bit of code worked surprisingly well.  It did not take any
    // significant CPU.  It did make the logs a little messy.
    /*
    std::string alertType;
    current->record->lookUpValue((FieldId)MainFields::alert_type).getString(valid, alertType);
    if (valid)
      alertType = "〚" + alertType + "〛";
    else
      // mtSendFakeRecord in RecordDispatcher.C causes us to see this
      // message.  I've never found it anywhere else.
      alertType = "〚unknown〛";
    ThreadMonitor::find().increment(alertType);
    */

    tm.setState(s_mtAlert_add);
    _alerts[id] = current->record;

    // Trim the history back to _historyCount entries, dropping the
    // smallest (oldest) ids first.
    tm.setState(s_mtAlert_clean);
    while (_alerts.size() > _historyCount)
      _alerts.erase(_alerts.begin());
  }

public:
  // thread       -- the container thread that will receive the records.
  // delayed      -- true: listen to the delayed feed; false: the live feed.
  // historyCount -- how many of the most recent alerts to keep.
  AlertProvider(IContainerThread *thread, bool delayed,
                uint64_t historyCount = 100000) :
    ForeverThreadUser(thread),
    _historyCount(historyCount),
    _delayed(delayed)
  {
    if (delayed)
      delayedAlertsListenForRecords(this, mtAlert);
    else
      IRecordDispatcher::getAlerts()->listenForRecords(this, mtAlert);
    start();
  }

  // Read-only view of the current history, ordered by alert id.
  std::map< int64_t, Record::Ref > const &getAlerts() { return _alerts; }
};

// One user's strategies, keyed by the client's "strategy_id".
typedef std::map< std::string, Parse::AlertStrategy > StrategyList;

// Reference-counted handle to a StrategyList.  The worker jobs below copy
// this handle, not the list itself.
typedef TSRefCount< StrategyList > StrategyListRef;

// Per-connection state.  Main creates one of these when the first
// ms_alert_listen arrives on a socket and drops it when the socket closes.
class UserInfo
{
public:
  typedef NCTSRefCount< UserInfo > Ref;

  // The connection this user is on.
  SocketInfo *const socket;

  // Login info captured at construction.  loginInfo.status decides whether
  // this user sees live alerts (sFull) or delayed ones (anything else).
  const UserInfoExport loginInfo;

  // Response message id of the listen request.  Every alert report for
  // this user is sent with this id.
  const ExternalRequest::MessageId messageId;

  // Only accessible from the dispatcher thread.
  RequestList dispatcherRequestList;

  // The database thread uses this list.  The dispatcher moves the list from
  // dispatcherRequestList to here right before sending the request to the
  // database thread.
  RequestList databaseThreadRequestList;

  // True while a job for this user is queued or running on
  // Main::_databaseWorkers.  At most one such job exists at a time.
  bool metadataRequestIsPending;

  // The user's current strategies.  The dispatcher replaces this handle
  // when a database job finishes.
  StrategyListRef strategyList;

  // When the dispatcher should next schedule searchForAlerts() for this
  // user.  It is also part of the key in Main::_userByTime, so it must not
  // change while the user is in that set.  searchForAlerts() changes it
  // only while the user has been taken out of the set.
  TimeVal nextStartTime;

  // Only read and set in the alerts thread (apart from the constructor).
  bool firstRequest;
  // The smallest alert id this user has not been shown yet.  Starts from
  // the listen request's "next_id" property (0 when strtolDefault() falls
  // back to its default -- presumably when the property is absent).
  int64_t nextAlertId;

  UserInfo(ExternalRequest *request) :
    socket(request->getSocketInfo()),
    loginInfo(userInfoGetInfo(socket)),
    messageId(request->getResponseMessageId()),
    metadataRequestIsPending(false),
    strategyList(new StrategyList),
    firstRequest(true),
    nextAlertId(strtolDefault(request->getProperty("next_id"), 0))
  { }
};

// Dispatcher.
//
// Runs on its own "AlertMicroService" container thread and owns every
// UserInfo.  It
//  * receives the ms_alert_listen / ms_alert_start / ms_alert_stop commands;
//  * hands each user's pending start/stop commands to _databaseWorkers
//    (updateStrategyList());
//  * wakes up whenever a user's nextStartTime arrives and hands that user to
//    _alertWorkers (searchForAlerts()).
class Main : public ForeverThreadUser, private ThreadMonitor::Extra
{
private:
  enum { mtStart, mtStop, mtListen };

  // Per-thread pointers.  They are filled in for each alert worker thread
  // by the lambda in initializeInThread(); no code in this file sets them
  // in the dispatcher or database threads, so they are only meaningful
  // inside alert worker jobs such as searchForAlerts().
  static __thread AlertProvider *_delayedAlerts;
  static __thread AlertProvider *_liveAlerts;
  static __thread LateRecordCounter *_lateRecordCounter;

  // Runs updateStrategyList() jobs.
  NewWorkerCluster _databaseWorkers;
  // Runs searchForAlerts() jobs.
  NewWorkerCluster _alertWorkers;

  // Every connection that has sent ms_alert_listen.
  // NOTE(review): handleRequestInThread() reads this map with operator[]
  // for start/stop commands.  std::map::operator[] inserts a default
  // (presumably null) Ref when the socket has not sent a listen yet.  Those
  // null entries count toward size() (used for the quota in awake() and in
  // getInfoForThreadMonitor()), and socketClosed() never erases them because
  // it only erases non-null entries.  Consider using find() there instead.
  std::map< SocketInfo *, UserInfo::Ref > _userBySocket;

  // Contains users as they are waiting in the dispatcher.  Users are removed
  // shortly before sending them to the alerts thread, and added back as
  // soon as they return.  Sorted by nextStartTime, so begin() is always the
  // next user due.
  std::set< std::pair< TimeVal, UserInfo::Ref > > _userByTime;

  // Schedule info to run again at info->nextStartTime.
  void addByTime(UserInfo::Ref const &info)
  {
    _userByTime.insert
      (std::pair< TimeVal, UserInfo::Ref >(info->nextStartTime, info));
  }

  // Forget everything we know about a closed connection.
  // NOTE(review): if this user is currently inside an _alertWorkers job, it
  // is not in _userByTime, so nothing is erased there.  The job's final
  // step in awake() later calls addByTime(info) and schedules the closed
  // user again, unless NewWorkerCluster skips that step for a closed socket
  // (it is given info->socket).  TODO confirm.
  virtual void socketClosed(SocketInfo *socket)
  {
    UserInfo::Ref userInfo = getPropertyDefault(_userBySocket, socket);
    if (userInfo)
    {
      _userBySocket.erase(socket);
      _userByTime.erase(std::pair< TimeVal, UserInfo::Ref >
                        (userInfo->nextStartTime, userInfo));
    }
  }

  // Ask to be woken when the earliest scheduled user is due.  The delay is
  // rounded up to whole milliseconds, presumably so we never wake up just
  // before the deadline.  With no scheduled users nothing is requested.
  virtual void beforeSleep(IBeforeSleepCallbacks &callbacks)
  {
    if (!_userByTime.empty())
    {
      TimeVal::Microseconds toSleep =
        _userByTime.begin()->first.asMicroseconds()
        - TimeVal(true).asMicroseconds();
      if (toSleep < 0)
        toSleep = 0;
      callbacks.wakeAfterMs((toSleep + 999)/1000);
    }
  }

  // Move every user whose nextStartTime has passed out of _userByTime and
  // onto the alert worker queue.  `woken` is not used.
  virtual void awake(std::set< int > const &woken)
  {
    const TimeVal now(true);
    while ((!_userByTime.empty()) && (_userByTime.begin()->first <= now))
    {
      UserInfo::Ref info = _userByTime.begin()->second;
      // If everyone uses their full quota, and we have no overhead, then
      // each user will get one update per second and the CPUs will be
      // running full time.
      // (1000000 presumably means one second of worker time, in
      // microseconds, shared by all users.)
      // NOTE(review): this divides by _userBySocket.size().  That is
      // non-zero as long as every user in _userByTime is also in
      // _userBySocket; see the note on socketClosed() for a way that could
      // stop being true.
      const uint64_t quota =
        1000000LL * _alertWorkers.workerCount() / _userBySocket.size();
      _userByTime.erase(_userByTime.begin());
      _alertWorkers.addJobPre([this, info] (StrategyListRef &strategyList) {
          // The "pre" step.  This is called as soon as the job has made its
          // way to the front of the queue and a thread has been assigned to
          // it.  This is called in the dispatcher thread.  We copy the
          // strategyList at this time, right before we start working,
          // rather than copying it when the request was first made.  It's
          // possible that the user added or deleted some windows between
          // the time when the job was queued up for work and now.
          strategyList = info->strategyList;
          // True means do NOT cancel this request.  The default pre step
          // returns true and does nothing else.
          return true;
        }, [this, info, quota](StrategyListRef &strategyList) {
          // Search for alerts in the worker thread.
          searchForAlerts(info, quota, strategyList);
        }, [this,info](StrategyListRef &strategyList) {
          // Back in the dispatcher thread, store this in the timed list.
          // When the timer goes off in about one second, we'll schedule
          // another job to searchForAlerts() for this user again.
          addByTime(info);
        }, info->socket);
    }
  }

  // Dispatcher thread, "pre" step of a database job: move every pending
  // start/stop request into databaseThreadRequestList, leaving
  // dispatcherRequestList empty.
  void prepairForDatabase(UserInfo::Ref const &userInfo)
  {
    userInfo->databaseThreadRequestList.clear();
    userInfo->databaseThreadRequestList
      .swap(userInfo->dispatcherRequestList);
  }

  // This is called in a database thread.
  //
  // Applies one batch of ms_alert_start / ms_alert_stop requests for a
  // user:
  //  * start: if the request has a response message id, load the window
  //    config, optionally save it to the MRU list ("save_to_mru" == "1"),
  //    and send the window's metadata to the client; then load the strategy
  //    (without compiling it yet);
  //  * stop: remove the strategy named by "strategy_id".
  // Each request is deleted once handled.
  //
  // strategyList -- on entry, a handle to the user's list as copied in the
  //   pre step.  The dispatcher stores whatever this holds on exit as the
  //   user's new list (see sendMetadataRequestIfRequired()).  The code that
  //   updates it is in the lost part below.
  void updateStrategyList(UserInfo::Ref const &userInfo,
                          StrategyListRef &strategyList)
  {
    ThreadMonitor::SetState tm(s_updateStrategyList);
    UserId userId = userInfo->loginInfo.userId;
    // NOTE(review): text lost.  The line below is the surviving tail of a
    // commented-out debug print merged with lost live code.  The lost text
    // must have declared `database` and `newList` (both used below but
    // declared nowhere visible), and ended with a statement whose tail was
    // "databaseThreadRequestList.size()));".
    //std::cout<<"updateStrategyList("<size()<<" entries"<databaseThreadRequestList.size()));
    for (ExternalRequest *current : userInfo->databaseThreadRequestList)
    {
      switch (current->callbackId)
      {
      case mtStart:
        {
          std::string collaborateString = current->getProperty("long_form");
          //TclList msg;
          //msg<getSocketInfo());
          if (current->getResponseMessageId().present())
          {
            // Send metadata to client.
            tm.setState(s_updateStrategyList_metadata);
            AlertConfig config;
            XmlNode response;
            config.load(collaborateString, userId, database,
                        /* allowCustomColumns = */ true,
                        /* allowNonFilterColumns = */ true);
            if (current->getProperty("save_to_mru") == "1")
            {
              // The default is false.  We wanted to hide some things from
              // the user.  The new client will tell us whether or not
              // to hide this entry from the MRU list.  If someone has
              // an old client and doesn't upgrade, but the server upgrade
              // is forced upon him, everything will be hidden.  That seems
              // safe.  (Originally we didn't send anything to the MRU list.
              // That was a mistake.  But an old client won't be any worse
              // after this change than before.  Either way the user gets no
              // new entries in his MRU list until he upgrades his client.)
              // The MRU write goes to the "@RW" database.
              DatabaseWithRetry &master = *DatabaseForThread("@RW");
              config.saveToMru(userId, master);
            }
            response["STATUS"].properties["SHORT_FORM"] = config.save();
            std::string windowName = config.getWindowName();
            if (!windowName.empty())
              response["STATUS"].properties["WINDOW_NAME"] = windowName;
            PairedFilterList filters(userId, database,
                                     /* topList = */ false,
                                     /* onlyFilters = */ false);
            config.getInitialDescription(response["STATUS"], filters);
            addToOutputQueue(userInfo->socket, response.asString(),
                             current->getResponseMessageId());
          }
          tm.setState(s_updateStrategyList_parse);
          // NOTE(review): text lost.  The declaration of `strategy` (used by
          // strategy.load() below) was swallowed into this commented-out
          // line.  Presumably it was a reference into (*newList) keyed by
          // current->getProperty("strategy_id").
          //std::cout<<"about to add new strategy"<getProperty("strategy_id")];
          //TclList debugDump;
          strategy.load(collaborateString, userId, database,
                        /* compileNow = */ false /*, &debugDump*/);
          //LogFile::primary().sendString(TclList()<getSocketInfo());
          tm.setState(s_updateStrategyList);
          break;
        }
      case mtStop:
        //std::cout<<"deleting a strategy"<getProperty("strategy_id"), current->getSocketInfo());
        newList->erase(current->getProperty("strategy_id"));
        break;
      default:
        abort();
      }
      delete current;
    }
    // Collect pointers to every strategy so they can be compiled together.
    // NOTE(review): text lost.  The declaration of `toCompile` (a vector of
    // pointers to the strategies in *newList, judging by the push_back below)
    // was swallowed into this commented-out line.
    //std::cout<<"finished adds and drops, about to make pointer list"< toCompile;
    toCompile.reserve(newList->size());
    for (auto it = newList->begin(); it != newList->end(); it++)
      toCompile.push_back(&it->second);
    // NOTE(review): text lost.  Everything between the commented-out print
    // below and "nextStartTime.asMicroseconds();" is missing:
    //  * the compile step that consumes toCompile (s_updateStrategyList_compile
    //    is declared for it but never used in the visible code);
    //  * presumably the code that puts the new list into `strategyList`;
    //  * the closing brace of updateStrategyList();
    //  * the signature and opening of searchForAlerts().
    // awake() calls it as searchForAlerts(info, quota, strategyList).  The
    // code below uses these names, which must have been declared in the lost
    // part: userInfo, strategyList, tm, late, stop, warn, stoppedForQuota.
    // Judging by the surviving tail, `late` was something minus
    // userInfo->nextStartTime.asMicroseconds().
    //
    // searchForAlerts() runs in an alert worker thread.  It:
    //  * uses the live alert history for sFull users and the delayed one for
    //    everyone else;
    //  * walks alerts from newest to oldest, down to userInfo->nextAlertId,
    //    evaluating every strategy that still needs results;
    //  * stops collecting for a strategy once it has 35 matches, and stops
    //    the whole walk when the `stop` time has passed;
    //  * advances nextAlertId past the newest alert, sends one XML report
    //    (DATA plus RESTART/NEXT_ID) to the user, and asks to run again one
    //    second after it finishes.
    //std::cout<<"finished pointer list, about to compile"<nextStartTime.asMicroseconds();
    tm.increment(s_searchForAlerts_late, late);
    //LogFile::primary().sendString(TclList()<nextAlertId<<"live alerts"<<_liveAlerts->getAlerts().size()<<"delayed alerts"<<_delayedAlerts->getAlerts().size(), userInfo->socket);

    // One strategy being evaluated in this pass, plus the rows it matched.
    struct StrategyWatcher
    {
      std::string name;
      Parse::AlertStrategy strategy;
      std::vector< Parse::ValuesByName > result;
      StrategyWatcher() {}
      StrategyWatcher(std::string const &name,
                      Parse::AlertStrategy const &strategy) :
        name(name), strategy(strategy) { }
    };

    // allStrategies contains the name of the strategy, the compiled code
    // for the strategy, and the results of the strategy.
    std::vector< StrategyWatcher > allStrategies;
    allStrategies.reserve(strategyList->size());
    for (auto it = strategyList->cbegin(); it != strategyList->cend(); it++)
      allStrategies.emplace_back(it->first, it->second);
    // Don't add any more to allStrategies.  We plan to save pointers into
    // that vector.

    // Strategies that still want more rows.  Order is irrelevant; entries
    // are removed by swapping with the last element.
    std::vector< StrategyWatcher * > needsService;
    needsService.reserve(allStrategies.size());
    for (auto it = allStrategies.begin(); it != allStrategies.end(); it++)
      needsService.push_back(&*it);
    tm.increment(s_searchForAlerts_strategy, needsService.size());
    if (!needsService.empty())
      CommandDispatcher::getInstance()->getDeadManTimer().touchConnection(userInfo->socket);

    //TclList msg;
    //msg<loginInfo.userId<<"status"<loginInfo.status;
    //if (userInfo->loginInfo.status==sFull)
    //msg<<"FULL";
    //if (userInfo->loginInfo.status==sLimited)
    //msg<<"LIMITED";
    //LogFile::primary().sendString(msg, userInfo->socket);

    // Full users see the live feed; everyone else sees the delayed feed.
    const bool useLiveAlerts = userInfo->loginInfo.status==sFull;
    auto &alerts = (useLiveAlerts?_liveAlerts:_delayedAlerts)->getAlerts();

    // With nextAlertId == 0 (no "next_id" from the client) nothing is
    // matched on this pass.  The pass only advances nextAlertId (below), so
    // the next pass starts from the newest alert.
    if (userInfo->nextAlertId > 0)
    {
      tm.setState(s_searchForAlerts);
      if ((!userInfo->firstRequest) && (!alerts.empty())
          && (userInfo->nextAlertId < alerts.begin()->first))
      {
        // Some alerts went in and out before this user was able to see them.
        // This is not based on the quota or the schedulers.  These seem
        // to work well, so everyone gets called about once per second.
        // The problem is, our buffer is limited by the maximum number of
        // alerts, not by time.  So if alerts come in too quickly, our
        // buffer might not be sufficient.
        // NOTE(review): text lost.  The next statement is the damaged
        // remains of a TclList message plus the LogFile call that sent it
        // (the sample log line below shows what it used to print).  It is
        // not valid C++ as it stands.
        TclList msg;
        msg<nextAlertId"<nextAlertId <<"skipped"<<(alerts.begin()->first - userInfo->nextAlertId) <<"oldest"<first <<"newest"<first <<"count"<socket);
        // {Tue Aug 25 13:05:15 2020} 1 AlertMicroService.C 423 searchForAlerts {BUFFER TOO SMALL} userInfo->nextAlertId 26615408828 skipped 42 oldest 26615408870 newest 26615409027 count 158
        // I found this in the log during testing.  I restarted the server
        // in the normal way.  Now I know that my client lost 42 alerts
        // because of that down time.
        // We were getting spammed with these messages so now I ignore the
        // user's first request for alerts.  That's mildly interesting, but
        // I saw tons of people before 6am trying to request alerts from
        // the day before.  Presumably they just restarted their laptop
        // and TI Pro diligently tried to pick up from where it left off
        // the day before.  The goal of this test was to find times when the
        // user was connected, so this server was in full control of the
        // schedule, but we still missed alerts.  This new test will remove
        // the sample shown above.
      }
      userInfo->firstRequest = false;
      int alertsSeen = 0;
      int alertsTested = 0;
      int alertsMatched = 0;
      // Newest first, down to nextAlertId, until no strategy wants more.
      for (auto it = alerts.rbegin();
           (it != alerts.rend()) && (it->first >= userInfo->nextAlertId)
             && (!needsService.empty());
           it++)
      {
        const TimeVal currentTime(true);
        if (stop < currentTime)
        {
          // This took too long.  This seems to depend more on the user than
          // anything else.
          stoppedForQuota = true;
          break;
        }
        if (useLiveAlerts && !alertsSeen)
          // This is the first alert we're looking at and thus it is the
          // last alert we've received.  Presumably this alert has the
          // latest timestamp; that's not 100% for sure, but that's our
          // estimate for now.  Ignore alerts that were delayed on purpose!
          _lateRecordCounter->add(it->second);
        alertsSeen++;
        alertsTested += needsService.size();
        //LogFile::primary().sendString(TclList()<first, userInfo->socket);
        Execution::RecordInfo recordInfo;
        recordInfo.setRecord(it->second);
        // allStrategies is non-empty here because the loop condition
        // requires a non-empty needsService.  Only the first strategy's
        // init() is called per record; presumably init() prepares state
        // shared by all strategies -- TODO confirm.
        allStrategies[0].strategy.init(recordInfo);
        for (size_t i = 0; i < needsService.size(); )
        {
          StrategyWatcher &strategyWatcher = *needsService[i];
          //tm.setState(s_searchForAlerts_match);
          // Very strange.  I can call setState in evaluateWhere().  Then I
          // get about the same amount of time in evaluateWhere() as I get
          // in s_searchForAlerts_match.  Somehow the two calls to setState()
          // and/or the act of calling
          // strategyWatcher.strategy.evaluateWhere() and storing the boolean
          // result take about the same amount of time.  I disabled the
          // optimizer, because I thought that might be reordering things.
          // (It shouldn't be, because it can't see into the various function
          // calls.)  That made no difference.  I asked the compiler to
          // output assembly code.  That looked just like it should.  There
          // was nothing special going on between the call to setState(),
          // the call to evaluateWhere(), and the next call to setState().
          const bool match = strategyWatcher.strategy.evaluateWhere(recordInfo);
          //tm.setState(s_searchForAlerts);
          bool deleteStrategy = false;
          if (match)
          {
            alertsMatched++;
            //tm.setState(s_searchForAlerts_columns);
            strategyWatcher.result.push_back(strategyWatcher.strategy.evaluateColumns(recordInfo));
            //tm.setState(s_searchForAlerts);
            if (strategyWatcher.result.size() >= 35)
              // Done with this strategy.
              deleteStrategy = true;
          }
          if (deleteStrategy)
          {
            // Remove this strategy from needsService.  The order of
            // needsService doesn't matter outside of this one loop.
            // Don't touch the strategies we just finished looking at.
            // Trade the current strategy with the one at the end of the
            // list.
            needsService[i] = needsService[needsService.size()-1];
            needsService.resize(needsService.size()-1);
          }
          else
            // Go to the next strategy.
            i++;
        }
      }
      const TimeVal currentTime(true);
      if (stoppedForQuota || (warn < currentTime))
      {
        // NOTE(review): text lost.  The contents of this quota message and
        // the LogFile call that sent it were swallowed; only "msg<socket);"
        // survives.
        TclList msg;
        msg<socket);
        // tm.increment("QUOTA " + ntoa(allStrategies.size()));
      }
      tm.increment(s_searchForAlerts_alerts_seen, alertsSeen);
      tm.increment(s_searchForAlerts_alerts_tested, alertsTested);
      tm.increment(s_searchForAlerts_alerts_matched, alertsMatched);
    }
    // Next pass starts after the newest alert we currently hold.
    // NOTE(review): this happens no matter how far the walk above got, so
    // alerts older than a strategy's 35th match, or not reached before the
    // quota stop, are never reported to this user.  That may be intentional
    // (only the most recent matches are shown) -- confirm.
    if (!alerts.empty())
      userInfo->nextAlertId =
        std::max(userInfo->nextAlertId, alerts.rbegin()->first + 1);

    // Build the report: one STRATEGY node per strategy that matched
    // something, plus RESTART/NEXT_ID so the client can resume from here.
    tm.setState(s_searchForAlerts_report);
    XmlNode response;
    XmlNode &responseData = response["DATA"];
    if (userInfo->nextAlertId)
      response["RESTART"].properties["NEXT_ID"] = ntoa(userInfo->nextAlertId);
    for (auto strategyIt = allStrategies.cbegin();
         strategyIt != allStrategies.end(); strategyIt++)
    {
      StrategyWatcher const &strategyWatcher = *strategyIt;
      if (!strategyWatcher.result.empty())
      {
        XmlNode &strategyResponse = responseData[-1];
        strategyResponse.name = "STRATEGY";
        strategyResponse.properties["ID"] = strategyWatcher.name;
        // result was filled newest-first; walking it backwards writes the
        // rows oldest-first.
        for (auto rowIt = strategyWatcher.result.rbegin(),
               end = strategyWatcher.result.rend();
             rowIt != end; rowIt++)
        {
          XmlNode &rowResponse = strategyResponse[-1];
          Parse::ValuesByName const &rowResult = *rowIt;
          // TODO I copied this next loop from the top list part of
          // Strategy.C.  It should be a library routine.
          for (auto fieldIt = rowResult.begin(); fieldIt != rowResult.end();
               fieldIt++)
            fieldIt->second.writeToClient(rowResponse.properties,
                                          fieldIt->first);
        }
      }
    }
    addToOutputQueue(userInfo->socket, response.asString(),
                     userInfo->messageId);

    // Measured from the end of this pass, not from the requested start
    // time (see the comment on s_searchForAlerts_late).
    TimeVal nextStartTime(true);
    nextStartTime.addSeconds(1);
    userInfo->nextStartTime = nextStartTime;
  }

  // Dispatcher thread.  If this user has pending start/stop requests and no
  // database job in flight, queue one.  When the job finishes, store the
  // new strategy list and check again for requests that arrived meanwhile.
  void sendMetadataRequestIfRequired(UserInfo::Ref userInfo)
  {
    if (userInfo->metadataRequestIsPending)
      // There is already an outstanding request.  Don't try to send another.
      // The other thread will try to grab all the updates it can at once;
      // you don't need to call the other thread once for each user request.
      // More important, we don't want two different database threads working
      // on this at the same time.  One could overwrite the work of the
      // other.  We will check again when the current request finishes to
      // see if it missed something.
      return;
    if (userInfo->dispatcherRequestList.empty())
      // No work to be done right now.
      return;
    userInfo->metadataRequestIsPending = true;
    _databaseWorkers.addJobPre([this, userInfo] (StrategyListRef &saved) {
        // Dispatcher thread: hand over the pending requests and the
        // current list.
        prepairForDatabase(userInfo);
        saved = userInfo->strategyList;
        return true;
      }, [this,userInfo](StrategyListRef &saved) {
        // Database thread.
        updateStrategyList(userInfo, saved);
      }, [this,userInfo](StrategyListRef &saved) {
        // Dispatcher thread: publish the result.
        userInfo->metadataRequestIsPending = false;
        userInfo->strategyList = saved;
        // Check again.  Make sure we didn't miss anything.
        sendMetadataRequestIfRequired(userInfo);
      }, userInfo->socket);
  }

  // Dispatcher thread: handles ms_alert_start, ms_alert_stop and
  // ms_alert_listen.
  virtual void handleRequestInThread(Request *original)
  {
    ThreadMonitor::SetState tm(s_handleRequestInThread);
    switch (original->callbackId)
    {
    case mtStart:
    case mtStop:
      {
        //LogFile::primary().sendString(TclList()<getSocketInfo());
        ExternalRequest *current = dynamic_cast< ExternalRequest * >(original);
        SocketInfo *socket = current->getSocketInfo();
        // NOTE(review): operator[] inserts an entry when the socket has not
        // sent ms_alert_listen yet; see the note on _userBySocket.
        UserInfo::Ref info = _userBySocket[socket];
        // We should always get the listen message first.  If we do get a
        // start or stop message before the listen message, ignore the
        // start or stop message.
        if (info)
        {
          info->dispatcherRequestList.push_back(current);
          sendMetadataRequestIfRequired(info);
          // The request object is now owned by dispatcherRequestList and
          // is deleted later by updateStrategyList(); keepOriginal()
          // presumably stops the framework from freeing it.
          IContainerThreadUser::keepOriginal();
        }
        break;
      }
    case mtListen:
      {
        ExternalRequest *current = dynamic_cast< ExternalRequest * >(original);
        SocketInfo *socket = current->getSocketInfo();
        UserInfo::Ref &info = _userBySocket[socket];
        //LogFile::primary().sendString(TclList()<getSocketInfo());
        if (!info)
        {
          // We can only handle one of these messages.  If the client sends
          // more, ignore all but the first.
          info = new UserInfo(current);
          // First search runs about one second from now.
          TimeVal wakeTime(true);
          wakeTime.addSeconds(1);
          info->nextStartTime = wakeTime;
          addByTime(info);
        }
        break;
      }
    default:
      abort();
    }
  }

  // Start the worker clusters.  Thread counts come from the config items
  // "alert_micro_service_db" (default 2) and "alert_micro_service_worker"
  // (default 7).  Every alert worker thread gets its own live and delayed
  // AlertProvider (each keeping up to 100000 alerts by default) and its own
  // LateRecordCounter.
  virtual void initializeInThread()
  {
    int threadCount =
      strtolDefault(getConfigItem("alert_micro_service_db"), 2);
    _databaseWorkers.createWorkers(threadCount);
    threadCount = strtolDefault(getConfigItem("alert_micro_service_worker"), 7);
    _alertWorkers.createWorkersLambda([]() {
        // Runs once in each new alert worker thread.
        _delayedAlerts = new AlertProvider(IContainerThread::current(), true);
        _liveAlerts = new AlertProvider(IContainerThread::current(), false);
        _lateRecordCounter = new LateRecordCounter;
        ThreadMonitor::find().add(_lateRecordCounter);
      }, threadCount);
    ThreadMonitor::find().add(this);
  }

  // Extra text for the thread monitor report.
  virtual std::string getInfoForThreadMonitor()
  {
    TclList result;
    result<<"users currently on"<<_userBySocket.size();
    return result;
  }

public:
  // Creates the dispatcher thread and both worker clusters (the meaning of
  // the 500 passed to _alertWorkers is defined by NewWorkerCluster, which
  // is not visible here), registers for the three client commands, reads
  // the quota factors and starts the thread.
  Main() :
    ForeverThreadUser(IContainerThread::create("AlertMicroService")),
    _databaseWorkers(getContainer(), "AlertMicroService database"),
    _alertWorkers(getContainer(), "alert_micro_service_worker", 500)
  {
    //new AlertDelayCheck(getContainer());
    CommandDispatcher *cd = CommandDispatcher::getInstance();
    cd->listenForCommand("ms_alert_start", this, mtStart);
    cd->listenForCommand("ms_alert_stop", this, mtStop);
    cd->listenForCommand("ms_alert_listen", this, mtListen);
    quotaHardLimitFactor =
      strtodDefault(getConfigItem("ms_alert_quota_hard"), 5.0);
    quotaWarnFactor = strtodDefault(getConfigItem("ms_alert_quota_warn"), 1.0);
    start();
  }
};

// Definitions of Main's per-thread static members.
__thread AlertProvider *Main::_delayedAlerts;
__thread AlertProvider *Main::_liveAlerts;
__thread LateRecordCounter *Main::_lateRecordCounter;

}

// Entry point.  Creates the service only when the config item
// "alert_micro_service" is "1".  The Main object is never deleted; no
// pointer to it is kept here.
void initAlertMicroService()
{
  if (getConfigItem("alert_micro_service") == "1")
    new AlertMicroService::Main;
}