#include "../shared/CommandDispatcher.h"
#include "../shared/LogFile.h"
#include "../shared/ThreadMonitor.h"
#include "../shared/ReplyToClient.h"
#include "../fast_alert_search/history_server/Marshal.h"
#include "ProxyThread.h"

/* TODO / new plan:

   Should we mark the snapshot differently than the normal data?  For L1 data
   we don't really care.  For TOS data it is important.  Reconnecting will give
   us a lot of snapshots, and we don't want to confuse that with a lot of
   prints.

   In the old proxy we had a trick that was very specific to the TOS data.
   There is a boolean field in the data that says whether or not this is a new
   print.  Initially we get that from the external data feed.  That's what we
   send to any active listeners.  Then we change that one field to always say
   that this is not a new print, and we store that in the cache in case anyone
   asks for a snapshot.

   It seems like we could do something similar.  We accept one or two inputs
   for the new data.  The first input is what to send to any active listeners.
   The second is what we want to send to the cache.  Either or both can be ""
   to say don't send anything.  ("" is reserved.  We never actually send this
   to a listener as a streaming or snapshot result.)  We offer a way to make
   both strings the same.  That will be convenient for the programmer and
   reasonably efficient.

   Because we are passing this data between threads, we can't just share a
   pointer.  But we were already considering using a reference counted pointer
   for the data, because it might be long and it might get copied several
   times.  That would make it easy and efficient to store two copies of the
   smart pointer in any messages between threads.  Even better, "" no longer
   has to be a special case.  We can use the null pointer to say no data.

   This might be slightly less efficient than the ideal case, but it's easy and
   it makes this part of the code very generic and reusable.  It seems like a
   reasonable compromise.

   The symbol would probably remain a normal string, not a reference counted
   pointer.  We expect these to be short and the new std::string automatically
   optimizes for that case. */

/////////////////////////////////////////////////////////////////////
// ProxyThread::CancelInfo
/////////////////////////////////////////////////////////////////////

// Two CancelInfo objects are equal when the socket, the data type and the
// symbol all match.
bool ProxyThread::CancelInfo::operator ==(CancelInfo const &other) const
{
  return (_socketInfo == other._socketInfo)
    && (_type == other._type)
    && (_symbol == other._symbol);
}

// Lexicographic ordering on (_socketInfo, _type, _symbol).  The socket is the
// most significant field, so in a container sorted with this operator all of
// the entries for one socket are adjacent.  socketClosed() walks such a run;
// presumably _requestsByCancelInfo is ordered with this operator -- confirm
// against the header.
bool ProxyThread::CancelInfo::operator <(CancelInfo const &other) const
{
  if (_socketInfo < other._socketInfo) return true;
  if (_socketInfo > other._socketInfo) return false;
  if (_type < other._type) return true;
  if (_type > other._type) return false;
  return _symbol < other._symbol;
}

// Builds a human readable description of this object in a TclList.
std::string ProxyThread::CancelInfo::debugDump() const
{
  TclList result;
  // NOTE(review): in the copy of this file under review the text between
  // `result<<"_socketInfo"<` and `listenForCommand(` is missing.  The rest of
  // debugDump() and the beginning of what appears to be the ProxyThread
  // constructor were lost.  The next line is reproduced exactly as found;
  // restore the missing text from version control before building.
  result<<"_socketInfo"<listenForCommand("request_from_proxy", this, mtRequestFromProxy);
  // Register this object for the remaining external commands.  Each command
  // name is paired with the callback id that handleRequestInThread() switches
  // on.
  cd->listenForCommand("cancel_from_proxy", this, mtCancelFromProxy);
  cd->listenForCommand("debug_dump_from_proxy", this, mtDebugDumpFromProxy);
  start();
}

// Calls instance() and discards the result.  Presumably this forces the
// singleton to be created at a known time -- confirm against instance().
void ProxyThread::initialize()
{
  instance();
}

// Runs at most one queued low priority job per call.  If more jobs remain
// after that, asks the callbacks object to wake again after 0 ms.
//
// NOTE(review): the result of the dynamic_cast is dereferenced without a null
// check, so this assumes that only DoIt objects are ever pushed onto
// _lowPriority.  That is true of every push visible in this file.
// NOTE(review): the popped request is not deleted here.  Confirm who owns it.
void ProxyThread::beforeSleep(IBeforeSleepCallbacks &callbacks)
{
  if (Request *request = _lowPriority.pop())
  {
    dynamic_cast< DoIt * >(request)->action();
    if (!_lowPriority.empty())
      callbacks.wakeAfterMs(0);
  }
}

// Forget everything we know about a socket:  drop its queued low priority
// jobs, then remove each of its requests from both _requestsByCancelInfo and
// _requestsByContent.
//
// lower_bound() is given a bare SocketInfo *.  Presumably that converts to a
// CancelInfo which sorts before every real request for the same socket --
// confirm against the CancelInfo constructors in the header.
void ProxyThread::socketClosed(SocketInfo *socket)
{
  _lowPriority.remove(socket);
  auto it = _requestsByCancelInfo.lower_bound(socket);
  while (true)
  {
    if (it == _requestsByCancelInfo.end())
      // Past the end of the list.
      break;
    if (it->getSocketInfo() != socket)
      // We've already seen all requests for this socket.
      break;
    // The erase is done outside of the assert() so it still happens when
    // assertions are compiled out.  The two indexes should always agree, so
    // we expect to find the matching entry.
    const bool found = eraseFromRequestsByContent(it->getType(), it->getSymbol(), it->getSocketInfo());
    assert(found);
    // erase() hands back the next iterator, so we can keep walking.
    it = _requestsByCancelInfo.erase(it);
  }
}; // NOTE(review): this stray ';' is an empty declaration.  Harmless.

// Remove one (type, symbol, socket) entry from _requestsByContent.
// Returns true if an entry was found and erased, false if there was nothing
// to erase.
//
// NOTE(review): _requestsByContent[type] uses operator[].  If that is a
// standard map it will create an empty entry for a type we haven't seen
// before -- confirm the container type in the header.
bool ProxyThread::eraseFromRequestsByContent(DataTypes::Internal type, std::string const &symbol, SocketInfo *socket)
{
  auto &bySymbol = _requestsByContent[type];
  const auto it = bySymbol.find(symbol);
  if (it == bySymbol.end())
    return false; // No requests for this symbol. Nothing to delete.
  const bool somethingErased = it->second.erase(socket);
  if (somethingErased && it->second.empty())
    bySymbol.erase(it); // Don't leave a symbol pointing to an empty hashtable.
  return somethingErased;
}

// Entry point for the commands registered with listenForCommand().  We
// validate each request here, then push the real work onto _lowPriority as a
// DoIt.  beforeSleep() runs those jobs one at a time.
//
// The lambdas below capture with [=].  symbol is declared as a reference, but
// a by-copy capture of a reference variable stores a copy of the string it
// refers to, so the queued job does not depend on the lifetime of *original.
void ProxyThread::handleRequestInThread(Request *original)
{
  SocketInfo *const socket = original->getSocketInfo();
  switch (original->callbackId)
  {
  case mtRequestFromProxy:
    {
      ExternalRequest &request = dynamic_cast< ExternalRequest & >(*original);
      ExternalRequest::MessageId messageId = request.getResponseMessageId();
      if (messageId.isEmpty())
        // Client error?  We don't always go out of our way to check for this,
        // but in general the rule is that message id == 0 means the client
        // doesn't want a reply.  And in this case the only point of the
        // request is to get a reply.
        return;
      // Type would be something like "TOS" or "L1" or "Halts".  This unit
      // treats all types of data the same.  Some people publish certain
      // types of data and others listen for certain types of data, that's all.
      const DataTypes::Internal type = DataTypes::fromString(request.getProperty("type"), false);
      if (type == DataTypes::UNKNOWN)
      {
        // The remote request was invalid.  Log it and otherwise ignore it.
        ThreadMonitor::find().increment("UNKNOWN REQUEST TYPE");
        return;
      }
      // Set this to "MSFT" to only see data for Microsoft.  Skip it or set it
      // to "" to request data for all symbols.  The details will be different
      // depending which you choose.  We send the symbol with each record if
      // you ask for all symbols, but not if you ask for a specific symbol.
      // There may be other differences.
      std::string const &symbol = request.getProperty("symbol");
      // Snapshot="1" means start by sending the current state.  Snapshot="0"
      // means only show the stream of changes.  (Currently all requests show
      // you the stream of changes.)  By default we assume you want the
      // snapshot if and only if you are asking about a specific symbol; that's
      // based on how we use the older market data proxy.
      bool snapshot = request.getProperty("snapshot", !symbol.empty());
      // Sometimes we have an option to display the output in a human readable
      // form, for debugging and development.  That seems like a problem here
      // when we will have a variety of data types.  There is no standard
      // interface which includes debugDump() and marshal().  For simplicity
      // all data should already be marshalled before it gets to this class.
      //std::string const &outputFormat = request.getProperty("output_format");
      _lowPriority.push(new DoIt(socket, [=]() { requestFromProxy(socket, messageId, type, symbol, snapshot); }));
      break;
    }
  case mtCancelFromProxy:
    {
      ExternalRequest &request = dynamic_cast< ExternalRequest & >(*original);
      const DataTypes::Internal type = DataTypes::fromString(request.getProperty("type"), false);
      if (type == DataTypes::UNKNOWN)
      {
        // The remote request was invalid.  Log it and otherwise ignore it.
        ThreadMonitor::find().increment("UNKNOWN REQUEST TYPE");
        return;
      }
      std::string const &symbol = request.getProperty("symbol");
      // Notice that the pair (type, symbol) is the key we use to remove a
      // request.  That means that the client can't request the exact same
      // data twice at the same time.  That seems reasonable for performance
      // reasons and it matches the way our clients normally work.  For
      // examples of an alternative, look at the way TI Pro requests a top list
      // or alert strategy.  In that case TI Pro makes up a unique key for each
      // request.
      _lowPriority.push(new DoIt(socket, [=]() { cancelFromProxy(socket, type, symbol); }));
      break;
    }
  case mtDebugDumpFromProxy:
    {
      ExternalRequest &request = dynamic_cast< ExternalRequest & >(*original);
      TclList msg;
      // NOTE(review): in the copy of this file under review the text between
      // `msg<<"request socket" <` and `second, messageId);` is missing.  The
      // rest of this case, the end of handleRequestInThread() and the start of
      // the next function (probably requestFromProxy()) were lost.  The next
      // line is reproduced exactly as found; restore the missing text from
      // version control before building.
      msg<<"request socket" <second, messageId); } } }

// Undo one earlier request.  The request is identified by (socket, type,
// symbol) and is removed from both indexes.  Nothing is reported if there was
// no such request.
void ProxyThread::cancelFromProxy(SocketInfo *socket, DataTypes::Internal type, std::string const &symbol)
{
  CancelInfo cancelInfo(socket, type, symbol);
  _requestsByCancelInfo.erase(cancelInfo);
  eraseFromRequestsByContent(type, symbol, socket);
}

// Note:  Normally I send a string to a function using "std::string const &".
// However, that sometimes causes problems when using [=].  I'm trying to
// CAPTURE a COPY of the string, but if the input to the function is just a
// reference, I might actually be making a copy of a reference.  I.e. I'll be
// saving a pointer to a temporary object.  I don't remember all the details,
// so for now I'm going to do this the way I know is safe, even though that
// means one extra copy of each string.
//
// NOTE(review): a by-copy capture of a reference variable copies the object
// it refers to, so "std::string const &" would also be safe here.
// handleRequestInThread() already relies on that.  Passing by value is still
// correct; it only costs the extra copy described above.
//
// Publish new data.  streaming is what addDataImpl() sends to current
// listeners and cached is what it stores for later.  The work is handed to
// the instance via doWorkInThread() rather than being done directly in this
// call.
void ProxyThread::addData(DataTypes::Internal type, std::string symbol, BigData streaming, BigData cached)
{
  const auto i = instance();
  i->doWorkInThread([=]() { i->addDataImpl(type, symbol, streaming, cached); });
}

/*
void ProxyThread::clearAllData(DataTypes::Internal type)
{
  const auto i = instance();
  i->doWorkInThread([=]() { i->clearAllDataImpl(type); });
}
*/

// NOTE(review): this is initialized from a raw "new std::string".  Presumably
// BigData is a reference counted pointer which takes ownership of it --
// confirm against the header.
const ProxyThread::BigData ProxyThread::EMPTY_STRING = new std::string;

// Update the snapshot cache for this type and notify any listeners.
//
// symbol:  "" means apply the change to every symbol which is currently in
//   the cache for this type.  Anything else means just that one symbol.
// streaming:  What to send to listeners.  Null means send nothing.  We also
//   send nothing if this has the same contents as what was in the cache.
// cached:  What to store in the cache.  Null means remove the cache entry.
void ProxyThread::addDataImpl(DataTypes::Internal type, std::string const &symbol, BigData const &streaming, BigData const &cached)
{
  auto &bySymbol = _snapshots[type];
  if (symbol.empty())
  { // All symbols.
    for (auto it = bySymbol.begin(); it != bySymbol.end(); )
    {
      BigData &inCacheNow = it->second;
      // Copy the old value and the key before we touch the entry.  The erase
      // below would invalidate anything that still pointed into the map.
      const BigData previouslyCached = inCacheNow;
      const std::string currentSymbol = it->first;
      if (cached)
      {
        inCacheNow = cached;
        it++;
      }
      else
        it = bySymbol.erase(it);
      // Note the invariant:  previouslyCached will not be null.
      // The single symbol branch below never leaves a null in the map; it
      // either stores a non-null value or erases the entry.
      if (streaming && (*streaming != *previouslyCached))
        sendNotifications(type, currentSymbol, streaming);
    }
  }
  else
  { // Only this symbol
    // operator[] adds a null entry if this symbol is new.  That entry is
    // either overwritten or erased immediately below.
    BigData &inCacheNow = bySymbol[symbol];
    const BigData previouslyCached = inCacheNow;
    if (cached)
      inCacheNow = cached;
    else
      bySymbol.erase(symbol);
    if (streaming && // We have something to say now ...
        ((!previouslyCached) || (*streaming != *previouslyCached)))
      // ... and it's different from what was in the cache.
      sendNotifications(type, symbol, streaming);
  }
}

// Queue data for everyone who is listening for it.  People who asked for
// this specific symbol get the data as is.  People who asked for all symbols
// (stored under the key "") get the result of addSymbol(symbol, data), which
// we build once and share.
void ProxyThread::sendNotifications(DataTypes::Internal type, std::string const &symbol, BigData const &data)
{
  auto &bySymbol = _requestsByContent[type];
  auto forSymbol = bySymbol.find(symbol);
  if (forSymbol != bySymbol.end())
    // kvp.first and kvp.second are passed where the other addToOutputQueue()
    // calls pass a socket and a message id.
    for (auto const &kvp : forSymbol->second)
      addToOutputQueue(kvp.first, *data, kvp.second);
  forSymbol = bySymbol.find("");
  if (forSymbol != bySymbol.end())
  {
    const std::string toSend = addSymbol(symbol, data);
    for (auto const &kvp : forSymbol->second)
      addToOutputQueue(kvp.first, toSend, kvp.second);
  }
}