#include "../shared/ThreadMonitor.h" #include "../shared/Messages.h" #include "../shared/CommandDispatcher.h" #include "../shared/SimpleLogFile.h" #include "../shared/ReplyToClient.h" #include "../shared/GlobalConfigFile.h" #include "data_framework/GenericL1Data.h" #include "data_framework/GenericTosData.h" #include "data_framework/GenericHaltData.h" #include "misc_framework/GenericDataNodes.h" #include "data_framework/SynchronizedTimers.h" #include "WireFormats.h" #include "ProxyData.h" // TODO open interest // If you cancel a subscription we never send that message to the market // data proxy. Results are correct. Possibly inefficient, but not an issue // the way we normally use things. // If you ask for tos or l1, we request both from the market data proxy. // That could cause a problem. If we request one and much later request // the other, we'll never see the initial/cached values for the second // request. If we make a request and we have a slow moving stock, we might // sit for a very long time with no data. Normally we request both types // of data at the same time, if we are planning to use them both, so it's not // usually an issue. The ProxySubscriptionDataNode consolidates all requests // made in the same event. // We have a special request type called all tos. You can request this if you // want tos data for all symbols. we use a command line switch to decide if // we want to use this. // // Currently we ignore the all all tos flag for L1 data. We shouldn't. // When I first started this change I didn't realize that the L1 and TOS data // were both delivered from the proxy in the same request. This flag was mostly // aimed at TIQ which doesn't use L1 data so it's not a huge issue. ///////////////////////////////////////////////////////////////////// // ProxySubscriptionDataNode // // This the base object used to send subscription messages to and // from the data providers. This is slightly tricky because (a) // there can be multiple data providers and (b) the data providers // contact us, not the other way around. // // I made this a data node only to help find the object. It does // not use the standard node semantics. Currently this only works // in one thread. This also helps us because it could automatically do // the one time initialization the first time that someone requests // data. ///////////////////////////////////////////////////////////////////// class ProxySubscriptionDataNode : public DataNode, private RequestListener { private: static const int MAX_REQUESTS_PER_MSG = 4000; SocketClosedDataNode *_socketClosedDataNode; typedef std::set< std::string > SymbolSet; typedef std::map< std::string, SymbolSet > Subscriptions; Subscriptions _subscriptions; // New connections get all of these. Subscriptions _new; // Everyone gets these. static bool _allTos; // Special subscription. Request TOS for all symbols. typedef std::map< SocketInfo *, ExternalRequest::MessageId > DataProviders; DataProviders _dataProviders; enum { mtSubscription }; virtual void newRequest(Request *request); class NewProvider : public BroadcastMessage { public: NewProvider(std::string const &channel, SocketInfo *socket) : BroadcastMessage(channel, socket) { } ExternalRequest::MessageId responseMessageId; }; enum { bmNewProvider, bmSendSubscriptions }; std::string getNewProviderChannel() const; std::string getSendSubscriptionsChannel() const; virtual void onBroadcast(BroadcastMessage &message, int msgId); std::vector< std::string > getMessages(Subscriptions const &subscriptions); void updateNewProvider(NewProvider &newProvider); void sendToAllDataProviders(std::string const &msg); void requestDataNow(); // See findAndRequestAllTos(). That's probably what you want. void requestAllTosSoon(); // Schedule a call to requestDataNow(), unless one is already pending. bool _requestDataSoonPending; void requestDataSoon(); virtual void onWakeup(int msgId); ProxySubscriptionDataNode(DataNodeArgument const &args); friend class DataNode; public: // It is safe and acceptable to request the same data twice. Sometimes two // data nodes which are unrealted in this program will require the same // subscription from the proxy server. Also, we don't keep track of when // you stop using data. This data node will try to coalesse completely // identical requests for performance reasons. If two idencial requests // come in while executing the same event handle, we only send the first. void requestDataSoon(std::string const &type, std::string const &symbol); static DataNodeLink *find(ProxySubscriptionDataNode *&node) { return findHelper(NULL, 0, node, DataNodeArgument()); } // Because of the non standard usage, there's no reason to hold onto the data // node. This function will find the data node, call requstDataSoon(), then // release the data node. static void findAndRequest(std::string const &type, std::string const &symbol); // This works like findAndRequest() but it sends the special request for // TOS data for all symbols. It is efficient to make redundant calls to this // function. // // This special request was created especially for TIQ. TIQ will typically // request data for most of the market, pretty much all stocks that trade // actively. Requesting the symbols one at a time is hard on the server, // and may slow down other clients listening for market data. This special // request says that we don't care about the current state of the stocks, // only the streaming data going forward. That is perfect for TIQ which // is focused on candles and nothing else. // // Note: The main program is still expected to subscribe to specific // stocks. If you want data for a stock you need to find() a // GenericTosDataNode object for that stock. If we receive a message from // the server describing a stock, and we don't have a corresponding object // listening for it, the message will be thrown on the floor. static void findAndRequestAllTos(); // Make sure we are listening to external events. This will be done // automatically in findAndRequest(). But we might want to do it sooner. // Make sure we are listening for events before the program starts listening // for network connections. static void init(); // Known types; static const std::string L1; static const std::string REGIONAL; }; // Make this static as an optimization. We only have one // ProxySubscriptionDataNode, so this doesn't change any functionality. bool ProxySubscriptionDataNode::_allTos = false; // This should be called before the command handler starts listening to // new connections. That's not a problem the way we typically do things // because part of our early initialization of the data node thread will // make a data request to test things. // // Of course, the real solution would be for us to initiate the connection // to the data provider, rather than the other way around! ProxySubscriptionDataNode::ProxySubscriptionDataNode(DataNodeArgument const &args) : _requestDataSoonPending(false) { assert(!args); // This data node should never be freed because there is no way to stop // listening for commands! // Delete the DataNodeLink object to keep valgrind happy. delete createLink(); // Listen for a data provider trying to start a new connection. CommandDispatcher::getInstance() ->listenForCommand("subscription", this, mtSubscription); registerForBroadcast(getNewProviderChannel(), bmNewProvider); // When we get new requests, group them all together before sending them to // the data providers. At least wait until the end of the event handler. // We expect a lot of duplicates. And it's more effecient to send them in // groups. Send a message to ourselves to finish the request soon. registerForBroadcast(getSendSubscriptionsChannel(), bmSendSubscriptions); // Listen for a data provider hanging up. addAutoLink(SocketClosedDataNode::find(this, 0, _socketClosedDataNode)); } void ProxySubscriptionDataNode::findAndRequest(std::string const &type, std::string const &symbol) { ThreadMonitor &tm = ThreadMonitor::find(); static const std::string fname = "findAndRequest"; const std::string oldState = tm.getState(); tm.setState(fname); tm.increment(fname); ProxySubscriptionDataNode *node; DataNodeLink *link = find(node); node->requestDataSoon(type, symbol); link->release(); tm.setState(oldState); } void ProxySubscriptionDataNode::requestAllTosSoon() { assert(!_allTos); _allTos = true; requestDataSoon(); } void ProxySubscriptionDataNode::findAndRequestAllTos() { if (_allTos) // This is a duplicate request. Ignore it. return; ProxySubscriptionDataNode *node; DataNodeLink *link = find(node); node->requestAllTosSoon(); link->release(); } void ProxySubscriptionDataNode::init() { ThreadMonitor &tm = ThreadMonitor::find(); static const std::string fname = "ProxySubscriptionDataNode::init"; const std::string oldState = tm.getState(); tm.setState(fname); tm.increment(fname); ProxySubscriptionDataNode *node; DataNodeLink *link = find(node); // Creating the node the first time causes it to do the initialization. link->release(); tm.setState(oldState); } std::string ProxySubscriptionDataNode::getNewProviderChannel() const { return getOwnerChannel() + ".1"; } std::string ProxySubscriptionDataNode::getSendSubscriptionsChannel() const { return getOwnerChannel() + ".2"; } void ProxySubscriptionDataNode::newRequest(Request *request) { switch (request->callbackId) { case mtSubscription: { // Listen to a request from the command handler, and push it into // the right thread. Note that, unlike most broadcast messages, // the the socket field is not NULL. ExternalRequest ¤t = dynamic_cast< ExternalRequest & >(*request); NewProvider *newProvider = new NewProvider(getNewProviderChannel(), current.getSocketInfo()); newProvider->responseMessageId = current.getResponseMessageId(); newProvider->send(getManager()); break; } } delete request; } void ProxySubscriptionDataNode::onBroadcast(BroadcastMessage &message, int msgId) { //TclList msg; //msg<(message)); break; case bmSendSubscriptions: //msg<<"bmSendSubscriptions"; _requestDataSoonPending = false; requestDataNow(); break; } //sendToLogFile(msg); } std::vector< std::string > ProxySubscriptionDataNode::getMessages(Subscriptions const &subscriptions) { std::vector< std::string > result; std::string msg; int count = 0; if (_allTos) { count++; msg += "Add-All-TOS\r\n"; } for (auto typeIt = subscriptions.cbegin(); typeIt != subscriptions.end(); typeIt++) { std::string const &type = typeIt->first; const std::string prefix = "Add-" + type + ':'; SymbolSet const &symbolSet = typeIt->second; for (auto symbolIt = symbolSet.cbegin(); symbolIt != symbolSet.end(); symbolIt++) { std::string const &symbol = *symbolIt; msg += prefix; msg += symbol; msg += "\r\n"; count++; if (count >= MAX_REQUESTS_PER_MSG) { result.push_back(msg); msg.clear(); count = 0; } } } if (!msg.empty()) result.push_back(msg); // According to the internet there is nothing special that I have to do here. // The compiler will automatically notice that the "result" variable is // about to disappear so the compiler will automatically use move semantics. // http://stackoverflow.com/questions/4986673/c11-rvalues-and-move-semantics-confusion-return-statement return result; } void ProxySubscriptionDataNode::updateNewProvider(NewProvider &newProvider) { // This is the introduction from a data provider. We need to store this // for later, so we can send requests to the provider. //TclList msg; //msg<mySerialNumber() // <<"responseMessageId"< messages = getMessages(_subscriptions); //msg<<"outgoing message count"< messages = getMessages(_new); for (auto it = messages.cbegin(); it != messages.end(); it++) sendToAllDataProviders(*it); // Save these in case someone else asks later, so they know what they missed. // _subscriptions has the entire list for new connections. Add these new // requests from _new to the complete list in _subscriptions. for (auto kvp : _new) { std::string messageType = kvp.first; SymbolSet const &source = kvp.second; SymbolSet &destination = _subscriptions[messageType]; for (std::string symbol : source) destination.insert(symbol); } // These have already been handled completely. People might make additional // requests that will be added to _new. But we don't want to repeat any // work. _new.clear(); } void ProxySubscriptionDataNode::onWakeup(int msgId) { // A data provider hung up on us! _dataProviders.erase(_socketClosedDataNode->getSocket()); } void ProxySubscriptionDataNode::requestDataSoon() { //TclList msg; //msg<send(getManager()); // But just one. _requestDataSoonPending = true; } //msg<<"_requestDataSoonPending is now"<<_requestDataSoonPending; //sendToLogFile(msg); } void ProxySubscriptionDataNode::requestDataSoon(std::string const &type, std::string const &symbol) { _new[type].insert(symbol); requestDataSoon(); } void ProxySubscriptionDataNode::sendToAllDataProviders(std::string const &msg) { //TclList debug; //debug<<__FILE__<<__LINE__<<__FUNCTION__ // <first, msg, it->second); } const std::string ProxySubscriptionDataNode::L1 = "L1"; const std::string ProxySubscriptionDataNode::REGIONAL = "Regional"; ///////////////////////////////////////////////////////////////////// // ProxyListenerL1DataNode ///////////////////////////////////////////////////////////////////// class ProxyListenerL1DataNode : public GenericL1DataNode { private: static const int bmData = bmAvailable; static const int bmInvalid = bmData + 1; static std::string getDataChannel(std::string const &symbol) { return "ProxyListenerL1DataNode.D." + symbol; } static std::string getInvalidChannel(std::string const &symbol) { return "ProxyListenerL1DataNode.I." + symbol; } class Data : public BroadcastMessage { private: time_t _currentTime; L1Data _data; public: time_t getCurrentTime() const { return _currentTime; } L1Data const &getData() const { return _data; } Data(std::string const &symbol, WireL1 const &wire); }; ProxyListenerL1DataNode(DataNodeArgument const &args); friend class DataNode; protected: void onBroadcast(BroadcastMessage &message, int msgId); public: static DataNodeLink *find(DataNodeListener *listener, int msgId, GenericL1DataNode *&node, std::string const &symbol); static void sendData(DataNodeManager *manager, std::string const &symbol, WireL1 const &wire); static void sendInvalid(DataNodeManager *manager, std::string const &symbol); }; ProxyListenerL1DataNode::Data::Data(std::string const &symbol, WireL1 const &wire) : BroadcastMessage(getDataChannel(symbol), 0) { _currentTime = wire.currentTime; _data.bidPrice = wire.bidPrice; _data.bidSize = wire.bidSize; _data.bidExchange = &wire.bidExchange[0]; _data.askPrice = wire.askPrice; _data.askSize = wire.askSize; _data.askExchange = &wire.askExchange[0]; } void ProxyListenerL1DataNode::sendData(DataNodeManager *manager, std::string const &symbol, WireL1 const &wire) { Data *message = new Data(symbol, wire); //TclList msg; //msg<<__FILE__<<__LINE__<<__FUNCTION__ // <<"ProxyListenerL1DataNode"<getData().dump(); //sendToLogFile(msg); message->send(manager); } void ProxyListenerL1DataNode::sendInvalid(DataNodeManager *manager, std::string const &symbol) { BroadcastMessage *message = new BroadcastMessage(getInvalidChannel(symbol)); message->send(manager); } DataNodeLink *ProxyListenerL1DataNode::find(DataNodeListener *listener, int msgId, GenericL1DataNode *&node, std::string const &symbol) { ProxyListenerL1DataNode *tempNode; DataNodeLink *result = findHelper(listener, msgId, tempNode, symbol); node = tempNode; return result; } ProxyListenerL1DataNode::ProxyListenerL1DataNode(DataNodeArgument const &args) : GenericL1DataNode(args) { registerForBroadcast(getDataChannel(symbol()), bmData); registerForBroadcast(getInvalidChannel(symbol()), bmInvalid); ProxySubscriptionDataNode::findAndRequest(ProxySubscriptionDataNode::L1, symbol()); TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__<<"Listening for"<(message); _valid = true; _current = data.getData(); if (data.getCurrentTime()) SynchronizedTimer::safeSetTime(data.getCurrentTime()); notifyListeners(); break; } case bmInvalid: if (_valid) { _valid = false; notifyListeners(); } break; default: GenericL1DataNode::onBroadcast(message, msgId); } } ///////////////////////////////////////////////////////////////////// // ProxyListenerTosDataNode ///////////////////////////////////////////////////////////////////// class ProxyListenerTosDataNode : public GenericTosDataNode { private: static const int bmData = bmAvailable; static const int bmInvalid = bmData + 1; enum class VerboseDebug { No, Yes, Unknown }; static VerboseDebug _verboseDebug; static std::string getDataChannel(std::string const &symbol) { return "ProxyListenerTosDataNode.D." + symbol; } static std::string getInvalidChannel(std::string const &symbol) { return "ProxyListenerTosDataNode.I." + symbol; } class Data : public BroadcastMessage { private: time_t _currentTime; TosData _data; public: time_t getCurrentTime() const { return _currentTime; } TosData const &getData() const { return _data; } Data(std::string const &symbol, WireTos const &wire); }; // False is the traditional mode, send one request to the market data proxy // for each symbol we care about. True is the new mode, send one request // and the market data proxy will send TOS for all symbols. bool requestAllTos(); ProxyListenerTosDataNode(DataNodeArgument const &args); friend class DataNode; protected: void onBroadcast(BroadcastMessage &message, int msgId); public: static DataNodeLink *find(DataNodeListener *listener, int msgId, GenericTosDataNode *&node, std::string const &symbol); static void sendData(DataNodeManager *manager, std::string const &symbol, WireTos const &wire); static void sendInvalid(DataNodeManager *manager, std::string const &symbol); }; ProxyListenerTosDataNode::VerboseDebug ProxyListenerTosDataNode::_verboseDebug = VerboseDebug::Unknown; bool ProxyListenerTosDataNode::requestAllTos() { static bool result = getConfigItem("request_all_tos") == "1"; return result; } ProxyListenerTosDataNode::Data::Data(std::string const &symbol, WireTos const &wire) : BroadcastMessage(getDataChannel(symbol), 0) { _currentTime = wire.currentTime; _data.price = wire.price; _data.time = wire.time; _data.size = wire.size; _data.exchange = &wire.exchange[0]; _data.todaysClose = wire.todaysClose; _data.volume = wire.volume; _data.open = wire.open; _data.high = wire.high; _data.low = wire.low; _data.formT = wire.formT; _data.updatesLast = wire.updatesLast; _data.newPrint = wire.newPrint; } void ProxyListenerTosDataNode::sendData(DataNodeManager *manager, std::string const &symbol, WireTos const &wire) { if (_verboseDebug == VerboseDebug::Yes) ThreadMonitor::find().increment("TOS_sendData " + symbol); Data *message = new Data(symbol, wire); //TclList msg; //msg<<__FILE__<<__LINE__<<__FUNCTION__ // <<"ProxyListenerTosDataNode"<getData().dump(); //sendToLogFile(msg); message->send(manager); } void ProxyListenerTosDataNode::sendInvalid(DataNodeManager *manager, std::string const &symbol) { BroadcastMessage *message = new BroadcastMessage(getInvalidChannel(symbol)); message->send(manager); } DataNodeLink *ProxyListenerTosDataNode::find(DataNodeListener *listener, int msgId, GenericTosDataNode *&node, std::string const &symbol) { ProxyListenerTosDataNode *tempNode; DataNodeLink *result = findHelper(listener, msgId, tempNode, symbol); node = tempNode; return result; } ProxyListenerTosDataNode::ProxyListenerTosDataNode(DataNodeArgument const &args) : GenericTosDataNode(args) { if (_verboseDebug == VerboseDebug::Unknown) { if (getConfigItem("proxy_data_verbose_debug") == "1") { _verboseDebug = VerboseDebug::Yes; sendToLogFile(TclList()< toReport; // Invariant: If there is at least one item in toReport then we have // an outstanding call to doReport() waiting for an idle moment. If // there are NO items in toReport, then we do NOT have a call waiting. const auto doReport = []() { TclList msg; msg<addToQueue(doReport); break; case 1000: // There are already a lot of items waiting to be sent to the log. We // were about to add one more, but we don't want the list to get too // long. Immediately display the items that were already waiting, but // but queue up the new item for next time. That way the invariant will // still be true. doReport(); break; } toReport.push_back(symbol()); } void ProxyListenerTosDataNode::onBroadcast(BroadcastMessage &message, int msgId) { switch (msgId) { case bmData: { Data const &data = dynamic_cast< Data const & >(message); _valid = true; _last = data.getData(); if (data.getCurrentTime()) SynchronizedTimer::safeSetTime(data.getCurrentTime()); notifyListeners(); break; } case bmInvalid: if (_valid) { _valid = false; notifyListeners(); } break; default: GenericTosDataNode::onBroadcast(message, msgId); } } ///////////////////////////////////////////////////////////////////// // ProxyListenerGenericIntDataNode ///////////////////////////////////////////////////////////////////// class ProxyListenerGenericIntDataNode : public GenericDataNode { private: static std::string getDataChannel(std::string const &symbol, std::string const &nameOnWire) { return "ProxyListenerGenericIntDataNode.D." + nameOnWire + '.' + symbol; } class Data : public BroadcastMessage { private: Integer _value; public: Integer getValue() const { return _value; } Data(std::string const &symbol, std::string const &nameOnWire, Integer value); }; Integer _value; ProxyListenerGenericIntDataNode(DataNodeArgument const &args); friend class GenericDataNodeFactory; protected: void onBroadcast(BroadcastMessage &message, int msgId); public: virtual void getInteger(bool &valid, Integer &value) const; static void sendData(DataNodeManager *manager, std::string const &symbol, std::string const &nameOnWire, Integer &value); static void storeFactory(std::string const &wireName, std::string const &requestType, std::string const &factoryName); }; ProxyListenerGenericIntDataNode::Data::Data(std::string const &symbol, std::string const &nameOnWire, Integer value) : BroadcastMessage(getDataChannel(symbol, nameOnWire), 0), _value(value) { } ProxyListenerGenericIntDataNode::ProxyListenerGenericIntDataNode (DataNodeArgument const &args) : _value(0) { DataNodeArgumentVector const &argList = args.getListValue(); assert(argList.size() == 3); // Symbol, Name on wire, request type std::string const &symbol = argList[0].getStringValue(); std::string const &nameOnWire = argList[1].getStringValue(); std::string const &requestType = argList[2].getStringValue(); registerForBroadcast(getDataChannel(symbol, nameOnWire), 0); ProxySubscriptionDataNode::findAndRequest(requestType, symbol); TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__<<"Listening for"<(message); _value = data.getValue(); notifyListeners(); } void ProxyListenerGenericIntDataNode::getInteger(bool &valid, Integer &value) const { // For simplicity we always say that its valid. The way this is typically // used, value=0 and invalid are interchangable. Obviously that wouldn't // work for all data fields, but it seems good enough for the imbalances // and the put/call volume. valid = true; value = _value; } void ProxyListenerGenericIntDataNode::sendData(DataNodeManager *manager, std::string const &symbol, std::string const &nameOnWire, Integer &value) { Data *message = new Data(symbol, nameOnWire, value); message->send(manager); } void ProxyListenerGenericIntDataNode::storeFactory(std::string const &wireName, std::string const &requestType, std::string const &factoryName) { DataNodeArgument factory = GenericDataNodeFactory::create< ProxyListenerGenericIntDataNode > (argList(symbolPlaceholderObject, wireName, requestType)); GenericDataNodeFactory::storeFactory(factoryName, factory); } ///////////////////////////////////////////////////////////////////// // ProxyListenerGenericDoubleDataNode ///////////////////////////////////////////////////////////////////// class ProxyListenerGenericDoubleDataNode : public GenericDataNode { private: static std::string getDataChannel(std::string const &symbol, std::string const &nameOnWire) { return "ProxyListenerGenericDoubleDataNode.D." + nameOnWire + '.' + symbol; } class Data : public BroadcastMessage { private: double _value; public: double getValue() const { return _value; } Data(std::string const &symbol, std::string const &nameOnWire, double value); }; bool _zeroIsNullMode; double _value; ProxyListenerGenericDoubleDataNode(DataNodeArgument const &args); friend class GenericDataNodeFactory; protected: void onBroadcast(BroadcastMessage &message, int msgId); public: virtual void getDouble(bool &valid, double &value) const; static void sendData(DataNodeManager *manager, std::string const &symbol, std::string const &nameOnWire, double &value); static void storeFactory(std::string const &wireName, std::string const &requestType, std::string const &factoryName); }; ProxyListenerGenericDoubleDataNode::Data::Data(std::string const &symbol, std::string const &nameOnWire, double value) : BroadcastMessage(getDataChannel(symbol, nameOnWire), 0), _value(value) { } ProxyListenerGenericDoubleDataNode::ProxyListenerGenericDoubleDataNode (DataNodeArgument const &args) : _value(0) { DataNodeArgumentVector const &argList = args.getListValue(); assert(argList.size() == 3); // Symbol, Name on wire, request type std::string const &symbol = argList[0].getStringValue(); std::string const &nameOnWire = argList[1].getStringValue(); std::string const &requestType = argList[2].getStringValue(); ProxySubscriptionDataNode::findAndRequest(requestType, symbol); registerForBroadcast(getDataChannel(symbol, nameOnWire), 0); TclList msg; // added to allow limit values to be NULL if ((nameOnWire == "WireLimitDownData") || (nameOnWire == "WireLimitUpData")) { _zeroIsNullMode = true; } else { _zeroIsNullMode = false; } msg<<__FILE__<<__LINE__<<__FUNCTION__<<"Listening for"<(message); _value = data.getValue(); notifyListeners(); } void ProxyListenerGenericDoubleDataNode::getDouble(bool &valid, double &value) const { if (_zeroIsNullMode && _value < 0.00001) { valid = false; } else { valid = true; } value = _value; } void ProxyListenerGenericDoubleDataNode::sendData(DataNodeManager *manager, std::string const &symbol, std::string const &nameOnWire, double &value) { Data *message = new Data(symbol, nameOnWire, value); message->send(manager); } void ProxyListenerGenericDoubleDataNode::storeFactory(std::string const &wireName, std::string const &requestType, std::string const &factoryName) { DataNodeArgument factory = GenericDataNodeFactory::create< ProxyListenerGenericDoubleDataNode > (argList(symbolPlaceholderObject, wireName, requestType)); GenericDataNodeFactory::storeFactory(factoryName, factory); } ///////////////////////////////////////////////////////////////////// // ProxyListenerHaltDataNode ///////////////////////////////////////////////////////////////////// class ProxyListenerHaltDataNode : public GenericHaltDataNode { private: static const int bmData = bmAvailable; enum class VerboseDebug { No, Yes, Unknown }; static VerboseDebug _verboseDebug; static std::string getDataChannel(std::string const &symbol) { return "ProxyListenerHaltDataNode.D." + symbol; } class Data : public BroadcastMessage { private: time_t _currentTime; HaltData _data; public: time_t getCurrentTime() const { return _currentTime; } HaltData const &getData() const { return _data; } Data(std::string const &symbol, WireHalt const &wire); }; ProxyListenerHaltDataNode(DataNodeArgument const &args); friend class DataNode; protected: void onBroadcast(BroadcastMessage &message, int msgId); public: static DataNodeLink *find(DataNodeListener *listener, int msgId, GenericHaltDataNode *&node, std::string const &symbol); static void sendData(DataNodeManager *manager, std::string const &symbol, WireHalt const &wire); static void sendInvalid(DataNodeManager *manager, std::string const &symbol); }; ProxyListenerHaltDataNode::VerboseDebug ProxyListenerHaltDataNode::_verboseDebug = VerboseDebug::Unknown; ProxyListenerHaltDataNode::Data::Data(std::string const &symbol, WireHalt const &wire) : BroadcastMessage(getDataChannel(symbol), 0) { _data.time = wire.time; _data.type = wire.type; _data.reason = wire.reason; _data.price = wire.price; } void ProxyListenerHaltDataNode::sendData(DataNodeManager *manager, std::string const &symbol, WireHalt const &wire) { if (_verboseDebug == VerboseDebug::Yes) ThreadMonitor::find().increment("Halt_sendData " + symbol); Data *message = new Data(symbol, wire); TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <getData().dump(); sendToLogFile(msg); message->send(manager); } DataNodeLink *ProxyListenerHaltDataNode::find(DataNodeListener *listener, int msgId, GenericHaltDataNode *&node, std::string const &symbol) { ProxyListenerHaltDataNode *tempNode; DataNodeLink *result = findHelper(listener, msgId, tempNode, symbol); node = tempNode; return result; } ProxyListenerHaltDataNode::ProxyListenerHaltDataNode(DataNodeArgument const &args) : GenericHaltDataNode(args) { if (_verboseDebug == VerboseDebug::Unknown) { if (getConfigItem("proxy_data_verbose_debug") == "1") { _verboseDebug = VerboseDebug::Yes; sendToLogFile(TclList()<(message); _last = data.getData(); //if (data.getCurrentTime()) // SynchronizedTimer::safeSetTime(data.getCurrentTime()); notifyListeners(); break; } default: GenericHaltDataNode::onBroadcast(message, msgId); } } ///////////////////////////////////////////////////////////////////// // ProxyListener ///////////////////////////////////////////////////////////////////// class ProxyListener : public RequestListener { private: DataNodeManager *const _manager; enum { mtL1Invalid, mtTosInvalid, mtL1Data, mtTosData, mtIntData, mtDoubleData, mtHaltData }; void listenForInt(std::string const &wireName); void listenForDouble(std::string const &wireName); virtual void newRequest(Request *request); public: ProxyListener(DataNodeManager *manager); }; void ProxyListener::newRequest(Request *request) { switch (request->callbackId) { case mtL1Invalid: { ExternalRequest ¤t = dynamic_cast< ExternalRequest & >(*request); ProxyListenerL1DataNode::sendInvalid(_manager, current.getProperty(WireSymbol)); //TclList msg; //msg<<__FILE__<<__LINE__<<__FUNCTION__ // <<"mtL1Invalid"<(*request); ProxyListenerTosDataNode::sendInvalid(_manager, current.getProperty(WireSymbol)); //TclList msg; //msg<<__FILE__<<__LINE__<<__FUNCTION__ // <<"mtTosInvalid"<(*request); const std::string symbol = current.getProperty(WireSymbol); const std::string body = current.getProperty(WireBody); if (body.size() != sizeof(WireL1)) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"mtL1Data"<(*request); const std::string symbol = current.getProperty(WireSymbol); const std::string body = current.getProperty(WireBody); if (body.size() != sizeof(WireTos)) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"mtTosData"<(*request); const std::string symbol = current.getProperty(WireSymbol); const std::string body = current.getProperty(WireBody); const std::string nameOnWire = current.getCommand(); if (body.size() != sizeof(DataNode::Integer)) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"mtIntData"<(*request); const std::string symbol = current.getProperty(WireSymbol); const std::string body = current.getProperty(WireBody); const std::string nameOnWire = current.getCommand(); if (body.size() != sizeof(double)) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"mtDoubleData"<(*request); const std::string symbol = current.getProperty(WireSymbol); const std::string body = current.getProperty(WireBody); if (body.size() != sizeof(WireHalt)) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"mtHaltData"<listenForCommand(wireName, this, mtIntData); } void ProxyListener::listenForDouble(std::string const &wireName) { CommandDispatcher::getInstance() ->listenForCommand(wireName, this, mtDoubleData); } ProxyListener::ProxyListener(DataNodeManager *manager) : _manager(manager) { CommandDispatcher *cd = CommandDispatcher::getInstance(); cd->listenForCommand(WireTosInvalid, this, mtTosInvalid); cd->listenForCommand(WireL1Invalid, this, mtL1Invalid); cd->listenForCommand(WireTosData, this, mtTosData); cd->listenForCommand(WireL1Data, this, mtL1Data); cd->listenForCommand(WireHaltData, this, mtHaltData); listenForInt(WireImbalanceData); listenForInt(WirePutVolumeData); listenForInt(WireCallVolumeData); listenForDouble(WireNyseBidData); listenForDouble(WireNyseAskData); listenForDouble(WireLimitUpData); listenForDouble(WireLimitDownData); } ///////////////////////////////////////////////////////////////////// // Global ///////////////////////////////////////////////////////////////////// void initProxyData(DataNodeManager *manager) { new ProxyListener(manager); GenericL1DataNode::registerImplementation(ProxyListenerL1DataNode::find); GenericTosDataNode::registerImplementation(ProxyListenerTosDataNode::find); GenericHaltDataNode::registerImplementation(ProxyListenerHaltDataNode::find); ProxyListenerGenericIntDataNode::storeFactory(WireImbalanceData, ProxySubscriptionDataNode::L1, "NyseImbalance"); ProxyListenerGenericIntDataNode::storeFactory(WirePutVolumeData, ProxySubscriptionDataNode::L1, "PutVolume"); ProxyListenerGenericIntDataNode::storeFactory(WireCallVolumeData, ProxySubscriptionDataNode::L1, "CallVolume"); ProxyListenerGenericDoubleDataNode::storeFactory(WireLimitUpData, ProxySubscriptionDataNode::L1, "LimitUp"); ProxyListenerGenericDoubleDataNode::storeFactory(WireLimitDownData, ProxySubscriptionDataNode::L1, "LimitDown"); ProxyListenerGenericDoubleDataNode::storeFactory (WireNyseBidData, ProxySubscriptionDataNode::REGIONAL, "NyseBid"); ProxyListenerGenericDoubleDataNode::storeFactory (WireNyseAskData, ProxySubscriptionDataNode::REGIONAL, "NyseAsk"); } void initProxyDataInThread() { initProxyData(DataNodeManager::getDefault()); ProxySubscriptionDataNode::init(); }