// Worker threads for streaming top-list requests (live and delayed).
// NOTE(review): this file's line breaks appear to have been lost.  As laid
// out, every "//" comment swallows the rest of its physical line, including
// code, and several "<<" chains look truncated.  Restore the original line
// structure from version control before compiling.
// This physical line holds the #include list, the manual test recipes, and
// the start of class TopListWorkThread with its nested IDone interface.
#include "../shared/DatabaseWithRetry.h" #include "../shared/CommandDispatcher.h" #include "../shared/LogFile.h" #include "../shared/ReplyToClient.h" #include "../shared/GlobalConfigFile.h" #include "../ax_alert_server/Types.h" #include "../shared/MiscSQL.h" #include "../shared/ContainerThread.h" #include "Strategy.h" #include "RecordDispatcher.h" #include "TimedJobList.h" #include "DelayedAlerts.h" #include "TopListWorkers.h" // Test: ./fast_alert_search -i top_list_live_threads=2 -i top_list_delayed_threads=1 -i alerts_master_info=localhost:7897 -i top_list_master_info=localhost:7897 -i database_RO=drama -i database_RW=drama -i listen_port=7899 -i log_file_suffix=top_list -i auto_alerts_daily=1 >& /tmp/top_list_test.txt < /dev/null& // Test: ./fast_alert_search -i top_list_live_threads=2 -i top_list_delayed_threads=1 -i alerts_master_info=dom:7897 -i top_list_master_info=dom:7897 -i database_RO=crushinator -i database_RW=roberto -i listen_port=7899 -i log_file_suffix=top_list -i auto_alerts_daily=1 >& /tmp/top_list_test.txt < /dev/null& // Test: telnet localhost 7899 // Test: command=top_list_live_listen&message_id=28 // Test: command=top_list_live_start_streaming&socket_id=1234&window_id=TL_23&user_id=5&collaborate=xxx // Test: command=top_list_live_debug // Test: command=top_list_delayed_debug // Test: command=top_list_live_start_streaming&socket_id=1234&window_id=TL_24&user_id=5&collaborate=xxx // Test: command=top_list_live_start_streaming&socket_id=4321&window_id=TL_24&user_id=5&collaborate=xxx // Test: command=top_list_live_start_streaming&socket_id=4321&window_id=TL_24&user_id=5&collaborate=xxx // Test: command=top_list_live_stop_streaming&socket_id=4321&window_id=TL_24 // Test: command=top_list_live_stop_streaming&socket_id=4321&window_id=TL_24 // Test: command=top_list_live_stop_streaming&socket_id=1234&all=1 // Test: see also ../../live_server/fast_alert_search/run_top_list class TopListWorkThread : public ForeverThreadUser { public: class IDone { 
// Continues TopListWorkThread.  IDone::done() is the pure-virtual completion
// callback, taking the socket and the worker that finished.  WindowInfo
// bundles the const per-window request data: socket, remote socket id,
// window id, return message id and the parsed strategy.
public: virtual void done(SocketInfo *socket, TopListWorkThread *which) =0; }; // This encapsulates the original request. The database is required to // initialize this, but never again. This gets passed from thread to // thread when it's time to do work. class WindowInfo { public: SocketInfo *const socket; const std::string remoteSocketId; const std::string windowId; const ExternalRequest::MessageId returnMessageId; const Parse::TopListStrategy strategy; WindowInfo(SocketInfo *socket, std::string const &remoteSocketId, std::string const &windowId, ExternalRequest::MessageId returnMessageId, Parse::TopListStrategy strategy) : socket(socket), remoteSocketId(remoteSocketId), windowId(windowId), returnMessageId(returnMessageId), strategy(strategy) { } // Currently this just finds the items that match a top list request. // But if we made this virtual, then we could have a lot of different // types of similar requests. See MarketSummaryRequest and // MatchingSymbolCountRequest in ../ax_alert_server/TopList.C. The bulk // of this unit would not have to change, only the code that creates // and initialized the WindowInfo object. void doWork(Parse::TopListStrategy::Records const &records) const; // We use smart pointers to make cleanup easier. In the alerts thread and // the WorkerCluster multiple threads will have a pointer to the same // request object. 
// Continues TopListWorkThread:
//  - WindowInfo::Ref, the smart-pointer typedef for WindowInfo.
//  - InitialState: the streaming and frozen record sets plus lastStreaming
//    and frozenRecordsEndTime, obtained through the static get(); the
//    constructor takes a private tag type (Private).
//  - Private section: message ids, the Work request (callbackId is
//    mtGetTopList), the record containers, and the frozen-state and purge
//    helpers.  frozenRecordsAvailable() is _frozenRecordsEndTime as a bool.
//  - Public section: doWork(), the constructor, handleRequestInThread().
// The end of this line opens class TopListJobControlThread: its message ids,
// the queue of available workers, and the worker -> socket in-use map.
typedef SmarterCP< WindowInfo > Ref; }; class InitialState { public: typedef Parse::TopListStrategy::Records Records; private: static Records getRecentHistory(DatabaseWithRetry &database, std::string const &endTime); class Private { }; public: InitialState(Private const &) { } Records streaming; Records frozen; time_t lastStreaming; time_t frozenRecordsEndTime; std::string debugDump() const; typedef SmarterCP< InitialState > Ref; static Ref get(DatabaseWithRetry &database, bool delayed); }; private: enum { mtNewRecord, mtGetTopList }; class Work : public Request { public: WindowInfo::Ref windowInfo; Work(SocketInfo *socket) : Request(socket) { callbackId = mtGetTopList; } }; IDone *const _onDone; const bool _delayed; Parse::TopListStrategy::Records _records; Parse::TopListStrategy::Records _frozenRecords; time_t _previousTimeStamp; time_t _frozenRecordsEndTime; bool frozenRecordsAvailable() { return _frozenRecordsEndTime; } time_t closeForDate(time_t midnight) const; time_t getFrozenRecordsState(time_t time) const; void updateFrozenRecordsState(Record::Ref const &record); time_t _nextPurgeTime; void purgeOldRecords(); void checkForPurge(); public: // Queue this job up to be done in this thread. void doWork(WindowInfo::Ref work); TopListWorkThread(IDone *onDone, std::string const &baseName, InitialState::Ref initialState, bool delayed); virtual void handleRequestInThread(Request *original); }; class TopListJobControlThread : public ForeverThreadUser, private TopListWorkThread::IDone { private: enum { mtNewRequest, mtCancelRequest, mtWorkerDone, mtDebug }; std::queue< TopListWorkThread * > _availableWorkers; typedef std::map< TopListWorkThread *, SocketInfo * > InUse; InUse _inUse; // All jobs appear in _jobs. When we send a job // to a worker, we immediately delete the pair, update the timestamp for // next time, and then add the new pair. We could wait for the item to // finish before we add it back, but that would make the code more // complicated. 
// Continues TopListJobControlThread: the timed job list, match(), the Done
// request (callbackId is mtWorkerDone) that done() posts to this thread via
// newRequest(), and the public Start / Stop requests.
// Then class TopListDatabaseThread, which holds the database connection, the
// job control thread pointer and a socket -> return message id map.
// initTopListWorkers() calls TopListDatabaseThread::init(false) and then
// init(true).  The line ends with the s_addRecord label and the first tokens
// of the file-local addRecord() helper.
TimedJobList< TopListWorkThread::WindowInfo::Ref > _jobs; void match(); class Done : public Request { public: TopListWorkThread *const which; Done(SocketInfo *socket, TopListWorkThread *which) : Request(socket), which(which) { callbackId = mtWorkerDone; } }; virtual void done(SocketInfo *socket, TopListWorkThread *which) { newRequest(new Done(socket, which)); } public: class Start : public Request { public: TopListWorkThread::WindowInfo::Ref windowInfo; Start(SocketInfo *socket) : Request(socket) { callbackId = mtNewRequest; } }; class Stop : public Request { public: std::string remoteSocketId; bool entireRemoteSocket; std::string windowId; Stop(SocketInfo *socket) : Request(socket) { callbackId = mtCancelRequest; } }; TopListJobControlThread(int threadCount, std::string const &baseName, TopListWorkThread::InitialState::Ref initialState, bool delayed); virtual void handleRequestInThread(Request *original); virtual void socketClosed(SocketInfo *socket); virtual void beforeSleep(IBeforeSleepCallbacks &callbacks); }; class TopListDatabaseThread : public ForeverThreadUser { private: enum { mtInit, mtNewRequest, mtCancelRequest, mtListen }; class InitRequest : public Request { public: int threadCount; std::string baseName; bool delayed; InitRequest() : Request(NULL) { } }; DatabaseWithRetry _database; TopListJobControlThread *_jobControlThread; std::map< SocketInfo *, ExternalRequest::MessageId > _returnMessageId; public: TopListDatabaseThread(int threadCount, std::string const &baseName, bool delayed); static void init(bool delayed); virtual void handleRequestInThread(Request *original); virtual void socketClosed(SocketInfo *socket); }; ///////////////////////////////////////////////////////////////////// // Global ///////////////////////////////////////////////////////////////////// void initTopListWorkers() { TopListDatabaseThread::init(false); TopListDatabaseThread::init(true); } static const std::string s_addRecord = "addRecord()"; static void 
// addRecord(destination, record): looks up the record's symbol field.  When
// the lookup is valid it assigns destination[symbol] = record and bumps the
// "addRecord()" counter; otherwise it only bumps "CAN'T FIND SYMBOL".
// addRecord(destination, rows, fields): builds the record with
// DatabaseFieldInfo::makeRecord(rows, fields) and delegates to the first
// overload.
// The end of this line starts the TopListDatabaseThread constructor's
// initializer list (container thread named baseName + " Database").
addRecord(Parse::TopListStrategy::Records &destination, Record::Ref const &record) { ThreadMonitor::SetState state(s_addRecord); bool valid; std::string symbol; record->lookUpValue(MainFields::symbol).getString(valid, symbol); if (!valid) ThreadMonitor::find().increment("CAN'T FIND SYMBOL"); else { ThreadMonitor::find().increment(s_addRecord); destination[symbol] = record; } } static void addRecord(Parse::TopListStrategy::Records &destination, MysqlResultRef rows, std::vector< DatabaseFieldInfo > const &fields) { addRecord(destination, DatabaseFieldInfo::makeRecord(rows, fields)); } ///////////////////////////////////////////////////////////////////// // TopListDatabaseThread ///////////////////////////////////////////////////////////////////// TopListDatabaseThread::TopListDatabaseThread(int threadCount, std::string const &baseName, bool delayed) : ForeverThreadUser(IContainerThread::create(baseName + " Database")), _database(false, baseName), // Writable so we can update the MRU list. 
// Continues the TopListDatabaseThread constructor: it queues an mtInit
// request to itself carrying threadCount / baseName / delayed, registers the
// baseName + "_start_streaming", "_stop_streaming" and "_listen" commands,
// and calls start().
// init(delayed): picks "top_list_delayed" or "top_list_live" as the base
// name and creates a TopListDatabaseThread only when the config item
// baseName + "_threads" parses to a value > 0.
// socketClosed(): erases the socket's entry from _returnMessageId.
// handleRequestInThread(): mtInit asserts that no job control thread exists
// yet and creates one, seeded with InitialState::get(); mtNewRequest starts
// by reading the request properties (socket_id, window_id, user_id,
// collaborate, skip_metadata).
_jobControlThread(NULL) { InitRequest *request = new InitRequest; request->callbackId = mtInit; request->threadCount = threadCount; request->baseName = baseName; request->delayed = delayed; newRequest(request); CommandDispatcher *cd = CommandDispatcher::getInstance(); cd->listenForCommand(baseName + "_start_streaming", this, mtNewRequest); cd->listenForCommand(baseName + "_stop_streaming", this, mtCancelRequest); cd->listenForCommand(baseName + "_listen", this, mtListen); start(); } void TopListDatabaseThread::init(bool delayed) { const std::string baseName = delayed?"top_list_delayed":"top_list_live"; const int threads = strtolDefault(getConfigItem(baseName + "_threads"), 0); if (threads > 0) new TopListDatabaseThread(threads, baseName, delayed); } void TopListDatabaseThread::socketClosed(SocketInfo *socket) { _returnMessageId.erase(socket); } void TopListDatabaseThread::handleRequestInThread(Request *original) { switch (original->callbackId) { case mtInit: { InitRequest *current = dynamic_cast< InitRequest * >(original); assert(!_jobControlThread); _jobControlThread = new TopListJobControlThread (current->threadCount, current->baseName, TopListWorkThread::InitialState::get(_database, current->delayed), current->delayed); break; } case mtNewRequest: { static const std::string s_NewTopListRequest = "New Top List Request"; ThreadMonitor::SetState state(s_NewTopListRequest); ThreadMonitor::find().increment(s_NewTopListRequest); ExternalRequest *current = dynamic_cast< ExternalRequest * > (original); SocketInfo *const socket = current->getSocketInfo(); const std::string remoteSocketId = current->getProperty("socket_id"); const std::string windowId = current->getProperty("window_id"); const ExternalRequest::MessageId returnMessageId = getPropertyDefault(_returnMessageId, socket); const UserId userId = strtolDefault(current->getProperty("user_id"), 0); const std::string collaborate = current->getProperty("collaborate"); const bool skipMetadata = 
// Continues mtNewRequest: loads the strategy from the collaborate string and
// wraps it in a WindowInfo.  Unless skip_metadata == "1", it builds a
// TOPLIST XML node of TYPE "info" for the window from a freshly loaded
// TopListConfig, saves the config to the MRU list when
// getProperty("save_to_mru", "1") returns "1", and queues the XML (prefixed
// by the remote socket id and a newline) for the socket.  In both cases it
// then forwards a Start request to the job control thread.
// mtCancelRequest forwards a Stop request: the whole remote socket when
// all == "1", otherwise the single window_id.
// mtListen begins at the end of this line.
current->getProperty("skip_metadata") == "1"; Parse::TopListStrategy strategy; strategy.load(collaborate, userId, _database); TopListWorkThread::WindowInfo::Ref windowInfo(NULL, socket, remoteSocketId, windowId, returnMessageId, strategy); if (skipMetadata) ThreadMonitor::find().increment("skip_metadata"); else { ThreadMonitor::find().increment("metadata"); // This seems a little silly. When we created the WindowInfo, that // had to create a Strategy, which created a TopListConfig. Ideally // We'd use that TopListConfig rather than creating another one here. // It's hard to reach through that many layers to get it. Maybe the // code needs a little cleanup. XmlNode message; XmlNode &group = message["TOPLIST"]; group.properties["TYPE"] = "info"; group.properties["WINDOW"] = windowId; TopListConfig config; // TODO // _allowNonFilterColumns(request->getProperty("non_filter_columns") == "1") // Currently this is hard coded to true. config.load(collaborate, userId, _database, true); config.getInitialDescription(group, userId, _database); if (current->getProperty("save_to_mru", "1") == "1") config.saveToMru(userId, _database); const std::string toSend = remoteSocketId + "\n" + message.asString(); addToOutputQueue(socket, toSend, returnMessageId); } TopListJobControlThread::Start *outgoing = new TopListJobControlThread::Start(socket); outgoing->windowInfo = windowInfo; _jobControlThread->newRequest(outgoing); break; } case mtCancelRequest: { ExternalRequest *current = dynamic_cast< ExternalRequest * > (original); TopListJobControlThread::Stop *outgoing = new TopListJobControlThread::Stop(current->getSocketInfo()); outgoing->remoteSocketId = current->getProperty("socket_id"); outgoing->entireRemoteSocket = current->getProperty("all") == "1"; if (!outgoing->entireRemoteSocket) outgoing->windowId = current->getProperty("window_id"); _jobControlThread->newRequest(outgoing); break; } case mtListen: { ExternalRequest *current = dynamic_cast< ExternalRequest * > (original); 
// Finishes mtListen: logs, then remembers the request's response message id
// for its socket in _returnMessageId (read back later by mtNewRequest).
// Then the TopListJobControlThread constructor: creates the
// baseName + " Job Control" container thread, logs the initial state,
// registers the baseName + "_debug" command, creates threadCount
// TopListWorkThread objects and pushes each onto _availableWorkers, then
// calls start().  handleRequestInThread() begins at the end of the line.
// NOTE(review): both sendString(TclList()<...) calls on this line look
// truncated ("TclList()<getThreadName()", "TclList()<debugDump()"); the
// original "<<" chains appear to have been stripped -- confirm.
LogFile::primary().sendString(TclList()<getThreadName(), current->getSocketInfo()); _returnMessageId[current->getSocketInfo()] = current->getResponseMessageId(); break; } }; } ///////////////////////////////////////////////////////////////////// // TopListJobControlThread ///////////////////////////////////////////////////////////////////// TopListJobControlThread::TopListJobControlThread (int threadCount, std::string const &baseName, TopListWorkThread::InitialState::Ref initialState, bool delayed) : ForeverThreadUser(IContainerThread::create(baseName + " Job Control")) { LogFile::primary().sendString(TclList()<debugDump()); // This isn't a great place to listen for new commands. // TopListJobControlThread is not created in the main thread, so some // external commands could have already been thrown on the floor before we // said we were interested in listening. Since this is only a debug // command, it's probably okay. CommandDispatcher *cd = CommandDispatcher::getInstance(); cd->listenForCommand(baseName + "_debug", this, mtDebug); for (int i = 0; i < threadCount; i++) { TopListWorkThread *thread = new TopListWorkThread(this, baseName, initialState, delayed); _availableWorkers.push(thread); } start(); } void TopListJobControlThread::handleRequestInThread(Request *original) { switch (original->callbackId) { case mtNewRequest: { Start *request = dynamic_cast< Start * >(original); // _jobs.erase(request->windowInfo); Redundant! 
// Continues TopListJobControlThread::handleRequestInThread():
//  - mtNewRequest calls _jobs.replace(windowInfo, 0).
//  - mtCancelRequest erases either every job for the remote socket id or the
//    one matching window id.
//  - mtWorkerDone pushes the worker back onto _availableWorkers and removes
//    it from _inUse.
//  - mtDebug replies with _jobs.debugDump() plus worker statistics.
// socketClosed(): workers whose in-use entry names the closed socket are
// pushed back onto _availableWorkers, the rest stay in _inUse, and the
// socket's jobs are erased.
// match(): hands a job to a worker, pops that worker from the available
// queue, and re-queues the job with _jobs.replace(job, 30).
// beforeSleep(): runs match(), then asks to be woken after
// _jobs.sleepTimeMs() only if a worker is still available.
// The line ends with the first tokens of WindowInfo::doWork().
// NOTE(review): mtDebug ("sockets<second)"), socketClosed()
// ("msg<getThreadName()...") and match() ("TclList()<socket;") contain spans
// that appear to have been lost in extraction, including declarations and
// loop headers.  Do not infer the missing logic from this text; recover it
// from version control.
_jobs.replace(request->windowInfo, 0); break; } case mtCancelRequest: { Stop *request = dynamic_cast< Stop * >(original); if (request->entireRemoteSocket) _jobs.erase(request->getSocketInfo(), request->remoteSocketId); else _jobs.erase(request->getSocketInfo(), request->remoteSocketId, request->windowId); break; } case mtWorkerDone: { Done *request = dynamic_cast< Done * >(original); _availableWorkers.push(request->which); _inUse.erase(request->which); break; } case mtDebug: { TclList info; info<<"_availableWorkers.size()"<<_availableWorkers.size(); TclList sockets; for (InUse::const_iterator it = _inUse.begin(); it != _inUse.end(); it++) sockets<second); info<<"_inUse"< (original); addToOutputQueue(current->getSocketInfo(), _jobs.debugDump() + (std::string)info, current->getResponseMessageId()); break; } } } void TopListJobControlThread::socketClosed(SocketInfo *socket) { TclList msg; msg<getThreadName()"<getThreadName() <<"getSerialNumber(socket)"<second == socket) _availableWorkers.push(it->first); else stillInUse[it->first] = it->second; } _inUse = stillInUse; // Handle jobs _jobs.erase(socket); msg<<"after" <<"_inUse.size()"<<_inUse.size() <<"_availableWorkers.size()"<<_availableWorkers.size() <<"_jobs.sleepTimeMs()"<<_jobs.sleepTimeMs(); LogFile::primary().sendString(msg); } void TopListJobControlThread::match() { ThreadMonitor &tm = ThreadMonitor::find(); while (true) { if (_availableWorkers.empty()) { if (_jobs.aJobIsReady()) //LogFile::primary().sendString(TclList()<socket; worker->doWork(job); _availableWorkers.pop(); _jobs.replace(job, 30); } } void TopListJobControlThread::beforeSleep(IBeforeSleepCallbacks &callbacks) { match(); if (!_availableWorkers.empty()) callbacks.wakeAfterMs(_jobs.sleepTimeMs()); } ///////////////////////////////////////////////////////////////////// // TopListWorkThread::WindowInfo ///////////////////////////////////////////////////////////////////// void TopListWorkThread::WindowInfo::doWork(Parse::TopListStrategy::Records 
// WindowInfo::doWork() is a stub: it bumps the "doWork()" counter and then
// hits assert(false).  Its own comment says the mtGetTopList case of
// TopListWorkThread::handleRequestInThread replaced it.
// InitialState::getRecentHistory(database, endTime) builds five SQL
// statements.  They locate the last alert at or before endTime and the first
// alert at or after endTime minus 299 seconds, then select the alert fields
// for rows between those two by id and timestamp.  Each row goes through
// addRecord() into the returned Records.
// debugDump() and the first statements of InitialState::get() follow; get()
// starts by finding the newest 'HB' alert and its timestamp.
// NOTE(review): the query execution in getRecentHistory()
// ("TclList()<rowIsValid()"), the body of debugDump() and the declaration of
// `result` in get() ("< result(NULL, Private())") appear to have been
// damaged in extraction -- confirm against the original.
const &records) const { ThreadMonitor::find().increment("doWork()"); // replaced by TopListWorkThread::handleRequestInThread mtGetTopList assert(false); //strategy->run( // TODO // const std::string debugMsg = //"Remote Socket Id=" + remoteSocketId + "\n" // + ""; //addToOutputQueue(socket, debugMsg, returnMessageId); } ///////////////////////////////////////////////////////////////////// // TopListWorkThread::InitialState ///////////////////////////////////////////////////////////////////// TopListWorkThread::InitialState::Records TopListWorkThread::InitialState::getRecentHistory(DatabaseWithRetry &database, std::string const &endTime) { Parse::TopListStrategy::Records result; std::vector< DatabaseFieldInfo > const &alertFields = DatabaseFieldInfo::getAlertFields(); std::vector< std::string > sql; sql.push_back("SELECT @max_real_time:=timestamp FROM alerts WHERE timestamp <= '" + endTime + "' ORDER BY timestamp DESC LIMIT 1"); sql.push_back("SELECT @min_real_time:=timestamp FROM alerts WHERE timestamp >= '" + endTime + "' - INTERVAL 299 SECOND ORDER BY timestamp ASC LIMIT 1"); sql.push_back("SELECT @max_id := MAX(id) FROM alerts WHERE timestamp = @max_real_time"); sql.push_back("SELECT @min_id := MIN(id) FROM alerts WHERE timestamp = @min_real_time"); sql.push_back("SELECT " + DatabaseFieldInfo::makeSqlFieldList(alertFields) + " FROM alerts " "WHERE id between @min_id and @max_id " "AND timestamp between @min_real_time and @max_real_time"); LogFile::primary().sendString(TclList()<rowIsValid(); rows->nextRow()) addRecord(result, rows, alertFields); return result; } std::string TopListWorkThread::InitialState::debugDump() const { TclList result; result<<"streaming count"< result(NULL, Private()); std::vector< std::string > sql; sql.push_back("SELECT @max_id := MAX(id) FROM alerts WHERE alert_type='HB'"); sql.push_back("SELECT @max_time := timestamp FROM alerts WHERE id = @max_id"); const int mainIndex = sql.size(); sql.push_back("SELECT MAX(timestamp) AS max_time, " 
// Continues InitialState::get().  The "main" query returns the newest alert
// timestamp below @max_time and NOW() (NOW() minus 15 minutes when delayed),
// as text and as a Unix time, together with two flags:
//   end_of_day   -- its time of day is at or after close + 90 (300 if delayed)
//   previous_day -- its time of day is before open + 30 (300 if delayed)
// Two further queries supply candidate frozen end times: the newest alert up
// to today's close + 299 seconds, and the newest alerts_daily date before
// today plus close + 299 seconds.
// If end_of_day is set, frozen records are loaded up to the first candidate;
// else if previous_day is set, up to the second; otherwise endTime stays
// empty.  frozenRecordsEndTime is mysqlToTimeT(endTime) in every case.
// Streaming records come from getRecentHistory() when delayed; otherwise
// from top_list rows with a timestamp in the last 24 hours.
// NOTE(review): "TclList()< fromSql = ..." looks truncated; the declaration
// of fromSql appears to have been lost -- confirm.
"UNIX_TIMESTAMP(MAX(timestamp)) AS time_t, " "@todays_date := DATE(MAX(timestamp)) AS todays_date, " "TIME(MAX(timestamp)) >= '" // These constants also live in // TopListWorkThread::getFrozenRecordsState(). They should // be named rather than copied. TODO + secondsToMysql(MarketHours::close()+(delayed?300:90)) + "' AS end_of_day, " "TIME(MAX(timestamp)) < '" + secondsToMysql(MarketHours::open()+(delayed?300:30)) + "' AS previous_day " "FROM alerts " "WHERE timestamp < @max_time + INTERVAL 0 SECOND " "AND timestamp < NOW()" + (delayed?" - INTERVAL 15 MINUTE":"")); const int endOfDayIndex = sql.size(); sql.push_back("SELECT MAX(timestamp) AS max_time " "FROM alerts " "WHERE timestamp <= @todays_date + INTERVAL " + ntoa(MarketHours::close()+299) + " SECOND"); const int previousDayIndex = sql.size(); sql.push_back("SELECT MAX(date) + INTERVAL " + ntoa(MarketHours::close()+299) + " SECOND " "FROM alerts_daily " "WHERE date < @todays_date"); LogFile::primary().sendString(TclList()< fromSql = database.tryAllUntilSuccess(sql.begin(), sql.end()); const bool endOfDay = fromSql[mainIndex]->getBooleanField("end_of_day"); const bool previousDay = fromSql[mainIndex]->getBooleanField("previous_day"); std::string endTime ; if (endOfDay) { endTime = fromSql[endOfDayIndex]->getStringField("max_time"); result->frozen = getRecentHistory(database, endTime); } else if (previousDay) { endTime = fromSql[previousDayIndex]->getStringField(0); result->frozen = getRecentHistory(database, endTime); } result->frozenRecordsEndTime = mysqlToTimeT(endTime); if (delayed) { result->streaming = getRecentHistory(database, fromSql[mainIndex]->getStringField("max_time")); } else { std::vector< DatabaseFieldInfo > const &topListFields = DatabaseFieldInfo::getTopListFields(); std::string sql = "SELECT " + DatabaseFieldInfo::makeSqlFieldList(topListFields) + " FROM top_list WHERE timestamp > NOW() - INTERVAL 24 HOUR"; for (MysqlResultRef rows = database.tryQueryUntilSuccess(sql); rows->rowIsValid(); 
// Finishes InitialState::get(): adds the top_list rows to the streaming set
// and stores the main query's "time_t" column (default 0) as lastStreaming.
// checkForPurge(): calls purgeOldRecords() once time(NULL) is past
// _nextPurgeTime.
// purgeOldRecords(): erases every record whose timestamp lookup is invalid
// or older than now minus 60 * MARKET_HOURS_MINUTE (80 * when delayed),
// logs, and sets _nextPurgeTime to now + 150.
// closeForDate(midnight): midnight + MARKET_HOURS_CLOSE + 90, or
// + 5 * MARKET_HOURS_MINUTE when delayed.
// updateFrozenRecordsState() begins: it reads the record's timestamp.
// NOTE(review): the log call in purgeOldRecords()
// ("TclList()<getThreadName()") looks truncated -- confirm.
rows->nextRow()) addRecord(result->streaming, rows, topListFields); } // It's important to look at the time of the last streaming event. // TopListWorkThread keeps track of this for live streaming data so it // won't get confused by an event that is out of order. We need to do the // same thing with the initial data. It's very likely that, after loading // this initial data, the thread will receive some duplicated records from // the streaming data. What happens if we start this thread shortly after // the market closed. The thread will start correctly, from the initial // data, then it will see old data that might make it think that the market // is open again! result->lastStreaming = fromSql[mainIndex]->getIntegerField("time_t", 0); return result; } ///////////////////////////////////////////////////////////////////// // TopListWorkThread ///////////////////////////////////////////////////////////////////// void TopListWorkThread::checkForPurge() { const time_t now = time(NULL); if (now > _nextPurgeTime) purgeOldRecords(); } void TopListWorkThread::purgeOldRecords() { ThreadMonitor::SetState state("purgeOldRecords"); const size_t before = _records.size(); const time_t cutoff = time(NULL) - (_delayed?80:60) * MARKET_HOURS_MINUTE; auto it = _records.begin(); while (it != _records.end()) { bool valid; time_t timestamp; it->second->lookUpValue(MainFields::timestamp).getInt(valid, timestamp); if ((!valid) || (timestamp < cutoff)) it = _records.erase(it); else it++; } LogFile::primary().sendString(TclList()<getThreadName()); _nextPurgeTime = time(NULL) + 150; } time_t TopListWorkThread::closeForDate(time_t midnight) const { if (_delayed) return midnight + MARKET_HOURS_CLOSE + 5 * MARKET_HOURS_MINUTE; else return midnight + MARKET_HOURS_CLOSE + 90; } void TopListWorkThread::updateFrozenRecordsState(Record::Ref const &record) { bool valid; int64_t timestamp; record->lookUpValue(MainFields::timestamp).getInt(valid, timestamp); if (!valid) { // This shouldn't happen. 
// Continues updateFrozenRecordsState():
//  - an invalid timestamp only bumps the "INVALID TIMESTAMP" counter;
//  - a timestamp <= _previousTimeStamp is ignored (late or duplicate print);
//  - a timestamp in the future is logged.
// Otherwise it derives newFrozenRecordsEndTime: closeCutOff - 1 at or after
// the close cut-off; before the open cut-off (open + 30, or open +
// 5 * MARKET_HOURS_MINUTE when delayed) it keeps the existing frozen end
// time, or falls back to closeForDate(midnight(_previousTimeStamp)).
// It then stores the timestamp as _previousTimeStamp and returns early when
// the frozen / not-frozen state is unchanged.
// NOTE(review): the span between the future-timestamp log and
// "= closeCutOff)" is visibly damaged ("TclList()<= closeCutOff)").  The
// declarations of `date`, `closeCutOff` and `newFrozenRecordsEndTime`, and
// whatever follows the log call, are missing -- recover from version control.
ThreadMonitor::find().increment("INVALID TIMESTAMP"); return; } if (timestamp <= _previousTimeStamp) { // If there's a late print, ignore it. If the time is the same as the // last time we processed, nothing interesting could happen, so as an // optimization, ignore it. return; } const time_t now = time(NULL); if (timestamp > now) { // A bad timestamp could cripple us for a long time. Possibly forever. LogFile::primary().sendString(TclList()<= closeCutOff) newFrozenRecordsEndTime = closeCutOff - 1; else { const time_t openCutOff = _delayed ?(date + MARKET_HOURS_OPEN + 5 * MARKET_HOURS_MINUTE) :(date + MARKET_HOURS_OPEN + 30); if (timestamp < openCutOff) { if (_frozenRecordsEndTime) // Already frozen. Still frozen. Assume the old time is correct. newFrozenRecordsEndTime = _frozenRecordsEndTime; else // It's morning and we just now froze the records. This is unlikely. // This means that we didn't get any post market prints yesterday. // Look at the time of the previous event to see the last trading // day. newFrozenRecordsEndTime = closeForDate(midnight(_previousTimeStamp)); } } // TODO What about weekends? These should be treated like an after market // hours print, or just completely thrown away. Not that important, but one // bad print could cause a lot of confusion. _previousTimeStamp = timestamp; const bool newValue = newFrozenRecordsEndTime; if (frozenRecordsAvailable() == newValue) // No change. return; _frozenRecordsEndTime = newFrozenRecordsEndTime; if (frozenRecordsAvailable()) { // Freeze it now. // tempting to do a purge first. 
// Freezing copies _records into _frozenRecords.
// doWork(work): stores the window in a Work request and queues it on this
// thread with newRequest().
// Constructor: creates the baseName + " Worker" container thread, seeds the
// records, frozen records and both timestamps from initialState, sets
// _nextPurgeTime to 0, and subscribes with mtNewRecord to delayed alerts
// (when delayed) or to the top-list record dispatcher.
// handleRequestInThread(): mtNewRecord updates the frozen state and then
// stores the record with addRecord(); mtGetTopList calls checkForPurge()
// and picks the `seconds` argument (300 when delayed, else 30).
// NOTE(review): the tail of updateFrozenRecordsState(), the header of
// doWork() ("TclList()<socket)"), the constructor's log and the header of
// handleRequestInThread() ("msg<callbackId)"), and the declaration of `work`
// ("TclList()<(original)") appear to have been lost in extraction -- confirm.
_frozenRecords = _records; LogFile::primary().sendString(TclList()<socket); request->windowInfo = work; newRequest(request); } TopListWorkThread::TopListWorkThread(IDone *onDone, std::string const &baseName, InitialState::Ref initialState, bool delayed) : ForeverThreadUser(IContainerThread::create(baseName + " Worker", true, 250)), _onDone(onDone), _delayed(delayed), _records(initialState->streaming), _frozenRecords(initialState->frozen), _previousTimeStamp(initialState->lastStreaming), _frozenRecordsEndTime(initialState->frozenRecordsEndTime), _nextPurgeTime(0) { if (_delayed) delayedAlertsListenForRecords(this, mtNewRecord); else IRecordDispatcher::getTopList()->listenForRecords(this, mtNewRecord); TclList msg; msg<callbackId) { case mtNewRecord: { ThreadMonitor::SetState state("mtNewRecord"); NewRecord *request = dynamic_cast< NewRecord * >(original); // TODO delete old records. updateFrozenRecordsState(request->record); addRecord(_records, request->record); break; } case mtGetTopList: { checkForPurge(); // LogFile::primary().sendString(TclList()<(original); //LogFile::primary().sendString(TclList()<getSocketInfo()); ThreadMonitor::find().increment("mtGetTopList"); // Ideally the work would all be done in the doWork() virtual function. // That might require some changes to some interfaces. My initial plan // had an interface that was too simple. //work->windowInfo->doWork(_records); const int seconds = _delayed?300:30; // TODO seconds is not always right. If we loaded from the database, // regardless of delayed or not delayed, the time should be 300. 
// Finishes mtGetTopList: runs the window's strategy over _records, passing
// the frozen records and their end time as well when frozen records are
// available.  It wraps the result in an XML message for the window, queues
// it (prefixed by the remote socket id and a newline) for the socket, and
// reports completion through _onDone->done().
// The trailing block comment is the original design overview of the
// database / job-control / worker thread split.
Parse::TopListStrategy::Result result; if (frozenRecordsAvailable()) result = work->windowInfo->strategy.run(_records, _previousTimeStamp, _frozenRecords, _frozenRecordsEndTime, seconds); else result = work->windowInfo->strategy.run(_records, _previousTimeStamp, seconds); XmlNode message; result.addAsChild(message, work->windowInfo->windowId); const std::string toSend = work->windowInfo->remoteSocketId + "\n" + message.asString(); addToOutputQueue(work->getSocketInfo(), toSend, work->windowInfo->returnMessageId); _onDone->done(original->getSocketInfo(), this); break; } } } /* ax_alert_server Receives a top list request, breaks it open. 1) Historical request -- processed as it always has been. 2) Live (possibly frozen) -- goes to the top list server. 3) Delayed but not historical (possibly frozen) -- goes to the alert server. If it sends the request off to another server, ax_alert_server is all but done with it. When messages come back, it needs to remember which client and which message id. On the new server: One thread for incoming requests Every request to say start listening or done/delete. Access to the database One thread organizing the continuous work. Gets start and stop requests from the database thread. Hand off jobs to the worker threads. Owns N other threads. Inspired by the WorkerCluster class. Keeps a list of all top list windows, including the time until they next need service. N threads doing the continuous work. Each has its own copy of the top list data. Each has a copy of the frozen top list data. Each only knows about the current request and forgets that instantly. This object knows how to satisfy an individual top list request. That code is based heavily on the top list test code. The threads above are wrapped in an object. Two instances of that object. One for delayed and one for live. */