#include #include #include "../shared/GlobalConfigFile.h" #include "../shared/NewWorkerCluster.h" #include "../shared/ContainerThread.h" #include "../shared/CommandDispatcher.h" #include "../shared/ReplyToClient.h" #include "../shared/LogFile.h" #include "../shared/DatabaseForThread.h" #include "../shared/MiscSQL.h" #include "../shared/Random.h" #include "../oddsmaker/UserInfo.h" #include "../ax_alert_server/FormatTime.h" #include "Strategy.h" #include "RecordDispatcher.h" #include "DelayedAlerts.h" #include "UserFilters.h" #include "TopListMicroService.h" // TODO Clean up the fast queue. If someone asks for data about a single stock, and that stock doesn't exist, we have no way to remove the request, even when the user hangs up. /* This file provides data from the top list table. * * This should eventually replace TopListWorkers.C. Initially it will do that * for new clients which speak a new API. Eventually we should change * ax_alert_server so it sends requests to this unit, rather than * TopListWorkers.C. * * TopListWorkers.C was our first attempt at making a dedicated server that * didn't talk directly to the client. The micro services proxy is a better * way to to do that. For people using older clients we should be able to * make ax_alert_server talk to this unit more or less the same way that the * micro proxy does. In particular, we should have one socket per client * even if that means that two servers have a lot of different sockets * connecting them. That way we can do a better job keeping one user with * a lot of requests from slowing down a lot of other users. * * Originally this data was used to populate a top list window. Now people * have found other ways to use this table. In particular, someone will ask * for information about a particular stock. The database version could use * an index to jump directly to that stock. Originally the C++ libraries would * go through the entire list of stocks. We recently fixed that in * Strategy.[Ch]. 
(Search those files for "singleSymbol" for details.) Now * a single request for data about a single stock is much faster. However, * TopListWorkers.C still groups all requests into one queue. So these fast * requests wait in line behind slow ones. * * This file uses a more complicated queue structure so (a) one user can't * slow down another user and (b) a user's big/slow/difficult requests don't * slow down the user's fast/small/simple requests. * * There is a third way that we set priorities. If a user makes a new request, * we try to get to that as quickly as possible. If a user has a streaming * request, only the first response gets that higher priority. If a user * has data in a window, and it updates every 30 seconds instead of every 15 * seconds, not many people would notice. If a user has no data in a window, * and it takes 150 milliseconds to display the initial data instead of 75 * milliseconds, that's very obvious. * * We organize the users' requests into two types of queues. (a) Some queues * hold data until some external event says we're ready. E.g. wait for a * timer to go off or wait until we receive new market data. (b) Some queues * hold data until we have resources available. These requests are ready now * and should be handled ASAP. We have four types of threads. (1) One single * dispatcher is responsible for managing the various queues and does almost * nothing else. (2) One group of threads handles database requests. (3) * One group of threads handles fast requests that use a lot of memory and CPU. * (4) One group of threads handles slower requests that use a lot of memory * and CPU. For the most part we use the same queues and threads for both * real-time and delayed requests. * * (a) is handled by TimerList and WaitingBySymbol. As soon as an appropriate * event occurs, requests are sent to the queues described in (b).
If a user * requests several top lists at once, it's likely that every 30 seconds we'll * move all of that user's requests from (a) to (b) at the same time. * * (b) starts with a normal FairRequestQueue, which is built into a * NewWorkerCluster. Like a normal FairRequestQueue we interleave requests from * different users. But normally a FairRequestQueue looks like a FIFO from * the perspective of a single user. This file uses a PriorityQueue, instead, * to let each user set the priorities of their own requests. E.g. fill a new * window before updating a window that already has data. * * (1) is very straightforward. It owns three NewWorkerCluster objects. * It listens to market data just to know when certain requests should wake * up and be sent to a worker cluster. It doesn't store the market data. * * (2) is aimed at the initial configuration request. Traditionally this * was a complicated process. The new API has more options, and sometimes * we know in advance that a new request doesn't need the database. In those * cases the initial configuration is handled by (3) or (4) instead. * * (3) is aimed at single symbol requests. "Fast" and "single symbol" are * mostly interchangeable in this file. This can also handle some * configuration requests. (3) contains two DataProvider objects, one for live * data and one for delayed data. Each of these contains two lists, one for * data which was frozen at the close and one for data which is never frozen. * * (4) is aimed at traditional requests which scan the entire universe of * stocks. However it contains the exact same resources as (3). (3) is like * the "10 items or less lane" at the supermarket.
*/ /* Test code: * command=ms_top_list_start&sort_formula=[Price]&message_id=17 * command=ms_top_list_start&sort_formula=2*3&message_id=18&name=XXX * command=ms_top_list_start&sort_formula=[Price]&message_id=18&name=YYY&outside_market_hours=0 * command=ms_top_list_start&sort_formula=[Price]&message_id=28&name=YYY&outside_market_hours=0&extra_column_formulas=symbol%3D%5BD_Symbol%5D%26price%3D%5BPrice%5D%26half_price%3D%5BPrice%5D%2F2&result_count=10&streaming=1 * command=ms_top_list_start&sort_formula=[Price]&message_id=29&name=ZZZ&outside_market_hours=0&extra_column_formulas=symbol%3D%5BD_Symbol%5D%26price%3D%5BPrice%5D%26half_price%3D%5BPrice%5D%2F2&single_symbol=MSFT&streaming=1 * * command=ms_top_list_stop&name=ZZZ * command=ms_top_list_stop&name=YYY * * telnet becca 7936 * command=proxy_login&user_id=4 * * The "extra_column_formulas" input is a map. It is stored like a URL. * php > echo rawurlencode("symbol=[D_Symbol]&price=[Price]&half_price=[Price]/2"); * symbol%3D%5BD_Symbol%5D%26price%3D%5BPrice%5D%26half_price%3D%5BPrice%5D%2F2 * * Also look at TopListRequest.cs */ // Schedule a purge for 12:30 am each morning. This many seconds after // midnight. static const int PURGE_TIME_SECONDS = 30 * MARKET_HOURS_MINUTE; namespace TopListMicroService { ///////////////////////////////////////////////////////////////////// // Queue // // What type of worker does this request need? ///////////////////////////////////////////////////////////////////// enum class Queue : char { Database, Fast, Normal, Done }; TclList &operator <<(TclList &list, Queue queue) { switch (queue) { case Queue::Database: list<<"Database"; break; case Queue::Fast: list<<"Fast"; break; case Queue::Normal: list<<"Normal"; break; case Queue::Done: list<<"Done"; break; default: list<<("unknown("+ntoa((int)queue)+")"); break; } return list; } ///////////////////////////////////////////////////////////////////// // Nullable Boolean. 
///////////////////////////////////////////////////////////////////// enum class NBool : char { Unknown, False, True }; NBool getNBool(ExternalRequest *request, std::string const &name) { const std::string asString = request->getProperty(name); if (asString == "0") return NBool::False; if (asString == "1") return NBool::True; return NBool::Unknown; } TclList &operator <<(TclList &list, NBool value) { switch (value) { case NBool::True: list<<"NBool::True"; break; case NBool::False: list<<"NBool::False"; break; default: list<<"NBool::Unknown"; break; } return list; } ///////////////////////////////////////////////////////////////////// // Config ///////////////////////////////////////////////////////////////////// typedef Parse::StrategyTrees StrategyTrees; typedef Parse::Tree Tree; typedef Parse::ITopListConfig ITopListConfig; typedef Parse::ITopListConfigFromCollaborate ITopListConfigFromCollaborate; typedef Parse::ReplaceRule ReplaceRule; typedef Parse::UserFiltersRule UserFiltersRule; class Config : public ITopListConfig { public: int _count; std::string _singleSymbol; StrategyTrees _strategyTrees; bool _outsideMarketHours; virtual int getCount() const { return _count; } virtual std::string getSingleSymbol() const { return _singleSymbol; } virtual StrategyTrees getStrategyTrees() const { return _strategyTrees; } virtual bool outsideMarketHours() const { return _outsideMarketHours; } }; ///////////////////////////////////////////////////////////////////// // Records // // This is the current state of all stocks. This is a standard format // used by multiple libraries. ///////////////////////////////////////////////////////////////////// typedef Parse::TopListStrategyBase::Records Records; ///////////////////////////////////////////////////////////////////// // DataProvider // // This maintains a list of current top list data. When someone queries // the top list, start from here. 
// // The logic to maintain the current data was pulled from the // TopListWorkThread class in TopListWorkers.C. TopListWorkThread had // additional logic. DataProvider only provides data, e.g. the current // price of MSFT is 52.72 and the current volume for ORCL is 1,000,982 // shares. The caller should know what to do with that data. // // A DataProvider object is not thread safe. If you need one, create it // in the thread where you need it. If you have a cluster, create one // in each thread. // // We're actually using two of these objects per thread. One for live // data and one for delayed data. That decision is based on // AlertMicroService.C, which works well and had a similar structure. ///////////////////////////////////////////////////////////////////// class DataProvider : public ForeverThreadUser, private ThreadMonitor::Extra { public: class InitialState { private: static Records getRecentHistory(DatabaseWithRetry &database, std::string const &endTime); //InitialState() { } public: Records streaming; Records frozen; time_t lastStreaming; time_t frozenRecordsEndTime; std::set< time_t > tradingDays; std::string debugDump() const; typedef SmarterCP< InitialState > Ref; static Ref get(DatabaseWithRetry &database, bool delayed); // Make the initial records for the day. For example, if the end time // is 12:30 am or 11:59 pm on July 20th, look at the alerts_daily table // for July 20th to get the closing price of each stock from the previous // day. We'll use that as the initial price, until we get a new top list // record giving us newer data. // // If we already have data for a stock in records, don't overwrite it. // I.e. call getDailyHistory() AFTER calling getRecentHistory(). // // date should be in mysql format. If you provide a date and time, // the time will automatically be truncated. 
static void getDailyHistory(DatabaseWithRetry &database, Records &records, std::string const &date); static void getDailyHistory(DatabaseWithRetry &database, Records &records, time_t date); }; private: enum { mtNewRecord }; static __thread DataProvider *_instance[2]; static DataProvider *&instance(bool delayed) { return _instance[delayed]; } static void addRecord(Records &destination, Record::Ref const &record); static void addRecord(Records &destination, MysqlResultRef rows, std::vector< DatabaseFieldInfo > const &fields); const bool _delayed; Records _records; Records _frozenRecords; time_t _previousTimeStamp; time_t _frozenRecordsEndTime; bool frozenRecordsAvailable() { return _frozenRecordsEndTime; } time_t closeForDate(time_t midnight) const; time_t getFrozenRecordsState(time_t time) const; void updateFrozenRecordsState(Record::Ref const &record); std::set< time_t > _tradingDays; // From ThreadMonitor::Extra virtual std::string getInfoForThreadMonitor(); DataProvider(IContainerThread *containerThread, InitialState::Ref const &initialState, bool delayed); protected: // From ForeverThreadUser virtual void handleRequestInThread(Request *original); virtual void initializeInThread(); public: Records const &getRecords() const { return _records; } Records const &getFrozenRecords() const { return _records; } Parse::TopListStrategy::Result getTopList(Parse::TopListStrategy const &strategy); static void overnightPurge(SmarterP< Records > const &records, time_t time); static void init(InitialState::Ref const &initialState, bool delayed); static DataProvider *getInstance(bool delayed) { return instance(delayed); } }; static const std::string s_addRecord = "addRecord()"; __thread DataProvider *DataProvider::_instance[2]; Parse::TopListStrategy::Result DataProvider::getTopList(Parse::TopListStrategy const &strategy) { ThreadMonitor::SetState state("getTopList()"); state.increment("getTopList()"); const int seconds = _delayed?300:30; if (frozenRecordsAvailable()) return 
strategy.run(_records, _previousTimeStamp, _frozenRecords, _frozenRecordsEndTime, seconds); else return strategy.run(_records, _previousTimeStamp, seconds); } void DataProvider::initializeInThread() { ThreadMonitor::find().add(this); } std::string DataProvider::getInfoForThreadMonitor() { TclList result; result<<"DataProvider" <<(_delayed?"delayed":"live") <<"_records"<<_records.size() <<"_frozenRecords"<<_frozenRecords.size() <<"_previousTimeStamp"< const &records, time_t time) { for (int i = 0; i < 2; i++) { _instance[i]->_records = *records; _instance[i]->_previousTimeStamp = time; } } void DataProvider::init(InitialState::Ref const &initialState, bool delayed) { DataProvider *&i = instance(delayed); assert(!i /* Duplicate request. */); IContainerThread *currentThread = IContainerThread::current(); assert(currentThread /* Must initialize in a container thread. */); i = new DataProvider(currentThread, initialState, delayed); } void DataProvider::addRecord(Records &destination, Record::Ref const &record) { ThreadMonitor::SetState state(s_addRecord); bool valid; std::string symbol; record->lookUpValue(MainFields::symbol).getString(valid, symbol); if (!valid) ThreadMonitor::find().increment("CAN'T FIND SYMBOL"); else { ThreadMonitor::find().increment(s_addRecord); destination[symbol] = record; } } void DataProvider::addRecord(Records &destination, MysqlResultRef rows, std::vector< DatabaseFieldInfo > const &fields) { addRecord(destination, DatabaseFieldInfo::makeRecord(rows, fields)); } Records DataProvider::InitialState::getRecentHistory(DatabaseWithRetry &database, std::string const &endTime) { StopWatch stopWatch; Records result; std::vector< DatabaseFieldInfo > const &alertFields = DatabaseFieldInfo::getAlertFields(); std::vector< std::string > sql; sql.push_back("SELECT @min_id := id,timestamp FROM alerts WHERE timestamp > DATE('" + endTime + "') ORDER BY timestamp ASC LIMIT 1"); sql.push_back("SELECT @max_id := id from alerts where timestamp < '" + endTime + 
"' ORDER BY timestamp DESC LIMIT 1"); sql.push_back("SELECT " + DatabaseFieldInfo::makeSqlFieldList(alertFields) + " FROM (SELECT MAX(id) AS INNER_ID FROM alerts WHERE id between @min_id and @max_id GROUP BY symbol) as MY_SUB,alerts WHERE id=INNER_ID"); auto overheadTime = stopWatch.getMicroSeconds(); MysqlResultRef rows = database.tryAllUntilSuccess(sql.begin(), sql.end())[2]; const auto databaseTime = stopWatch.getMicroSeconds(); for (; rows->rowIsValid(); rows->nextRow()) addRecord(result, rows, alertFields); overheadTime += stopWatch.getMicroSeconds(); TclList msg; msg<numRows() <<"symbol count"< sql; // Look for an appropriate date. If the input is a Saturday or Sunday, we // automatically move forward to Monday. There is no need to consider // holidays here. sql.push_back("SELECT UNIX_TIMESTAMP(MIN(date)) as time_t, @date := MIN(date) from alerts_daily WHERE date >= DATE('" + mysqlEscapeString(date) + "')"); sql.push_back("SELECT d_symbol,last_price FROM alerts_daily WHERE date = @date"); std::vector< MysqlResultRef > results = database.tryAllUntilSuccess(sql.begin(), sql.end()); const time_t finalDate = results[0]->getIntegerField("time_t", 0); for (MysqlResultRef result = results[1]; result->rowIsValid(); result->nextRow()) { const std::string symbol = result->getStringField(0); if (records.count(symbol)) continue; const double price = result->getDoubleField(1, std::numeric_limits< double >::quiet_NaN()); RecordBuilder rb; // You need the symbol and the timestamp to look up the alerts_daily // data. That's most of what we're offering right now. rb.append(MainFields::symbol, symbol); rb.append(MainFields::timestamp, finalDate); // Start from yesterday's close. That seems reasonable. And that will // make a lot of other filters available. Many filters use alerts_daily // and price or last. 
rb.append(MainFields::price, price); rb.append(MainFields::last, price); const std::string message = rb.exportAsString(); const RecordRef record = Record::create(message); records[symbol] = record; } TclList msg; msg< result(NULL); std::vector< std::string > sql; sql.push_back("SELECT @max_id := MAX(id) FROM alerts WHERE alert_type='HB'"); sql.push_back("SELECT @max_time := timestamp FROM alerts WHERE id = @max_id"); const int mainIndex = sql.size(); sql.push_back("SELECT MAX(timestamp) AS max_time, " "UNIX_TIMESTAMP(MAX(timestamp)) AS time_t, " "@todays_date := DATE(MAX(timestamp)) AS todays_date, " "TIME(MAX(timestamp)) >= '" // These constants also live in // DataProvider::getFrozenRecordsState(). They should // be named rather than copied. TODO + secondsToMysql(MarketHours::close()+(delayed?300:90)) + "' AS end_of_day, " "TIME(MAX(timestamp)) < '" + secondsToMysql(MarketHours::open()+(delayed?300:30)) + "' AS previous_day " "FROM alerts " "WHERE timestamp < @max_time + INTERVAL 0 SECOND " "AND timestamp < NOW()" + (delayed?" - INTERVAL 15 MINUTE":"")); const int isTradingDayIndex = sql.size(); //sql.push_back("SELECT COUNT(*) FROM holidays_test " sql.push_back("SELECT COUNT(*) FROM holidays " "WHERE day=@todays_date AND FIND_IN_SET('US', closed) = 0"); const int endOfDayIndex = sql.size(); sql.push_back("SELECT MAX(timestamp) AS max_time " "FROM alerts " "WHERE timestamp <= @todays_date + INTERVAL " + ntoa(MarketHours::close()+299) + " SECOND"); const int previousDayIndex = sql.size(); sql.push_back("SELECT MAX(day) + INTERVAL " + ntoa(MarketHours::close()+299) + " SECOND " //"FROM holidays_test " "FROM holidays " "WHERE day < @todays_date " "AND FIND_IN_SET('US', closed) = 0"); //LogFile::primary().sendString(TclList()< fromSql = database.tryAllUntilSuccess(sql.begin(), sql.end()); // The preferred end time is past the close today. For example, we're // looking at live data and the last record is from 4:30pm today. 
The // market closed before this record was written. This record is based on // post market data. If someone doesn't want post market data, we'll // probably freeze the data from right around the close today. const bool endOfDay = fromSql[mainIndex]->getBooleanField("end_of_day"); // The preferred end time is in the pre market. For example, we're // looking at live data and the last record is from 6:25 this morning. The // market hasn't opened yet today. If someone doesn't want pre market // data, we'll go back to yesterday (or the previous trading day) around // the close. const bool previousDay = fromSql[mainIndex]->getBooleanField("previous_day"); // The preferred end time is a holiday. Mostly we avoid weekends because // we're looking for a heartbeat alert. But these can appear on holidays // sometimes. We treat !isTradingDay more or less like previousDay const bool isTradingDay = fromSql[isTradingDayIndex]->getBooleanField(0); // It's important to look at the time of the last streaming event. // DataProvider keeps track of this for live streaming data so it // won't get confused by an event that is out of order. We need to do the // same thing with the initial data. It's very likely that, after loading // this initial data, the thread will receive some duplicated records from // the streaming data. What happens if we start this thread shortly after // the market closed. The thread will start correctly, from the initial // data, then it will see old data that might make it think that the market // is open again! result->lastStreaming = fromSql[mainIndex]->getIntegerField("time_t", 0); // If we had been running for a long time, what would have been the time // of the most recent purge? Ideally at the end of the initialization // the results should be exactly the same as if we'd been running for a // long time. (Or as close as possible.) (In other words, you shouldn't // be afraid to restart this server at any time. 
Data will not be available // while we're initializing, but everything should be fine after that.) // So we need to know if the last purge would have happened before or after // the data we're thinking about pulling out of the database. const time_t purgeTime = midnight(time(NULL)) + PURGE_TIME_SECONDS; // Prepare the frozen at close data. { // The time to use if the user doesn't want pre and post market data. // Used to request the actual top list data from the database. And // reported to the user. Blank if the preferred end time is during // market hours. In that case the user sees the same did regardless of // the pre/post market setting. std::string frozenEndTime; if (previousDay || !isTradingDay) { // The time of the close and the date of the most recent trading day // before the day of the our preferred end time. frozenEndTime = fromSql[previousDayIndex]->getStringField(0); result->frozen = getRecentHistory(database, frozenEndTime); } else if (endOfDay) { frozenEndTime = fromSql[endOfDayIndex]->getStringField("max_time"); result->frozen = getRecentHistory(database, frozenEndTime); } // frozenRecordsEndTime will be 0 if there are no frozen records. result->frozenRecordsEndTime = mysqlToTimeT(frozenEndTime); } // Prepair the live-always data. if (purgeTime >= result->lastStreaming) { // The last scheduled purge was after the last top list record in the // database. So we don't grab those records. Jump directly to the new // records that we create each morning. getDailyHistory(database, result->streaming, purgeTime); result->lastStreaming = purgeTime; } else { // Grab the most recent live top list records from the database. // Create our own records only if we can't find a recent top list // record for a given stock. 
if (delayed) { result->streaming = getRecentHistory(database, fromSql[mainIndex]->getStringField("max_time")); } else { std::vector< DatabaseFieldInfo > const &topListFields = DatabaseFieldInfo::getTopListFields(); std::string sql = "SELECT " + DatabaseFieldInfo::makeSqlFieldList(topListFields) + " FROM top_list WHERE timestamp > CURDATE()"; for (MysqlResultRef rows = database.tryQueryUntilSuccess(sql); rows->rowIsValid(); rows->nextRow()) addRecord(result->streaming, rows, topListFields); getDailyHistory(database, result->streaming, fromSql[mainIndex]->getStringField("max_time")); } } const std::string holidaySql = "SELECT UNIX_TIMESTAMP(day) " //"FROM holidays_test " "FROM holidays " "WHERE FIND_IN_SET('US', closed) = 0 AND day > NOW() - INTERVAL 1 WEEK"; for (MysqlResultRef rows = database.tryQueryUntilSuccess(holidaySql); rows->rowIsValid(); rows->nextRow()) result->tradingDays.insert(rows->getIntegerField(0, -1)); return result; } time_t DataProvider::closeForDate(time_t midnight) const { if (_delayed) return midnight + MARKET_HOURS_CLOSE + 5 * MARKET_HOURS_MINUTE; else return midnight + MARKET_HOURS_CLOSE + 90; } void DataProvider::updateFrozenRecordsState(Record::Ref const &record) { bool valid; int64_t timestamp; record->lookUpValue(MainFields::timestamp).getInt(valid, timestamp); if (!valid) { // This shouldn't happen. ThreadMonitor::find().increment("INVALID TIMESTAMP"); return; } if (timestamp <= _previousTimeStamp) { // If there's a late print, ignore it. If the time is the same as the // last time we processed, nothing interesting could happen, so as an // optimization, ignore it. return; } const time_t now = time(NULL); if (timestamp > now) { // A bad timestamp could cripple us for a long time. Possibly forever. if (timestamp - now <= 3) // I was seeing a lot of these that were off by one second. We should // try to do better. But still, there's no reason to spam the logs. 
ThreadMonitor::find().increment("INVALID TIMESTAMP FUTURE " + ntoa(timestamp - now)); else // For a bigger problem send more details to the log. LogFile::primary().sendString(TclList()<::iterator dateIt = _tradingDays.lower_bound(originalDate); const time_t date = (dateIt == _tradingDays.end()) // We are past the last day in the holidays table. That's probably // a maintenance issue. Someone forgot to update the table. Naively // assume that the event happened on a trading day, not a weekend or // holiday. That's not always correct, but it's a reasonable assumption. // We always assumed that before we started checking the holidays table. ?originalDate // Else, use the date we found in our list of valid trading days. :(*dateIt); const time_t closeCutOff = closeForDate(date); if (timestamp >= closeCutOff) newFrozenRecordsEndTime = closeCutOff - 1; else { const time_t openCutOff = _delayed ?(date + MARKET_HOURS_OPEN + 5 * MARKET_HOURS_MINUTE) :(date + MARKET_HOURS_OPEN + 30); if (timestamp < openCutOff) { // Premarket. Or a holiday. If today's a holiday, treat the event // like the premarket for the next trading day. Show it to people who // ask for pre and post market day, but don't clear the frozen records. if (_frozenRecordsEndTime) // Already frozen. Still frozen. Assume the old time is correct. newFrozenRecordsEndTime = _frozenRecordsEndTime; else // It's morning and we just now froze the records. This is unlikely. // This means that we didn't get any post market prints yesterday. // Look at the time of the previous event to see the last trading // day. newFrozenRecordsEndTime = closeForDate(midnight(_previousTimeStamp)); } } _previousTimeStamp = timestamp; const bool newValue = newFrozenRecordsEndTime; if (frozenRecordsAvailable() == newValue) // No change. return; _frozenRecordsEndTime = newFrozenRecordsEndTime; if (frozenRecordsAvailable()) { // Freeze it now. 
_frozenRecords = _records; LogFile::primary().sendString(TclList()<streaming), _frozenRecords(initialState->frozen), _previousTimeStamp(initialState->lastStreaming), _frozenRecordsEndTime(initialState->frozenRecordsEndTime), _tradingDays(initialState->tradingDays) { if (_delayed) delayedAlertsListenForRecords(this, mtNewRecord); else IRecordDispatcher::getTopList()->listenForRecords(this, mtNewRecord); TclList msg; msg<callbackId) { case mtNewRecord: { ThreadMonitor::SetState state("mtNewRecord"); NewRecord *request = dynamic_cast< NewRecord * >(original); // TODO delete old records. updateFrozenRecordsState(request->record); addRecord(_records, request->record); break; } } } ///////////////////////////////////////////////////////////////////// // TopListRequest ///////////////////////////////////////////////////////////////////// class TopListRequest { private: SocketInfo *const _socket; const int64_t _socketSerialNumber; const ExternalRequest::MessageId _messageId; const bool _delayed; // Mostly used to cancel the request. The client makes up the names. const std::string _name; // What's the current state? What kind of resources do we need for our // next time slice. Queue _queue; // Before sending this object to a worker queue we copy the value of // _queue into _queueInWorker. The worker thread only reads and writes // _queueInWorker, never _queue. The dispatcher thread could modify // _queue while the worker thread is working. As soon as the job returns // from the worker we merge _queue with _queueInWorker and store the // result in _queue. Queue _queueInWorker; // Filled in by a worker thread on our first time slice. Filled in based // on a number of inputs from the original request. Parse::TopListStrategy _parsedStrategy; // The original collaborate string from the client. This is optional. // This is one of the inputs used to load _parsedStrategy. std::string _collaborate; // This is the time of the last update. 
Normally we wait a specific amount // of time after this to run again. Divide this by 1,000,000 to get time_t // format. // // Use 0 if you want the highest priority. // // In general, if we get behind, we keep each user's requests in order. // One user is explicitly allowed to cut in line in front of another user, // like in a FairRequestQueue queue. But if one user sends two requests, and // request #1 should fire in 5 seconds, and request #2 should fire in 10 // seconds, you expect request #1 to fire before request #2. The times // might not be exact, especially if we're busy, but the order will // probably remain in tact. We have multiple worker threads, so it's // possible that we'll start #1 first, but #2 will finish first. // // This replaces _hasBeenSeen. Set this to 0 to say _hasBeenSeen = false. // Set this to the time we last saw the data to say _hasBeenSeen = true. // This serves the same purpose as _hasBeenSeen but its more precise. int64_t _lastUpdateMicroTime; // Use this to convert from the input format into the internal format. // Force the value into a reasonable range. // // Make this a static function so you can use it in the top of the // constructor. static int64_t initLastUpdate(std::string const &input) { const time_t asTimeT = importTime(input); if (asTimeT <= 0) // For now we disallow any negative numbers. Call that reserved. // 0 is the default value. It's legal and presumably common. return 0; // Convert time_t (i.e. seconds) to microseconds. static const int64_t CONVERSION_FACTOR = 1000000; // The input is measured in seconds, i.e. time_t. But we store it in // microseconds. If the input is greater than the current time we want // to set it to the current time. The problem is that we might get a // numeric overflow in the conversion before we do the comparison. In // that case the comparison will give you a somewhat random result. const int64_t now = getMicroTime(); if ((now / CONVERSION_FACTOR) < asTimeT) // The time is in the future. 
Assume that is an error. Say that the // last update was now. That's the closest legal value to the value // requested by the customer. If we normally update every N seconds, // this requests that we do the first update N seconds from now. return now; // The input was in the legal range. Now convert seconds to // microseconds. return asTimeT * CONVERSION_FACTOR; } // This does not depend on what the client told us. This is always // initialized to false, then set to true the first time we send results. // This has nothing to do with meta data. bool _weveSentRowData; // Traditionally each time we parse the collaborate string we send a // meta data message back to the client. That might not always be // required. // // The TopListWorkers API had this field. But it was only used when the // servers were talking with each other. The client never saw this. It // was used to do a better job simulating the original API which was meant // for a single server to do everything. // // If you get reconnected, should you ask for new meta data? It probably // doesn't hurt. This option was really set up for clients who never need // the meta data. Each time you reload the config things might change. // The server might have added a new filter. More likely the user might // have changed a custom filter. // // If I ask for meta data every time, so that mean the meta data and the // row data match? Not necessarily. Originally that was true. Once // we switched to the non-database way of getting row data, there was no // certain guarantee that the two would match perfectly. We make the // requests at about the same time, but there are two different requests // and they are not in a transaction. So listing to all meta data messages // will help but it's not 100%. bool _skipMetaData; // Traditionally, initializing a top list request always requires access // to a database. Among other things, the user might have custom filters. 
// Sometimes you don't need all of that, and you don't want to wait behind // other requests that do need that. Some requests are so simple that // we don't need the database, the the normal database overhead would be // large compared to the time to perform the rest of the request. Also, // if you disable the database then all users start to look the same and // the server might cache some of the compilation. bool _useDatabase; // False for a one time request, true for a streaming request. // Traditionally a history request is not streaming but a live request is. // That's based on a standard top list window. Now sometimes people ask // for data like the exchange or company name associated with a symbol, // and nothing else. (The client already got a lot of data from somewhere // else and we are just filling in a few details.) These fields don't // change, the client will send a non-streaming request. const bool _streaming; // Should we save the strategy to the mru list. This only applies to the // normal top list collaborate string. If that string is blank or missing, // it doesn't make sense to save anything. This does not make any attempt // to save the other parts of this request. bool _saveToMru; // Does the client want to use the columns from the collaborate string? // If the client didn't specify a strategy, do you want to use the default // columns? Remember that if you don't specify any columns in the // strategy, you will get some defaults. bool _collaborateColumns; // True means that we treat all times the times, so you can see data // outside market hours. False means show live data during market hours, // and freeze things during the close at other times. Unknown means we're // not sure. Try to find this value from somewhere else. NBool _outsideMarketHours; // We have loaded the strategy. This often has to happen in a worker // thread. We don't do this in the constructor because it might take too // long. 
bool _strategyLoaded; // How many results we want. Must be at least one. If you provide a // strategy, the default is to copy the result count from the strategy. // Otherwise we'll fill in some reasonable value, like 100. int _resultCount; // The client can easily specify a single symbol in the API. This should // be smart enough to automatically detect a strategy which is associated // with a single symbol. We can process single symbol requests more // efficiently than other requests. This will be "" if we are not // restricted to a single symbol. std::string _singleSymbol; // This is ignored for a single symbol request. If this is blank, use the // sort field from the strategy. (If _sortFormula and _collaborate are both // specified, _sortFormula takes precedence.) "1" is the preferred way to // say that we don't care about sorting. This might help us optimize later // on. Don't forget that a sort formula can contribute to the where // condition. If the sort formula (from here or the strategy) is null for // a stock, we don't include that stock in the result. Sometimes its hard // to pick a standard filter that is never null. "1", of course, will // never be null. // // Currently this only sorts by numeric fields. If the field returns a // string or NULL for a row, that row will be excluded. This is based on // the way the top list config window worked. It would not be that hard // to change if we wanted to sort by strings. // // This always returns the smallest value first. In the top list config // window the user had a sort field and a choice of smallest or largest // first. If you use this and you want the largest value first, try adding // a minus sign to your formula. Under the hood that's how the traditional // top list window works. std::string _sortFormula; // If this is blank, use the where formula from the collaborate string. // If both are specified use this one. 
(It might be nice to have an // option to reference the where from the collaborate string from this // formula.) std::string _whereFormula; // A map from the formula name to the source code for the formula. The // names are assigned by the client. The name should be a valid XML // property name. The server doesn't alter the name at all. The client // might ask for something well known, like "SYMBOL" or "c_D_symbol", // trying to match names that we typically send another way. std::map< std::string, std::string > _extraColumnFormulas; Tree compile(std::string const &source, Tree const &defaultValue, ReplaceRule const *additionalRules = NULL); void loadConfig(); void run(); public: int64_t getSocketSerialNumber() const { return _socketSerialNumber; } std::string const &getName() const { return _name; } Queue getQueue() const { return _queue; } bool isDone() const { return _queue == Queue::Done; } void cancel() { _queue = Queue::Done; } int64_t getLastUpdateMicroTime() const { return _lastUpdateMicroTime; } std::string getSingleSymbol() const { return _singleSymbol; } bool isDelayed() const { return _delayed; } bool weveSentRowData() const { return _weveSentRowData; } void beforeWorkerThread(); void inWorkerThread(); void afterWorkerThread(); Queue getWorkQueue() { return _singleSymbol.empty()?Queue::Normal:Queue::Fast; } std::string debugDump() const; TopListRequest(ExternalRequest *externalRequest); typedef SmarterP< TopListRequest > Ref; }; Tree TopListRequest::compile(std::string const &source, Tree const &defaultValue, ReplaceRule const *additionalRules) { try { // TODO cache this. But only when additionalRules is NULL. 
Tree tree = Parse::Parser(source).parse(); if (additionalRules) tree = Parse::replace(tree, *additionalRules); tree = Parse::replace(tree, *Parse::CommonRules::instance().publicResources()); tree->assertDone(); //TclList msg; //msg<shortDebug(); //LogFile::primary().sendString(msg, _socket); return tree; } catch (Parse::Exception &ex) { // We get a lot of these. These seem to be from a client bug. //TclList msg; //msg<getInitialDescription(group); addToOutputQueue(_socket, message.asString(), _messageId); } Config config; if (_resultCount > 0) config._count = _resultCount; else if (base) config._count = base->getCount(); else config._count = 100; if (_singleSymbol.empty() && base) _singleSymbol = base->getSingleSymbol(); config._singleSymbol = _singleSymbol; if (_outsideMarketHours == NBool::True) config._outsideMarketHours = true; else if (_outsideMarketHours == NBool::False) config._outsideMarketHours = false; else { if (base) config._outsideMarketHours = base->outsideMarketHours(); else config._outsideMarketHours = true; _outsideMarketHours = config._outsideMarketHours?NBool::True:NBool::False; } StrategyTrees baseTrees; if (base) baseTrees = base->getStrategyTrees(); ReplaceRule *customFormulas = _useDatabase?(new UserFiltersRule(userId)):NULL; Tree sort; if (!_sortFormula.empty()) sort = compile(_sortFormula, NULL, customFormulas); else if (base) sort = baseTrees.getSort(); if (!sort) sort = Parse::IntTreeNode::TRUE; //TclList msg; //msg<shortDebug(); //LogFile::primary().sendString(msg, _socket); Tree where; if (!_whereFormula.empty()) where = compile(_whereFormula, NULL, customFormulas); else if (base) where = baseTrees.getWhere(); if (!where) where = Parse::IntTreeNode::TRUE; if (!_sortFormula.empty()) // We always get rid of items where the sort value is not defined. // We do this in part because the sort routine built into C++ can't // handle a NaN. If we skipped this step this program might crash // later. 
// // Note: What if the user starts from a base strategy, adds his own sort // by, but does not add his own where? In that case the base strategy // will automatically add something like this for the base sort field. // In that case we should REPLACE that part of the where condition, // instead of just adding to the where condition. TODO // // Note this is a big deal because the traditional way of of selecting // a sort key is flawed. It's hard to pick something that's never null. // In fact, that was one of the biggest issues in the old API which lead // to this new API. where = new Parse::AndFunction({where, new Parse::ValidDouble(sort)}); // The following should not be required. If the strategy has a single // symbol, TopListStrategy will use an index to find that symbol and it // will ignore the rest of the where condition. // if (!config._singleSymbol.empty()) // where = new AndFunction({where, // new ComparisonFunction(ComparisonFunction::Equal, // PrimaryField::SYMBOL, // new StringTreeNode(config._singleSymbol))}); // Note that we don't try to remove the symbol specification from the base // where condition. If you plan to add your own symbol later, the base // strategy should start with symbol list = none. 
//TclList msg1; //msg1<second, NULL, customFormulas)) columns[it->first] = column; //TclList msg2; //msg2<getSocketInfo()), _socketSerialNumber(SocketInfo::getSerialNumber(_socket)), _messageId(externalRequest->getResponseMessageId()), _delayed(userInfoGetInfo(_socket).status == sLimited), _name(externalRequest->getProperty("name")), _queue(Queue::Done), _collaborate(externalRequest->getProperty("collaborate")), _lastUpdateMicroTime(initLastUpdate(externalRequest ->getProperty("last_update"))), _weveSentRowData(false), _skipMetaData(externalRequest->getProperty("skip_metadata", false)), _useDatabase(_collaborate.empty() ?externalRequest->getProperty("use_database", false) :true), _streaming(externalRequest->getProperty("streaming", false)), _saveToMru(_collaborate.empty() ?false :externalRequest->getProperty("save_to_mru", true)), _collaborateColumns(externalRequest->getProperty("collaborate_columns", !_collaborate.empty())), _outsideMarketHours(getNBool(externalRequest, "outside_market_hours")), _strategyLoaded(false), _resultCount(strtolDefault(externalRequest->getProperty("result_count"), 0)), _singleSymbol(externalRequest->getProperty("single_symbol")), _sortFormula(externalRequest->getProperty("sort_formula", _collaborate.empty()?"1":"")), _whereFormula(externalRequest->getProperty("where_formula", _collaborate.empty()?"1":"")) { parseUrlRequest(_extraColumnFormulas, externalRequest->getProperty("extra_column_formulas")); if (_useDatabase) _queue = Queue::Database; else _queue = getWorkQueue(); } ///////////////////////////////////////////////////////////////////// // PriorityQueue // // Most of the time all requests go into the same FairRequestQueue. // That will keep one user from monopolizing the resources. But for // a given user the requests are process in FIFO order. // // This takes a different approach. We'll still use a // FairRequestQueue to keep the users from fighting. But each user // can have at most one entry in the FairRequestQueue. 
If the user // wants to do multiple things, we will keep all of those things // in a PriorityQueue. When the FairRequestQueue says it's time for // the user to do one thing, we check this PriorityQueue to find that // one thing. // // The idea is that a user might have a lot of TopListRequest objects // that all need service. On the client side the top lists are all // separate objects, and the software doesn't care what order we // respond in. So we have some freedom. // // If a top list updates every 20 or 30 seconds, rather than every // 15, most users wouldn't know the different. However, if a new // top list is blank for 0.2 seconds rather than 0.1 seconds, that's // noticeable. And, of course, if a new top list is blank for 15 // seconds, that's a huge problem. So we let each user set his own // priorities. If the server is running slowly we might have several // items that are all ready when it's our turn. So why not start // with the request that the customer is most likely to notice. // // Currently the priority is only based on two things. The primary // sort key is the value of _lastUpdateMicroTime. If two items have // the same value for _lastUpdateMicroTime we use FIFO. ///////////////////////////////////////////////////////////////////// class PriorityQueue { private: // We could just store the TopListRequest::Ref object because it // contains the time value. By using a multimap like this, we make a // copy of the time value. This is slightly more robust; if someone // changed the time value while the object was in the set, that // could cause problems. That could even cause a core dump. We // don't expect anyone to do that, but this is an extra safety measure. // Note: Starting with C++11 multimaps take care of the sorting exactly // as described above. Previous versions of C++ did not specify what // happened if two items have the same key. 
std::multimap< int64_t, TopListRequest::Ref > _queue; public: bool empty() const { return _queue.empty(); } void push(TopListRequest::Ref const &request); // This will return NULL if the queue is empty. // // The result might be a request which has already been canceled. // previous versions of this class deleted canceled requests for us. // That was redundant. TopListRequest::Ref pop(); }; inline TopListRequest::Ref PriorityQueue::pop() { if (_queue.empty()) return TopListRequest::Ref(); const auto it = _queue.begin(); TopListRequest::Ref result = it->second; _queue.erase(it); return result; } inline void PriorityQueue::push(TopListRequest::Ref const &request) { _queue.emplace(request->getLastUpdateMicroTime(), request); } ///////////////////////////////////////////////////////////////////// // Resources // // This class is primarily aimed at preventing circular dependencies // between the User and Main classes. Main will try to send every // request to a User object and let the User object do the bulk of // the work. But the Main object owns some resources, like the three // thread pools, and User objects will need these to do their work. ///////////////////////////////////////////////////////////////////// class Resources : public ForeverThreadUser { private: // For simplicity we assume there's exactly one of these. static Resources *_instance; protected: Resources(); public: NewWorkerCluster databaseJobs; NewWorkerCluster fastJobs; NewWorkerCluster slowJobs; NewWorkerCluster &get(Queue which); // Go to sleep for an appropriate amount of time then go back to the user. // Then tell the user to scheduleNow(). The request should be in the // Fast or Normal state before calling scheduleSoon(). It is possible that // the request will be canceled and/or the user will disconnect while the // request is waiting in this queue. Nothing else can happen to the // request until this queue gives it up. 
virtual void scheduleSoon(TopListRequest::Ref request) =0; static Resources &getInstance() { return *_instance; } }; Resources *Resources::_instance = NULL; Resources::Resources() : ForeverThreadUser(IContainerThread::create("TopListMicroService")), databaseJobs(getContainer(), "TopListMicroService database"), fastJobs(getContainer(), "TopListMicroService fast", 250), slowJobs(getContainer(), "TopListMicroService slow", 500) { assert(!_instance); _instance = this; } NewWorkerCluster &Resources::get(Queue which) { switch (which) { case Queue::Database: return databaseJobs; case Queue::Fast: return fastJobs; case Queue::Normal: return slowJobs; default: // Interesting. abort() is tagged as "((__noreturn__))". So the // compiler is smart enough not to complain that I didn't return // a value from this function. abort(); } } ///////////////////////////////////////////////////////////////////// // User ///////////////////////////////////////////////////////////////////// class User { private: SocketInfo *const _socketInfo; // Mapping the request names to the requests. The name is required so // the client can send a cancel request. In other versions of the top // list and alerts we also send this back to the client with each response // so the client can map the response back to a request. That's no longer // necessary. Each request comes with its own message_id now. The client // should be able to match responses with requests based only on that id. // Unfortunately some parts of the client might have trouble finding the // message id. Otherwise we'd use the message id rather for canceling and // we wouldn't need the name at all. std::map< std::string, TopListRequest::Ref > _byName; // These are requests that are ready now and are just waiting for a CPU // to become available to execute them. 
PriorityQueue _databaseRequests; PriorityQueue _normalRequests; PriorityQueue _fastRequests; PriorityQueue &getQueue(Queue which); NewWorkerCluster &getCluster(Queue which); void checkQueue(PriorityQueue *source, NewWorkerCluster *destination); void checkQueue(Queue which); public: User(SocketInfo *socketInfo) : _socketInfo(socketInfo) { } SocketInfo *getSocketInfo() const { return _socketInfo; } bool isActive() const { return !_byName.empty(); } // It is safe to call this at any time, as long as you're in the // dispatcher thread. The request might have already been deleted. // The request might be in another thread. void stop(std::string const &name); void newRequest(TopListRequest::Ref &request); // Try to run this soon. This might have to wait if other requests are // also ready now. This is different from scheduleSoon(). scheduleSoon() // will wait for a timer or other event to say that we're ready to look // at the request. After that event, we automatically call scheduleNow() // on the request. void scheduleNow(TopListRequest::Ref const &request); }; PriorityQueue &User::getQueue(Queue which) { switch (which) { case Queue::Database: return _databaseRequests; case Queue::Normal: return _normalRequests; case Queue::Fast: return _fastRequests; default: abort(); } } NewWorkerCluster &User::getCluster(Queue which) { return Resources::getInstance().get(which); } void User::checkQueue(PriorityQueue *source, NewWorkerCluster *destination) { // Notice that we explicitly use pointers for the inputs to this function. // If a function takes inputs by reference, and a lambda tries to copy one // of those inputs, it doesn't always work right. Things are are lot // simpler if the function uses * rather than &. if (source->empty()) // No work to do. return; if (destination->pendingJobCount(_socketInfo)) // The worker cluster is already aware of us. 
return; destination->addJobPre< TopListRequest::Ref > ([=](TopListRequest::Ref &request) -> bool { while ((request = source->pop())) { if (!request->isDone()) { // We found a good one. Use it. request->beforeWorkerThread(); return true; } } // We couldn't find any good requests. Give up. return false; }, [=](TopListRequest::Ref &request) { request->inWorkerThread(); }, [=](TopListRequest::Ref &request) { request->afterWorkerThread(); if (request->isDone()) { //TclList msg; //msg<debugDump(); //LogFile::primary().sendString(msg, _socketInfo); stop(request->getName()); } else if (request->weveSentRowData() || (request->getQueue() == Queue::Normal)) { //TclList msg; //msg<debugDump(); //LogFile::primary().sendString(msg, _socketInfo); // After we send data we come back here. We call schedule soon // for the next time. Also, if we're in the "normal" we come // here immediately after initialization. scheduleSoon() will // look at _lastUpdateMicroTime to decide when we should run again. // That might provide a short delay or it might run us immediately. Resources::getInstance().scheduleSoon(request); } else { //TclList msg; //msg<debugDump(); //LogFile::primary().sendString(msg, _socketInfo); // We just initialized a "fast" request. It doesn't matter what the // client told us. We can't be sure if the client has seen the // current state of this data. And we can't be sure if the next // update will be any time soon. To be safe, always request this // data ASAP. scheduleNow(request); } checkQueue(source, destination); }, _socketInfo); } void User::checkQueue(Queue which) { checkQueue(&getQueue(which), &getCluster(which)); } void User::scheduleNow(TopListRequest::Ref const &request) { //TclList msg; //msg<debugDump(); //LogFile::primary().sendString(msg, _socketInfo); if (request->isDone()) // Possibly a request was canceled while it was waiting. 
There are // different places where it could be waiting, so we could check in // each of those places, or we could just check here. return; Queue queue = request->getQueue(); getQueue(queue).push(request); checkQueue(queue); } void User::newRequest(TopListRequest::Ref &request) { assert(!request->isDone()); stop(request->getName()); _byName[request->getName()] = request; scheduleNow(request); } void User::stop(std::string const &name) { auto it = _byName.find(name); if (it != _byName.end()) { it->second->cancel(); _byName.erase(it); } } ///////////////////////////////////////////////////////////////////// // TimerList ///////////////////////////////////////////////////////////////////// class TimerList { private: std::multimap< TimeVal::Microseconds, std::function< void() > > _entries; public: // Call the lambda at the at the given time. void add(TimeVal::Microseconds wakeUpTime, std::function< void() > const &action); // How long should we sleep. The result is a good input for // IBeforeSleepCallbacks::wakeAfterMs() int wakeAfterMS() { return wakeAfterMS(getMicroTime()); } int wakeAfterMS(int64_t nowInMicroseconds); // Execute all lambdas that are ready. If more than one is ready, start // from the one that's been ready the longest. void doAll() { return doAll(getMicroTime()); } void doAll(int64_t nowInMicroseconds); }; void TimerList::add(TimeVal::Microseconds wakeUpTime, std::function< void() > const &action) { _entries.emplace(wakeUpTime, action); } int TimerList::wakeAfterMS(int64_t nowInMicroseconds) { if (_entries.empty()) // No work. No need to wake up. Sleep as long as you can. return std::numeric_limits< int >::max(); const auto it = _entries.begin(); // How long to wait in microseconds int64_t result = it->first - nowInMicroseconds; // Round up to get milliseconds. If we rounded the other way we'd wake // up too soon. In that case we'd do a busy wait until the right time. 
result = (result + 999) / 1000; if (result < 0) // Don't try to sleep for a negative amount of time. I'm not sure // this is required. But it seems cleaner this way. result = 0; return result; } void TimerList::doAll(int64_t nowInMicroseconds) { while (true) { if (_entries.empty()) return; const auto it = _entries.begin(); if (it->first > nowInMicroseconds) // That's all for now. The rest are not ready yet. return; // The first one is good. Execute it and remove it from our list. it->second(); _entries.erase(it); } } ///////////////////////////////////////////////////////////////////// // Main ///////////////////////////////////////////////////////////////////// class Main : public Resources, private ThreadMonitor::Extra { private: enum { mtStart, mtStop, mtNewLiveRecord, mtNewDelayedRecord }; std::map< SocketInfo::SerialNumber , User * > _users; // Initialize _topListPollingMicroseconds variable. int _topListPollingMicroseconds; // The following will create User object if it doesn't already exist. User *getUser(SocketInfo *socket); // The following will return NULL if the User object doesn't exist. User *getUserIfExists(SocketInfo::SerialNumber socket); User *getUserIfExists(SocketInfo *socket) { return getUserIfExists(SocketInfo::getSerialNumber(socket)); } // This contains things that need to happen at a certain time. That // includes "normal" but not "fast" top list requests. This list also // includes tickling the dead man timer and other tasks. TimerList _timerList; typedef std::map< std::string, std::set< TopListRequest::Ref > > WaitingBySymbol; WaitingBySymbol _fastLiveWaiting; WaitingBySymbol _fastDelayedWaiting; void sendUpdates(WaitingBySymbol &source, Request *request); // Schedule a checkup for the user at some time in the near future. Our // main concern is the dead man timer. 
void monitor(User *user) { monitorUser(SocketInfo::getSerialNumber(user->getSocketInfo())); } void monitorUser(SocketInfo::SerialNumber socketSerialNumber); void schedulePurgeSoon(); void startPurgeNow(); virtual void socketClosed(SocketInfo *socket); virtual void handleRequestInThread(Request *original); virtual void initializeInThread(); virtual void beforeSleep(IBeforeSleepCallbacks &callbacks); virtual void awake(std::set< int > const &woken); virtual void scheduleSoon(TopListRequest::Ref request); // From ThreadMonitor::Extra virtual std::string getInfoForThreadMonitor(); public: Main(); }; static const std::string s_handleRequestInThread = "handleRequestInThread"; void Main::schedulePurgeSoon() { const time_t now = time(NULL); const time_t today = midnight(now); const time_t tomorrow = midnight(today + MARKET_HOURS_HOUR * 36); const time_t wakeTimeT = tomorrow + PURGE_TIME_SECONDS; const auto wakeMicroSeconds = wakeTimeT * 1000000; _timerList.add(wakeMicroSeconds, [=](){ startPurgeNow(); }); LogFile::primary().sendString(TclList()< records(NULL); NewWorkerCluster::JobInfo::Ref jobInfo; jobInfo.setInThread([=]() { DatabaseWithRetry &database = *DatabaseForThread(DatabaseWithRetry::SLAVE); DataProvider::InitialState::getDailyHistory(database, *records, timeTToMysql(time(NULL))); }); jobInfo.setBackHome([=]() { // Report the change at the ideal time. If it was off by a few seconds // from one day to the next (or, worse yet, one thread to the next) // that would cause unnecessary questions and confusion. The results // will be the same. This only refers to the time which is usually // displayed in the window header. 
const time_t time = midnight(::time(NULL)) + PURGE_TIME_SECONDS; fastJobs.addJobAll([=]() { DataProvider::overnightPurge(records, time); }); slowJobs.addJobAll([=]() { DataProvider::overnightPurge(records, time); }); schedulePurgeSoon(); }); databaseJobs.addJob(jobInfo); } std::string Main::getInfoForThreadMonitor() { TclList result; result<<"_users"<<_users.size() <<"_timerList"<<(ntoa(_timerList.wakeAfterMS()) + "ms") <<"_fastLiveWaiting"<<_fastLiveWaiting.size() <<"_fastDelayedWaiting"<<_fastDelayedWaiting.size() <isActive()) // The user has at least one outstanding request. Keep him alive. CommandDispatcher::getInstance() ->getDeadManTimer().touchConnection(user->getSocketInfo()); // Check again soon. monitorUser(socketSerialNumber); } }); } void Main::sendUpdates(WaitingBySymbol &source, Request *request) { NewRecord *message = dynamic_cast< NewRecord * >(request); Record::Ref const &record = message->record; bool valid; std::string symbol; record->lookUpValue(MainFields::symbol).getString(valid, symbol); if (!valid) ThreadMonitor::find().increment("CAN'T FIND SYMBOL"); else { WaitingBySymbol::iterator sourceIt = source.find(symbol); if (sourceIt == source.end()) // No listeners. return; std::set< TopListRequest::Ref > requests; sourceIt->second.swap(requests); source.erase(sourceIt); for (auto it = requests.begin(); it != requests.end(); it++) { TopListRequest::Ref const &request = *it; User *const user = getUserIfExists(request->getSocketSerialNumber()); if (!user) // The user has already hung up. _socket points to an object which // has probably been deleted. Don't touch the socket under any // circumstances. Just ignore this request and move on. continue; user->scheduleNow(request); } } } void Main::awake(std::set< int > const &woken) { // Note: This has to be done in awake(). If you try to do this in // beforeSleep() some messages might get lost. For example: // 1) The thread calls beforeSleep() for the worker clusters. 
// 2) The thread calls beforeSleep() for this class. // 3) beforeSleep() for this class calls scheduleNow() // 4) scheduleNow() adds something to a worker cluster. // 5) scheduleNow() does not request a wakeup because nothing else is // waiting. // 6) The thread sleeps for a long time waiting for an external event. // 7) Eventually the thread wakes up and eventually calls beforeSleep() // for the worker clusters. // 8) At that time the job we wanted to run right away is finally sent // to a worker thread. // // I saw this exact thing in test. The first output from my top list // window worked as planned. scheduleNow() was called 30 seconds later. // But nothing else happened. Eventually I sent another request, and // that caused the first request to be processed. // // Presumably if you had more activity things would never sit forever. But // some requests would sit longer than they should. That would be harder // to track down, but it would still not be correct. _timerList.doAll(); } void Main::beforeSleep(IBeforeSleepCallbacks &callbacks) { callbacks.wakeAfterMs(_timerList.wakeAfterMS()); } void Main::scheduleSoon(TopListRequest::Ref request) { switch (request->getQueue()) { case Queue::Database: // This should never happen. If you need the database you need it // immediately. We could probably put this with the normal records // without any issues. But since we're not expecting it, let's // call it an error. abort(); break; case Queue::Done: // Again, this should never happen. We could deal with it if we wanted // to, but this looks like an error. abort(); break; case Queue::Fast: { // Take another look at this the next time the stock updates. WaitingBySymbol &queue = request->isDelayed()?_fastDelayedWaiting:_fastLiveWaiting; queue[request->getSingleSymbol()].insert(request); break; } case Queue::Normal: { // Take another look at this in 30 seconds. // request->getLastUpdateMicroTime() will usually be pretty close to // the current time. 
We set the time when we finished running in the // other thread. Hopefully this thread woke up to finish the process // almost instantly after that. However, this becomes interesting // when the client gets disconnected. Then we have to rely on the client // to tell us when the last response was. It might have been 1/2 a // second ago, or 30 seconds ago. It be 0 to say we've never seen a // response yet, so please send the first response ASAP. _timerList.add(request->getLastUpdateMicroTime() + _topListPollingMicroseconds, [=](){ if (!request->isDone()) if (User *user = getUserIfExists(request->getSocketSerialNumber())) user->scheduleNow(request); }); break; } default: // This definitely shouldn't happen!! abort(); } } void Main::handleRequestInThread(Request *original) { ThreadMonitor::SetState tm(s_handleRequestInThread); switch (original->callbackId) { case mtStart: { ExternalRequest *current = dynamic_cast< ExternalRequest * >(original); SocketInfo *socket = current->getSocketInfo(); User *user = getUser(socket); TopListRequest::Ref request(NULL, current); user->newRequest(request); break; } case mtStop: { ExternalRequest *current = dynamic_cast< ExternalRequest * >(original); SocketInfo *socket = current->getSocketInfo(); if (User *user = getUserIfExists(socket)) user->stop(current->getProperty("name")); break; } case mtNewLiveRecord: sendUpdates(_fastLiveWaiting, original); break; case mtNewDelayedRecord: sendUpdates(_fastDelayedWaiting, original); break; } } void Main::socketClosed(SocketInfo *socket) { // See if we already have a User object for this socket. auto it = _users.find(SocketInfo::getSerialNumber(socket)); if (it != _users.end()) { // We have one. Now delete it. 
delete it->second; _users.erase(it); } } User *Main::getUser(SocketInfo *socket) { User *&result = _users[SocketInfo::getSerialNumber(socket)]; if (!result) { result = new User(socket); monitor(result); } return result; } User *Main::getUserIfExists(SocketInfo::SerialNumber socket) { return getPropertyDefault(_users, socket); } void Main::initializeInThread() { ThreadMonitor::find().add(this); // You should only access a worker cluster from that cluster's main thread. databaseJobs.createWorkers(getConfigItem("top_list_micro_service_db", 2)); struct InitialStates { DataProvider::InitialState::Ref live; DataProvider::InitialState::Ref delayed; }; const SmarterP< InitialStates > initialStates(NULL); NewWorkerCluster::JobInfo::Ref jobInfo; jobInfo.setInThread([=]() { Parse::CommonRules::instance(); // Force CommonRules to initialize in this thread. DatabaseWithRetry &database = *DatabaseForThread(DatabaseWithRetry::SLAVE); initialStates->live = DataProvider::InitialState::get(database, false); initialStates->delayed = DataProvider::InitialState::get(database, true); LogFile::primary().sendString(TclList()<delayed, true); DataProvider::init(initialStates->live, false); LogFile::primary().sendString(TclList()<listenForRecords(this, mtNewLiveRecord); CommandDispatcher *cd = CommandDispatcher::getInstance(); cd->listenForCommand("ms_top_list_start", this, mtStart); cd->listenForCommand("ms_top_list_stop", this, mtStop); start(); } } void initTopListMicroService() { if (getConfigItem("top_list_micro_service") == "1") new TopListMicroService::Main; }