#include #include "../alert_framework/AlertBase.h" #include "../data_framework/GenericTosData.h" #include "../misc_framework/Timers.h" #include "../../shared/MarketHours.h" #include "Heartbeat.h" enum { wTos, wDispatch }; //////////////////////////////////////////////////////////////////// // HeartbeatDispatch //////////////////////////////////////////////////////////////////// static const int BASE_QUALITY_SIZE = 128; class HeartbeatRandomCompare { private: union pieces { const DataNodeListener *p; unsigned char c[8]; }; const DataNodeListener *changed(const DataNodeListener *input) const { pieces inPieces; inPieces.p = input; pieces output; output.c[0] = inPieces.c[7]; output.c[1] = inPieces.c[6]; output.c[2] = inPieces.c[5]; output.c[3] = inPieces.c[4]; output.c[4] = inPieces.c[3]; output.c[5] = inPieces.c[2]; output.c[6] = inPieces.c[1]; output.c[7] = inPieces.c[0]; return output.p; } public: bool operator()(const DataNodeListener *x, const DataNodeListener *y) const { return changed(x) < changed(y); } }; class HeartbeatDispatch : public DataNode { private: typedef std::set< DataNodeListener *, HeartbeatRandomCompare > Listeners; Listeners _listeners; Listeners::iterator _nextListener; double _savedItemsToSend; drand48_data _randomState; // This says what fraction of the listeners the dispatcher should wake up // each time that the dispatcher is woken. This should be greater than 0.0 // and at most 1.0. double _sendEachTime; // This "Helper" will automatically cancel the timer in this objects // destructor. TimerThread::Helper _timerChannel; int32_t _baseQuality[BASE_QUALITY_SIZE]; void onBroadcast(BroadcastMessage &message, int msgId); HeartbeatDispatch(DataNodeArgument args); friend class DataNode; public: // We use this rather than the standard routines for listening because // this allows us to efficiently notify different listeners at different // times. Do not call these during the callback! 
void addListener(DataNodeListener *listener); void removeListener(DataNodeListener *listener); static DataNodeLink *find(HeartbeatDispatch *&node, int period); // This class takes care of initializing the random number generator. This // class also contains the state for the random number generator, so // different instances of this object could co-exist in different threads. double dRand(); int32_t baseQuality(int index) const; }; int32_t HeartbeatDispatch::baseQuality(int index) const { return _baseQuality[index % BASE_QUALITY_SIZE]; } HeartbeatDispatch::HeartbeatDispatch(DataNodeArgument args) : _nextListener(_listeners.end()), _savedItemsToSend(0.0), _timerChannel(getOwnerChannel(), &getManager()->getTimerThread()) { registerForBroadcast(_timerChannel, 0); srand48_r(TimeVal(true).asMicroseconds(), &_randomState); // This is how long the dispatcher sleeps before waking another group of // data nodes. const int wakeupTimeMS = 100; // This is the period in seconds for any one stock. const double period = args.getIntValue(); _sendEachTime = wakeupTimeMS / 1000.0 / period; getManager()->getTimerThread() .requestPeriodicBroadcast(_timerChannel, wakeupTimeMS); // This was aimed at the traditional heart beat alerts, where the period is // an integer number of minutes. Now we are also using these alerts for // the top list, with an expected period of 30 seconds. This codes doesn't // work well for that, but we don't use the quality for the top list, so it // doesn't matter. 
const int CYCLE_TIME_MINUTES = (int)(period / MARKET_HOURS_MINUTE + 0.5); for (int i=0; i < BASE_QUALITY_SIZE; i++) { _baseQuality[i] = CYCLE_TIME_MINUTES; int modified = i; while (modified & 1) { modified >>= 1; _baseQuality[i] <<= 1; } } } double HeartbeatDispatch::dRand() { double result; drand48_r(&_randomState, &result); return result; } void HeartbeatDispatch::onBroadcast(BroadcastMessage &message, int msgId) { if (_listeners.empty()) return; _savedItemsToSend += _listeners.size() * _sendEachTime; int sendThisTime = (int)(_savedItemsToSend + 0.5); _savedItemsToSend -= sendThisTime; if (sendThisTime < 0) // This could easily happen because of rounding. sendThisTime = 0; else if (sendThisTime > (int)_listeners.size()) // This could easily happen if the number of listeners dropped a lot // since the last time this function was called. sendThisTime = _listeners.size(); while (sendThisTime--) { if (_nextListener == _listeners.end()) _nextListener = _listeners.begin(); (*_nextListener)->onWakeup(wDispatch); _nextListener++; } } void HeartbeatDispatch::addListener(DataNodeListener *listener) { _listeners.insert(listener); } void HeartbeatDispatch::removeListener(DataNodeListener *listener) { if ((_nextListener != _listeners.end()) && (*_nextListener == listener)) // Even if some items are added or deleted, the order of the remaining // items is preserved. _nextListener++; _listeners.erase(listener); } DataNodeLink *HeartbeatDispatch::find(HeartbeatDispatch *&node, int period) { // We never accept a listener here. Use addListener() instead. And the // message id is hard coded. return findHelper(NULL, 0, node, period); } //////////////////////////////////////////////////////////////////// // Heartbeat // // Heartbeat alerts go off once every 5 minutes for most stocks. The idea is // to allow look for stocks based entirely on filters, without any interesting // event happening. 
// The only requirement for a heartbeat alert is that the
// stock has had at least one new print in the last hour.
//
// We try to spread out the alerts.  We don't look at all the stocks at once
// every 5 minutes.  Instead we break the work up.  Approximately every 100
// milliseconds we wake up and send a few alerts.  We try to spread the
// different symbols out evenly over the 5 minute period.  After 5 minutes, we
// repeat the cycle.
//
// The quality field for this alert is unique.  The user can specify how often
// he wants to see each symbol.  The quality is measured in minutes.  If the
// user sets his minimum quality to 5 or less, that doesn't change anything.
// The alerts never come faster than once every 5 minutes.
//
// If the user sets his minimum quality to 10, then he will see exactly half
// of the alerts.  He will also see exactly half of the alerts for any specific
// symbol.  If he sets the minimum quality to 20, 40, or 80 minutes he will see
// 1 alert in 4, 8, or 16, respectively.  _eventCount and baseQuality() take
// care of this.  nextEventCount is used to spread things out.  Even if you
// use a quality filter, the alerts should be spread out in time.  When you set
// the quality to 10, you don't want 5 minutes of full speed alerts followed
// by 5 minutes of nothing.  nextEventCount takes care of that.
//
// If the user selects a quality which is a power of 2 times 5 minutes, then
// the logic above will be exact.  However, the user might put in a number
// like 30.  In this case the user will see at least as many alerts as if his
// quality was 40, and no more than if his quality was 20.  The code which
// generates the alert will use a random number to augment the logic from
// baseQuality().  The random number is scaled in an appropriate way so that
// if the user sets his minimum quality to X minutes, then each stock will
// appear approximately once every X minutes.
//////////////////////////////////////////////////////////////////// static int nextEventCount = 0; class Heartbeat : public Alert { private: GenericTosDataNode *_tosData; HeartbeatDispatch *_dispatcher; int _eventCount; time_t _lastPrintTime; void onWakeup(int msgId); Heartbeat(DataNodeArgument const &args); friend class GenericDataNodeFactory; ~Heartbeat(); }; static const std::string s_AllFiltersSatisfied = "All filters satisfied."; void Heartbeat::onWakeup(int msgId) { switch (msgId) { case wTos: if (_tosData->getValid() && _tosData->getLast().newPrint) _lastPrintTime = getSubmitTime(); break; case wDispatch: { static const int timeout = MARKET_HOURS_HOUR; if ((getSubmitTime() - _lastPrintTime) <= timeout) { const double betweenQuality = 2.0 / (2 - _dispatcher->dRand()); const double quality = _dispatcher->baseQuality(_eventCount++) * betweenQuality; report(s_AllFiltersSatisfied, quality); } break; } } } Heartbeat::Heartbeat(DataNodeArgument const &args) : _eventCount(nextEventCount++), _lastPrintTime(0) { DataNodeArgumentVector const &argList = args.getListValue(); assert(argList.size() == 2); // Symbol, Period in seconds std::string const &symbol = argList[0].getStringValue(); const int period = argList[1].getIntValue(); addAutoLink(GenericTosDataNode::find(this, wTos, _tosData, symbol)); addAutoLink(HeartbeatDispatch::find(_dispatcher, period)); _dispatcher->addListener(this); } Heartbeat::~Heartbeat() { _dispatcher->removeListener(this); } //////////////////////////////////////////////////////////////////// // Global //////////////////////////////////////////////////////////////////// void initializeHeartbeat() { assert(sizeof(DataNodeListener *)==8); // Our randomizing algorithm will fail badly if this is not true. GenericDataNodeFactory::sf< Heartbeat > ("Heartbeat", symbolPlaceholderObject, 5 * MARKET_HOURS_MINUTE); GenericDataNodeFactory::sf< Heartbeat > ("FastHeartbeat", symbolPlaceholderObject, 15); }