#include "../../shared/ThreadClass.h" #include "../../shared/ThreadMonitor.h" #include "../../shared/PollSet.h" #include "../../shared/SelectableRequestQueue.h" #include "../../shared/SimpleLogFile.h" #include "../../shared/ReplyToClient.h" #include "../../shared/CommandDispatcher.h" #include "../../shared/GlobalConfigFile.h" #include "TalkWithServer.h" #include "VF.h" struct VFDataExtractor { VFDataFields data; char symbol[0]; static VFDataExtractor *get(std::string &encoded); }; class VFDataMessage : public BroadcastMessage { public: VFDataFields data; VFDataMessage(std::string const &symbol); }; class VFDataThread : private ThreadClass, TalkWithServer::IMessageListener { private: enum { mtAddSymbol, mtRemoveSymbol, mtDebugAdd, mtDebugRemove, mtDebugDump, mtQuit }; enum { ciPing, ciData }; // Client id for messages from server. SelectableRequestQueue _incoming; DataNodeManager *const _dataNodeManager; std::set< std::string > _activeSymbols; TalkWithServer *_talkWithServer; const std::string _serverName; const std::string _serverPort; void checkConnection(bool createIfRequired=true); void sendAddMessage(std::string const &symbol); void startDataImpl(std::string const &symbol); void stopDataImpl(std::string const &symbol); time_t _nextPingTime; bool _pingSent; void checkPing(); virtual void onMessage(std::string bytes, int clientId); virtual void onAbort(int clientId); class SymbolRequest : public Request { private: std::string _symbol; public: SymbolRequest(std::string const &symbol) : Request(NULL), _symbol(symbol) { } std::string const &getSymbol() const { return _symbol; } }; protected: virtual void threadFunction(); public: // For simplicty we only support one DataNodeThread. It would not be hard // to imagine a case where startData takes the DataNodeThread as an input, // and different DataNodeThreads could be listening. That's not an expected // use case, so I'm not too worried. If you call this in the data node // thread, you do not have to specify the data node manager. VFDataThread(DataNodeManager *dataNodeManager = NULL); ~VFDataThread(); // A duplicate start or stop request will be ignored. We only care about the // last one. Presumably these will only come from one thread, so we're not // worried about the messages getting out of order. (More precisely, that's // the caller's responsibility.) void startData(std::string const &symbol); void stopData(std::string const &symbol); static std::string getChannel(std::string const &symbol); static std::string const &getDebugChannel(); static std::string translateSymbol(std::string const &symbol); }; ///////////////////////////////////////////////////////////////////// // VFDataExtractor ///////////////////////////////////////////////////////////////////// VFDataExtractor *VFDataExtractor::get(std::string &encoded) { if (encoded.size() < sizeof(VFDataExtractor)) // Invalid message! return NULL; // Note that this is where the trailing null is added to the symbol. // It does not come from the server. return (VFDataExtractor *)encoded.c_str(); } ///////////////////////////////////////////////////////////////////// // VFDataMessage ///////////////////////////////////////////////////////////////////// VFDataMessage::VFDataMessage(std::string const &symbol) : BroadcastMessage(VFDataThread::getChannel(symbol)) { } ///////////////////////////////////////////////////////////////////// // VFDataThread ///////////////////////////////////////////////////////////////////// void VFDataThread::onMessage(std::string bytes, int clientId) { ThreadMonitor &tm = ThreadMonitor::find(); switch (clientId) { case ciPing: { // Ping received. Send another one in 5 seconds. //std::cout<<"ciPing"<symbol); //std::cout<<"data for "<symbol<<" "<getChannel()<data = data->data; outgoing->send(_dataNodeManager); } break; } default: { // This shouldn't happen! tm.increment("unknown client id"); break; } } } void VFDataThread::onAbort(int clientId) { // For simplicity we just clear all messages when we disconnect a // TalkWithServer object. A more complicated client might need these // calls, so the interface is appropriate, but just not for us. } void VFDataThread::threadFunction() { sendToLogFile(TclList()<wantsRead()) pollSet.addForRead(_talkWithServer->getHandle()); if (_talkWithServer->wantsWrite()) pollSet.addForWrite(_talkWithServer->getHandle()); } pollSet.setTimeoutMs(1000); pollSet.poll(); tm.setState("Read from queue"); _incoming.resetWaitHandle(); while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtAddSymbol: { SymbolRequest *request = dynamic_cast< SymbolRequest * >(current); startDataImpl(request->getSymbol()); break; } case mtRemoveSymbol: { SymbolRequest *request = dynamic_cast< SymbolRequest * >(current); stopDataImpl(request->getSymbol()); break; } case mtDebugAdd: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); const std::string symbol = request->getProperty("symbol"); TclList msg; msg<getSocketInfo(), "OK", request->getResponseMessageId()); startDataImpl(symbol); break; } case mtDebugRemove: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); const std::string symbol = request->getProperty("symbol"); TclList msg; msg<getSocketInfo(), "OK", request->getResponseMessageId()); stopDataImpl(symbol); break; } case mtDebugDump: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); BroadcastMessage *message = new BroadcastMessage(getDebugChannel()); message->send(_dataNodeManager); addToOutputQueue(request->getSocketInfo(), ntoa(_activeSymbols.size()), request->getResponseMessageId()); break; } case mtQuit: { delete current; return; } } delete current; } tm.setState("ping"); checkPing(); if (_talkWithServer) { tm.setState("_talkWithServer"); _talkWithServer->wakeUp(); } if (_talkWithServer && _talkWithServer->pendingResponseCount()) { tm.setState("responses"); _talkWithServer->doResponses(); } } } std::string VFDataThread::translateSymbol(std::string const &symbol) { // We do see some strange symbols, presumably NYSE preferred symbols // and the like, on the VF data feed. It would be nice to translate // them. if (symbol.size() > 4) { const int start = symbol.size() - 4; if ((symbol[start] == '.') && (symbol[start+1] == 'C') && (symbol[start+2] == 'A') && ((symbol[start+3] == 'T') || (symbol[start+3] == 'V'))) // This is a Canadian symbol. We know that the datafeed won't // cover these symbols. So we map all requests to one single // data node. return "-"; } return symbol; } void VFDataThread::checkConnection(bool createIfRequired) { if (_talkWithServer && _talkWithServer->disconnected()) { _talkWithServer->cancelAll(); delete _talkWithServer; _talkWithServer = NULL; } if (!createIfRequired) // We've put this object into a simpler state. Either _talkWithServer is // null or it's ready to go. return; if (_talkWithServer) // _talkWithServer is ready to go. return; _talkWithServer = new TalkWithServer(); _talkWithServer->connect(_serverName, _serverPort); if (_talkWithServer->disconnected()) // Unable to open the connection. return; TalkWithServer::Message message; message["command"] = "listen"; _talkWithServer->sendMessage(message, this, ciData, true); for (std::set< std::string >::const_iterator it = _activeSymbols.begin(); it != _activeSymbols.end(); it++) sendAddMessage(*it); // Request a new ping ASAP. _nextPingTime = 0; _pingSent = false; } void VFDataThread::sendAddMessage(std::string const &symbol) { TalkWithServer::Message message; message["command"] = "add"; message["symbol"] = symbol; _talkWithServer->sendMessage(message); } void VFDataThread::checkPing() { if (_activeSymbols.empty()) // Don't keep the connection alive unless we have to. return; checkConnection(); if (_nextPingTime < time(NULL)) { // Timer went off. if (_pingSent) { // Response is late. Disconnect and try again. _talkWithServer->disconnect(); ThreadMonitor::find().increment("ping not found"); } else { // Time to send another ping TalkWithServer::Message message; message["command"] = "ping"; message["response"] = "1"; _talkWithServer->sendMessage(message, this, ciPing, true); _nextPingTime = time(NULL) + 5; _pingSent = true; } } } VFDataThread::VFDataThread(DataNodeManager *dataNodeManager) : ThreadClass("VF.C"), _incoming(getName()), _dataNodeManager(dataNodeManager?dataNodeManager:DataNodeManager::getDefault()), _talkWithServer(NULL), _serverName(getConfigItem("vf_host", "becca")), _serverPort(getConfigItem("vf_port", "1984")) { startThread(); CommandDispatcher *cd = CommandDispatcher::getInstance(); cd->listenForCommand("vf_add", &_incoming, mtDebugAdd); cd->listenForCommand("vf_remove", &_incoming, mtDebugRemove); cd->listenForCommand("vf_dump", &_incoming, mtDebugDump); } VFDataThread::~VFDataThread() { // This isn't really implemented. There are issues about the debug command, // and making sure that we don't start a second thread while the first one // is still running. //Request *request = new Request(null); //request->callbackId = mtQuit; //_incoming.newRequest(request); waitForThread(); } void VFDataThread::startDataImpl(std::string const &symbol) { checkConnection(); if (_activeSymbols.insert(symbol).second) sendAddMessage(symbol); } void VFDataThread::stopDataImpl(std::string const &symbol) { if (_activeSymbols.erase(symbol)) { // This was in our list. checkConnection(false); if (_talkWithServer) { // We are connected. (The server automatically clears all of our // requests when we disconnect. So there is no point connecting just // to remove an item.) TalkWithServer::Message message; message["command"] = "remove"; message["symbol"] = symbol; _talkWithServer->sendMessage(message); } } } void VFDataThread::startData(std::string const &symbol) { Request *request = new SymbolRequest(symbol); request->callbackId = mtAddSymbol; _incoming.newRequest(request); } void VFDataThread::stopData(std::string const &symbol) { Request *request = new SymbolRequest(symbol); request->callbackId = mtRemoveSymbol; _incoming.newRequest(request); } std::string VFDataThread::getChannel(std::string const &symbol) { return "VFDataThread.data." + symbol; } std::string const &VFDataThread::getDebugChannel() { static const std::string S_DEBUG = "VFDataThread.debug.all"; return S_DEBUG; } ///////////////////////////////////////////////////////////////////// // VFDataNode ///////////////////////////////////////////////////////////////////// void VFDataNode::onBroadcast(BroadcastMessage &message, int msgId) { switch (msgId) { case bmValue: { VFDataMessage &dataMessage = dynamic_cast< VFDataMessage & >(message); _data = dataMessage.data; _valid = true; notifyListeners(); break; } case bmDebug: { TclList msg; msg<startData(_symbol); } VFDataNode::~VFDataNode() { vfDataThread->stopData(_symbol); } void VFDataNode::init() { // We make it the caller's responsibility to call us in only one // thread. That seems reasonable. At the moment the VFDataThread // class must be unique, and it would take some effort to make this // function thread safe. if (!vfDataThread) vfDataThread = new VFDataThread(); } ///////////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////////