#include "../../shared/ThreadClass.h"
#include "../../shared/ThreadMonitor.h"
#include "../../shared/PollSet.h"
#include "../../shared/SelectableRequestQueue.h"
#include "../../shared/SimpleLogFile.h"
#include "../../shared/ReplyToClient.h"
#include "../../shared/CommandDispatcher.h"
#include "../../shared/GlobalConfigFile.h"
#include "../../shared/TwoDLookup.h"
#include "../../shared/MarketHours.h"
#include "../misc_framework/CsvFileDataNodes.h"
#include "../../shared/TalkWithServer.h"

#include "StockTwitsDataNode.h"

// NOTE(review): This copy of the file reached review with its line breaks
// collapsed and with some text missing:  in four places everything between
// a '<' and a later '>' appears to have been stripped.  Each such place is
// marked "SOURCE DAMAGE" below.  The surviving characters are kept exactly
// as found and nothing has been invented to fill the gaps.  Restore those
// spans from version control before building.

// Holds the time-of-day curve loaded from stocktwits_smile.csv, normalized
// so that the entry with the greatest time key has the value 1.
class StockTwitsTimeDataNode : public DataNode
{
private:
  // Key is the time parsed from a CSV row header; getExpectation() looks it
  // up with secondOfTheDay().  Value is the normalized tweet count.
  std::map< int, double > _tweetsByTime;
  StockTwitsTimeDataNode(DataNodeArgument const &args);
  friend class DataNode;
public:
  // This returns a ratio.  For example, at this time of day we normally
  // see only 1/10 of the total number of tweets that we expect to see
  // by the end of the day.  The ratio is useful because you can multiply
  // that by the total number of tweets we see for a specific stock in a day
  // to know how many tweets to expect for that stock so far.
  double getExpectation() const;

  static DataNodeLink *find(StockTwitsTimeDataNode *&node)
  {
    return findHelper(NULL, 0, node, DataNodeArgument());
  }
};

// Overlay for the raw bytes of a data message:  the fixed fields followed
// directly by the symbol text.  init() asserts the exact size of the fixed
// part because this is what comes over the wire.
struct StockTwitsDataExtractor
{
  StockTwitsDataFields data;
  char symbol[0];
  static StockTwitsDataExtractor *get(std::string &encoded);
} __attribute__ ((packed));

// Broadcast carrying the data fields for one symbol.  It is sent on the
// channel returned by StockTwitsDataThread::getChannel(symbol).
class StockTwitsDataMessage : public BroadcastMessage
{
public:
  StockTwitsDataFields data;
  StockTwitsDataMessage(std::string const &symbol);
};

// Owns the connection to the StockTwits server and the set of symbols we
// are currently listening to.  Requests arrive through _incoming and are
// handled in threadFunction().
class StockTwitsDataThread : private ThreadClass,
                             TalkWithServer::IMessageListener
{
private:
  // Callback ids for requests in _incoming.
  enum { mtAddSymbol, mtRemoveSymbol, mtDebugAdd, mtDebugRemove,
         mtDebugDump, mtQuit };
  enum { ciPing, ciData };  // Client id for messages from server.
  SelectableRequestQueue _incoming;
  DataNodeManager *const _dataNodeManager;
  std::set< std::string > _activeSymbols;
  TalkWithServer *_talkWithServer;  // NULL when there is no connection object.
  const std::string _serverName;
  const std::string _serverPort;
  void checkConnection(bool createIfRequired=true);
  void sendAddMessage(std::string const &symbol);
  void startDataImpl(std::string const &symbol);
  void stopDataImpl(std::string const &symbol);

  // Ping state.  See checkPing().
  time_t _nextPingTime;
  bool _pingSent;
  void checkPing();

  virtual void onMessage(std::string bytes, int clientId,
                         TalkWithServer::CancelId cancelId);
  virtual void onAbort(int clientId, TalkWithServer::CancelId cancelId);

  // A request that carries nothing but a symbol.  Used for mtAddSymbol and
  // mtRemoveSymbol.
  class SymbolRequest : public Request
  {
  private:
    std::string _symbol;
  public:
    SymbolRequest(std::string const &symbol) :
      Request(NULL), _symbol(symbol) { }
    std::string const &getSymbol() const { return _symbol; }
  };
protected:
  virtual void threadFunction();
public:
  // For simplicity we only support one DataNodeThread.  It would not be hard
  // to imagine a case where startData takes the DataNodeThread as an input,
  // and different DataNodeThreads could be listening.  That's not an expected
  // use case, so I'm not too worried.  If you call this in the data node
  // thread, you do not have to specify the data node manager.
  StockTwitsDataThread(DataNodeManager *dataNodeManager = NULL);
  ~StockTwitsDataThread();

  // A duplicate start or stop request will be ignored.  We only care about the
  // last one.  Presumably these will only come from one thread, so we're not
  // worried about the messages getting out of order.  (More precisely, that's
  // the caller's responsibility.)
  void startData(std::string const &symbol);
  void stopData(std::string const &symbol);

  static std::string getChannel(std::string const &symbol);
  static std::string const &getDebugChannel();
  static std::string translateSymbol(std::string const &symbol);
};


/////////////////////////////////////////////////////////////////////
// StockTwitsTimeDataNode
//
// This tells us how many tweets to expect for this time of day.
// This is not much of a data node.  We mostly made this into
// a data node for the sake of making it easy to find().
/////////////////////////////////////////////////////////////////////

// Loads the "tweets" column of stocktwits_smile.csv, keyed by row header,
// and normalizes it.  Rows whose header or value does not parse to a
// non-negative number are skipped.
StockTwitsTimeDataNode::StockTwitsTimeDataNode(DataNodeArgument const &args)
{
  assert(!args);
  TwoDArray input;
  input.loadFromCSV(FileOwnerDataNode::findDataFile("stocktwits_smile.csv"));
  for (TwoDArray::StringList::const_iterator it =
         input.getRowHeaders().begin();
       it != input.getRowHeaders().end();
       ++it)
    {
      const int time = strtolDefault(*it, -1);
      if (time < 0)
        continue;
      const double tweets = strtodDefault(input.get("tweets", *it), -1);
      if (tweets < 0)
        continue;
      _tweetsByTime[time] = tweets;
    }
  if (!_tweetsByTime.empty())
    {
      // The value stored for the greatest time key.
      const double max = _tweetsByTime.rbegin()->second;
      if (max <= 0)
        // If the greatest value is 0, stop here to avoid a /0 error.  Clear
        // the table so no one will try to use it.
        _tweetsByTime.clear();
      else
        { // The input is the total number of tweets expected so far.  We
          // want a ratio, compared to the whole day.
          for (std::map< int, double >::iterator it = _tweetsByTime.begin();
               it != _tweetsByTime.end();
               ++it)
            it->second /= max;
        }
    }
}

// Linear interpolation over the sorted (key, value) pairs in map.
//
// Returns Value() for an empty map.  Keys at or outside the range of the
// map are clamped to the first / last value; we never extrapolate.  An
// exact key match returns the stored value.  Otherwise the result is the
// weighted average of the two neighboring values.
template< class Key, class Value >
Value interpolate(std::map< Key, Value > const &map, Key key)
{
  if (map.empty())
    // We can't do anything meaningful.  Presumably this will return a 0,
    // and someone will catch that to avoid a /0 error.
    return Value();
  if (key <= map.begin()->first)
    // If the key is lower than the lowest key we have, use the lowest
    // value we have.  Do not try to extrapolate.
    return map.begin()->second;
  if (key >= map.rbegin()->first)
    // Higher than the highest key.
    return map.rbegin()->second;
  const typename std::map< Key, Value >::const_iterator greaterOrEqual =
    // Smallest key that is greater than or equal to the desired key.
    map.lower_bound(key);
  assert(greaterOrEqual != map.end());
  if (greaterOrEqual->first == key)
    // exact match
    return greaterOrEqual->second;
  typename std::map< Key, Value >::const_iterator less = greaterOrEqual;
  less--;
  assert(less != map.end());
  // Do the division in floating point.  With an integer Key the quotient of
  // the two differences would truncate to 0 every time (the numerator is
  // always smaller than the denominator here), which would turn the
  // interpolation into a step function.
  const double proportion =
    static_cast< double >(key - less->first) /
    static_cast< double >(greaterOrEqual->first - less->first);
  return (greaterOrEqual->second * proportion)
    + ((1 - proportion) * less->second);
}

// Looks up the normalized curve at secondOfTheDay(getSubmitTime()).
// Returns 0 if the table is empty.
double StockTwitsTimeDataNode::getExpectation() const
{
  return interpolate(_tweetsByTime, secondOfTheDay(getSubmitTime()));
}


/////////////////////////////////////////////////////////////////////
// StockTwitsDataExtractor
/////////////////////////////////////////////////////////////////////

// Views the bytes of encoded as a StockTwitsDataExtractor.  Returns NULL if
// the message is too short to hold the fixed fields.  The result points
// into encoded's own buffer, so it is only usable while encoded is alive
// and unmodified.
StockTwitsDataExtractor *StockTwitsDataExtractor::get(std::string &encoded)
{
  if (encoded.size() < sizeof(StockTwitsDataExtractor))
    // Invalid message!
    return NULL;
  // Note that this is where the trailing null is added to the symbol.
  // It does not come from the server.
  return reinterpret_cast< StockTwitsDataExtractor * >
    (const_cast< char * >(encoded.c_str()));
}


/////////////////////////////////////////////////////////////////////
// StockTwitsDataMessage
/////////////////////////////////////////////////////////////////////

// The data member is not set here.  The sender fills it in before send().
StockTwitsDataMessage::StockTwitsDataMessage(std::string const &symbol) :
  BroadcastMessage(StockTwitsDataThread::getChannel(symbol))
{
}


/////////////////////////////////////////////////////////////////////
// StockTwitsDataThread
/////////////////////////////////////////////////////////////////////

// Callback from TalkWithServer.  clientId is the value we supplied to
// sendMessage():  ciPing for ping replies, ciData for the "listen" stream.
void StockTwitsDataThread::onMessage(std::string bytes, int clientId,
                                     TalkWithServer::CancelId cancelId)
{
  ThreadMonitor &tm = ThreadMonitor::find();
  switch (clientId)
    {
    case ciPing:
      { // Ping received.  Send another one in 5 seconds.
        // NOTE(review): SOURCE DAMAGE.  Text is missing from the next two
        // lines:  presumably the rest of the ciPing case and the start of
        // the ciData case, up to the point where the outgoing message is
        // filled in.  The braces below do not balance until it is restored.
        //std::cout<<"ciPing"<symbol);
        //std::cout<<"data for "<symbol<<" "<getChannel()<data = data->data;
            outgoing->send(_dataNodeManager);
          }
        break;
      }
    default:
      { // This shouldn't happen!
        tm.increment("unknown client id");
        break;
      }
    }
}

void StockTwitsDataThread::onAbort(int clientId,
                                   TalkWithServer::CancelId cancelId)
{
  // For simplicity we just clear all messages when we disconnect a
  // TalkWithServer object.  A more complicated client might need these
  // calls, so the interface is appropriate, but just not for us.
}

// Body of the worker thread.  Each pass waits in poll() for up to one
// second, drains _incoming, then services the ping timer and the
// connection.  The only visible exit is the mtQuit request.
void StockTwitsDataThread::threadFunction()
{
  // NOTE(review): SOURCE DAMAGE.  Text is missing inside the next line:
  // the log message, the declarations of tm and pollSet, and the head of
  // the enclosing loop are all gone.  The line is kept exactly as found.
  sendToLogFile(TclList()<wantsRead())
            pollSet.addForRead(_talkWithServer->getHandle());
          if (_talkWithServer->wantsWrite())
            pollSet.addForWrite(_talkWithServer->getHandle());
        }
      pollSet.setTimeoutMs(1000);
      pollSet.poll();
      tm.setState("Read from queue");
      _incoming.resetWaitHandle();
      while (Request *current = _incoming.getRequest())
        {
          switch (current->callbackId)
            {
            case mtAddSymbol:
              {
                SymbolRequest *request =
                  dynamic_cast< SymbolRequest * >(current);
                startDataImpl(request->getSymbol());
                break;
              }
            case mtRemoveSymbol:
              {
                SymbolRequest *request =
                  dynamic_cast< SymbolRequest * >(current);
                stopDataImpl(request->getSymbol());
                break;
              }
            case mtDebugAdd:
              {
                ExternalRequest *request =
                  dynamic_cast< ExternalRequest * >(current);
                const std::string symbol = request->getProperty("symbol");
                TclList msg;
                // NOTE(review): SOURCE DAMAGE.  Text is missing inside the
                // next statement (what was appended to msg, and the call
                // that these arguments belong to).  Kept exactly as found.
                msg<getSocketInfo(), "OK", request->getResponseMessageId());
                startDataImpl(symbol);
                break;
              }
            case mtDebugRemove:
              {
                ExternalRequest *request =
                  dynamic_cast< ExternalRequest * >(current);
                const std::string symbol = request->getProperty("symbol");
                TclList msg;
                // NOTE(review): SOURCE DAMAGE.  Same loss as in mtDebugAdd
                // above.  Kept exactly as found.
                msg<getSocketInfo(), "OK", request->getResponseMessageId());
                stopDataImpl(symbol);
                break;
              }
            case mtDebugDump:
              { // Broadcast on the debug channel and reply with the number
                // of active symbols.
                ExternalRequest *request =
                  dynamic_cast< ExternalRequest * >(current);
                BroadcastMessage *message =
                  new BroadcastMessage(getDebugChannel());
                message->send(_dataNodeManager);
                addToOutputQueue(request->getSocketInfo(),
                                 ntoa(_activeSymbols.size()),
                                 request->getResponseMessageId());
                break;
              }
            case mtQuit:
              {
                delete current;
                return;
              }
            }
          delete current;
        }
      tm.setState("ping");
      checkPing();
      if (_talkWithServer)
        {
          tm.setState("_talkWithServer");
          _talkWithServer->wakeUp();
        }
      if (_talkWithServer && _talkWithServer->pendingResponseCount())
        {
          tm.setState("responses");
          _talkWithServer->doResponses();
        }
    }
}

std::string StockTwitsDataThread::translateSymbol(std::string const &symbol)
{
  // The stocktwits proxy translates the symbols on its own.  We send it
  // symbols which are in our preferred / external format, and we expect
  // responses in that same format.
  //
  // This function is a hold over from the NASDAQ VF proxy code.
  return symbol;
}

// Throws away a dead connection object.  If createIfRequired is true and
// there is no connection object left, this also makes a new one, connects,
// asks the server to start sending data, and resends every active symbol.
void StockTwitsDataThread::checkConnection(bool createIfRequired)
{
  if (_talkWithServer && _talkWithServer->disconnected())
    {
      _talkWithServer->cancelAll();
      delete _talkWithServer;
      _talkWithServer = NULL;
    }
  if (!createIfRequired)
    // We've put this object into a simpler state.  Either _talkWithServer is
    // null or it's ready to go.
    return;
  if (_talkWithServer)
    // _talkWithServer is ready to go.
    return;
  _talkWithServer = new TalkWithServer("StockTwitsDataNode");
  _talkWithServer->connect(_serverName, _serverPort);
  if (_talkWithServer->disconnected())
    // Unable to open the connection.  Note that _talkWithServer is left
    // pointing at the disconnected object.  The next call will delete it.
    return;
  TalkWithServer::Message message;
  message["command"] = "listen";
  _talkWithServer->sendMessage(message, this, ciData, true);
  for (std::set< std::string >::const_iterator it = _activeSymbols.begin();
       it != _activeSymbols.end();
       ++it)
    sendAddMessage(*it);
  // Request a new ping ASAP.
  _nextPingTime = 0;
  _pingSent = false;
}

// Sends an "add" command for symbol.  _talkWithServer must not be NULL.
void StockTwitsDataThread::sendAddMessage(std::string const &symbol)
{
  TalkWithServer::Message message;
  message["command"] = "add";
  message["symbol"] = symbol;
  _talkWithServer->sendMessage(message);
}

// Keeps the connection alive while we have active symbols.  When the timer
// expires we either send a ping and allow 5 seconds for the reply, or, if
// the previous ping is still unanswered, drop the connection so that the
// next checkConnection() can rebuild it.
void StockTwitsDataThread::checkPing()
{
  if (_activeSymbols.empty())
    // Don't keep the connection alive unless we have to.
    return;
  checkConnection();
  if (_nextPingTime < time(NULL))
    { // Timer went off.
      if (_pingSent)
        { // Response is late.  Disconnect and try again.
          _talkWithServer->disconnect();
          ThreadMonitor::find().increment("ping not found");
        }
      else
        { // Time to send another ping
          TalkWithServer::Message message;
          message["command"] = "ping";
          message["response"] = "1";
          _talkWithServer->sendMessage(message, this, ciPing, false);
          _nextPingTime = time(NULL) + 5;
          _pingSent = true;
        }
    }
}

// Starts the worker thread and registers the st_add, st_remove and st_dump
// debug commands.  If dataNodeManager is NULL the default manager is used.
StockTwitsDataThread::StockTwitsDataThread(DataNodeManager *dataNodeManager) :
  ThreadClass("StockTwitsDataThread"),
  _incoming(getName()),
  _dataNodeManager(dataNodeManager?dataNodeManager:DataNodeManager::getDefault()),
  _talkWithServer(NULL),
  _serverName(getConfigItem("st_host", "dice")),
  _serverPort(getConfigItem("st_port", "1985")),
  // checkConnection() only sets the ping state after a successful connect,
  // but checkPing() reads it regardless.  Give both a defined value so a
  // failed first connect does not read uninitialized members.
  _nextPingTime(0),
  _pingSent(false)
{
  startThread();
  CommandDispatcher *cd = CommandDispatcher::getInstance();
  cd->listenForCommand("st_add", &_incoming, mtDebugAdd);
  cd->listenForCommand("st_remove", &_incoming, mtDebugRemove);
  cd->listenForCommand("st_dump", &_incoming, mtDebugDump);
}

StockTwitsDataThread::~StockTwitsDataThread()
{
  // This isn't really implemented.  There are issues about the debug command,
  // and making sure that we don't start a second thread while the first one
  // is still running.
  //
  // NOTE(review): because the quit request below is commented out, nothing
  // visible in this file makes threadFunction() return, so waitForThread()
  // presumably never comes back.  Confirm before destroying one of these.
  //Request *request = new Request(null);
  //request->callbackId = mtQuit;
  //_incoming.newRequest(request);
  waitForThread();
}

// Worker thread side of startData().  Only tells the server about symbols
// that were not already active.
void StockTwitsDataThread::startDataImpl(std::string const &symbol)
{
  checkConnection();
  if (_activeSymbols.insert(symbol).second)
    sendAddMessage(symbol);
}

// Worker thread side of stopData().  Does nothing if the symbol was not
// active.
void StockTwitsDataThread::stopDataImpl(std::string const &symbol)
{
  if (_activeSymbols.erase(symbol))
    { // This was in our list.
      checkConnection(false);
      if (_talkWithServer)
        { // We are connected.  (The server automatically clears all of our
          // requests when we disconnect.  So there is no point connecting just
          // to remove an item.)
          TalkWithServer::Message message;
          message["command"] = "remove";
          message["symbol"] = symbol;
          _talkWithServer->sendMessage(message);
        }
    }
}

// Queues a request for the worker thread to start listening to symbol.
void StockTwitsDataThread::startData(std::string const &symbol)
{
  Request *request = new SymbolRequest(symbol);
  request->callbackId = mtAddSymbol;
  _incoming.newRequest(request);
}

// Queues a request for the worker thread to stop listening to symbol.
void StockTwitsDataThread::stopData(std::string const &symbol)
{
  Request *request = new SymbolRequest(symbol);
  request->callbackId = mtRemoveSymbol;
  _incoming.newRequest(request);
}

// Name of the broadcast channel that carries the data for one symbol.
std::string StockTwitsDataThread::getChannel(std::string const &symbol)
{
  return "StockTwitsDataThread.data." + symbol;
}

// Name of the broadcast channel used by the st_dump debug command.
std::string const &StockTwitsDataThread::getDebugChannel()
{
  static const std::string S_DEBUG = "StockTwitsDataThread.debug.all";
  return S_DEBUG;
}


/////////////////////////////////////////////////////////////////////
// StockTwitsDataNode
/////////////////////////////////////////////////////////////////////

// Today's message count compared to what we would expect for this stock at
// this time of day.  valid is false if we have no data yet, no positive
// daily average, or no positive time-of-day expectation.
void StockTwitsDataNode::getRelativeVolume(bool &valid, double &value) const
{
  valid = false;
  value = 0.0;
  if (!_valid)
    return;
  if (_dailyAverage <= 0)
    return;
  if (_data.messageCount <= 0)
    { // This is an optimization.  If the numerator is 0, don't bother to
      // compute the denominator.  Just return 0.  In some cases this could
      // result in a different answer, but I don't expect that to happen in
      // real life.  That would mean that we didn't expect any activity
      // on StockTwits by the time we started generating alerts.
      valid = true;
      value = 0;
      return;
    }
  const double timeRatio = _stockTwitsTimeData->getExpectation();
  if (timeRatio <= 0)
    return;
  // _dailyAverage * timeRatio == the expectation for this stock at this time.
  value = _data.messageCount / (_dailyAverage * timeRatio);
  valid = true;
}

void StockTwitsDataNode::onBroadcast(BroadcastMessage &message, int msgId)
{
  switch (msgId)
    {
    case bmValue:
      { // New data from the worker thread.  Save it and tell our listeners.
        StockTwitsDataMessage &dataMessage =
          dynamic_cast< StockTwitsDataMessage & >(message);
        _data = dataMessage.data;
        _valid = true;
        notifyListeners();
        break;
      }
    case bmDebug:
      {
        TclList msg;
        // NOTE(review): SOURCE DAMAGE.  Text is missing inside the next
        // line:  the rest of this case, the end of onBroadcast(), and
        // everything up to the last statement of what is presumably the
        // StockTwitsDataNode constructor.  Kept exactly as found.
        msg<startData(_symbol);
}

StockTwitsDataNode::~StockTwitsDataNode()
{
  stockTwitsDataThread->stopData(_symbol);
}

void StockTwitsDataNode::init()
{
  // We make it the caller's responsibility to call us in only one
  // thread.  That seems reasonable.  At the moment the StockTwitsDataThread
  // class must be unique, and it would take some effort to make this
  // function thread safe.
  if (!stockTwitsDataThread)
    stockTwitsDataThread = new StockTwitsDataThread();
  // This is what comes over the wire, so it has to be exact.
  assert((sizeof(StockTwitsDataFields)==9)
         && (sizeof(StockTwitsDataExtractor)==9));
}


/////////////////////////////////////////////////////////////////////
//
/////////////////////////////////////////////////////////////////////