#include <unordered_map>

#include "../shared/MiscSupport.h"
#include "../shared/ContainerThread.h"
#include "../shared/SimpleLogFile.h"
#include "../generate_alerts/data_framework/GenericTosData.h"
#include "../new_market_data_proxy/DataTypes.h"
#include "../new_market_data_proxy/MarketDataProxyClient.h"
#include "../new_market_data_proxy/WireFormats.h"

/* Copy data from MarketDataProxyClient to GenericTosData.  Requests go in the
 * other direction.
 *
 * MarketDataProxyClient is focused on sending data from one program to
 * another.  GenericTosData is focused on making data available within a
 * program.  Both are well defined interfaces so we can reuse them a lot.
 *
 * Notice that the DataNode base class and the find() method will consolidate
 * requests.  If the program requests TOS data for MSFT 100 times, we will
 * create only one NewProxyData::TosDataNode and send only one request via
 * MarketDataProxyClient to the upstream server.
 *
 * MarketDataProxyClient automatically creates its own thread.  We get
 * callbacks in that thread.  And we have one thread associated with
 * DataNode.h.  We use DataNodeManager::addToQueue() to send work to the
 * DataNode thread and report the new event in that thread.
 *
 * In some places we use a BroadcastMessage to send the data from other threads
 * to the DataNode thread.  That is great when each individual object makes
 * its own requests, and gets the responses directly.  However,
 * MarketDataProxyClient.h has only one stream of output.  We have to move that
 * data to the DataNode thread first, then we decide how to route it.  This is
 * common for data providers and is not a good match for BroadcastMessage.
*/

namespace NewProxyData
{

  // A GenericTosDataNode implementation that is looked up by symbol through
  // the static _all table.  Each non-empty encoded message handed to a node
  // is unmarshalled into _last and then announced with notifyListeners().
  //
  // NOTE(review): three spans of this namespace look truncated in this copy
  // of the file.  Each one starts at "msg<" and resumes in the middle of a
  // later statement ("subscribe(", "unsubscribe(", "setCallback(").  The
  // code that was in between -- presumably the rest of the log statements,
  // the constructor, the destructor header, findHelper() and the init()
  // header -- is not visible here.  Restore it from version control before
  // changing anything in this block.
  class TosDataNode : public GenericTosDataNode
  {
  private:
    // Symbol -> node lookup table.  Searched by the static fromUpSteam()
    // below; an entry is erased by symbol() in the cleanup fragment further
    // down.  The code that inserts entries is not visible in this copy.
    static std::unordered_map< std::string, TosDataNode * > _all;

    // Decode one message for this node.
    //
    // An empty string is ignored.  Otherwise unmarshal() is asked to fill
    // _last, then _valid is set and the listeners are notified.  If
    // unmarshal() throws a MarshallingException, neither of those two steps
    // runs and control goes to the handler below.
    void fromUpStream(std::string const &encodedData)
    {
      if (!encodedData.empty())
        try
        {
          unmarshal(encodedData, _last);
          _valid = true;
          notifyListeners();
        }
        catch (MarshallingException &m)
        {
          // NOTE(review): truncated span #1.  The handler starts building a
          // TclList message and then the text is lost; what follows "msg<"
          // is the tail of a subscribe(DataTypes::TOS, symbol()) call that
          // presumably belongs to the constructor -- TODO confirm.
          TclList msg;
          msg<subscribe(DataTypes::TOS, symbol());
          // NOTE(review): truncated span #2.  What follows "msg<" is the
          // tail of an unsubscribe(DataTypes::TOS, symbol()) call followed
          // by removal of this symbol from _all; presumably this is the
          // end of the destructor -- TODO confirm.
          TclList msg;
          msg<unsubscribe(DataTypes::TOS, symbol());
          _all.erase(symbol());
    }

    friend class DataNode;

  public:
    // Finder with a GenericTosDataNode *& out-parameter; it is the function
    // passed to GenericTosDataNode::registerImplementation() below.
    //
    // listener, msgId and symbol are forwarded unchanged to findHelper().
    // node receives the pointer that findHelper() stored in tempNode.
    // Returns the DataNodeLink produced by findHelper().
    static DataNodeLink *find(DataNodeListener *listener,
                              int msgId,
                              GenericTosDataNode *&node,
                              std::string const &symbol)
    {
      // findHelper() is given a TosDataNode * variable rather than node
      // itself -- presumably because it takes a reference to the derived
      // pointer type; its declaration is not visible here.
      TosDataNode *tempNode;
      DataNodeLink *result = findHelper(listener, msgId, tempNode, symbol);
      node = tempNode;
      return result;
    }

    // Route one encoded message to the node registered for symbol.  If
    // _all has no entry for that symbol the message is dropped without any
    // further action.
    static void fromUpSteam(std::string const &symbol,
                            std::string const &encodedData)
    {
      const auto it = _all.find(symbol);
      if (it != _all.end())
        it->second->fromUpStream(encodedData);
    }
  };

  // Definition of the static lookup table declared in TosDataNode.
  std::unordered_map< std::string, TosDataNode * > TosDataNode::_all;

  // Dispatch one message by data type.  This is the function handed to
  // setCallback() in the fragment below.  DataTypes::TOS is forwarded to
  // TosDataNode::fromUpSteam(); every other type falls into the default
  // branch, which starts building a TclList message.
  static void fromUpSteam(DataTypes::Internal type,
                          std::string const &symbol,
                          std::string const &encodedData)
  {
    switch (type)
    {
    case DataTypes::TOS:
      TosDataNode::fromUpSteam(symbol, encodedData);
      break;
    default:
      // NOTE(review): truncated span #3.  The rest of the default branch,
      // the end of this function and the start of the next one are lost.
      // What survives is the tail of a setCallback(fromUpSteam) call and
      // the registration of TosDataNode::find with GenericTosDataNode;
      // presumably both sit inside init(), which initNewProxyData() calls
      // -- TODO confirm.
      TclList msg;
      msg<setCallback(fromUpSteam);
    GenericTosDataNode::registerImplementation(TosDataNode::find);
  }

}

// Global-namespace entry point for this file: forwards to
// NewProxyData::init().  The header of init() is not visible in this copy.
void initNewProxyData()
{
  NewProxyData::init();
}