#include "../shared/Messages.h" #include "../shared/ThreadClass.h" #include "../shared/CommandDispatcher.h" #include "../shared/LogFile.h" #include "../shared/ReplyToClient.h" #include "GetData.h" #include "Subscriptions.h" ///////////////////////////////////////////////////////////////////// // VfForSymbol ///////////////////////////////////////////////////////////////////// class VfForSymbol { private: struct Fields { int64_t buyVolume; int64_t sellVolume; int64_t expectedVelocity; }; std::string _packed; public: void init(std::string const &symbol); void update(int64_t buyVolume, int64_t sellVolume, int64_t expectedVelocity); void load(VFMessage *message); std::string const &binaryMessage() const; std::string textMessage(bool verbose) const; }; void VfForSymbol::load(VFMessage *message) { init(message->symbol); update(message->buyVolume, message->sellVolume, message->expectedVelocity); } void VfForSymbol::init(std::string const &symbol) { if (_packed.empty()) _packed = std::string(sizeof(Fields), '\0') + symbol; } void VfForSymbol::update(int64_t buyVolume, int64_t sellVolume, int64_t expectedVelocity) { Fields *fields = (Fields *)&_packed[0]; fields->buyVolume = buyVolume; fields->sellVolume = sellVolume; fields->expectedVelocity = expectedVelocity; } std::string const &VfForSymbol::binaryMessage() const { return _packed; } std::string VfForSymbol::textMessage(bool verbose) const { Fields *fields = (Fields *)&_packed[0]; TclList result; result<<"Buy Volume"<buyVolume <<"Sell Volume"<sellVolume <<"Expected Velocity"<expectedVelocity; if (verbose) result<<"Internal"<<_packed; return result; } ///////////////////////////////////////////////////////////////////// // SubscriptionsThread ///////////////////////////////////////////////////////////////////// class SubscriptionsThread : private ThreadClass { private: enum { mtNewValue, mtDumpData, mtDumpListeners, mtSnapshot, mtListen, mtAdd, mtRemove, mtQuit }; RequestQueue _incomingRequests; protected: void threadFunction(); public: SubscriptionsThread(); ~SubscriptionsThread(); void addData(VFMessage *message); static SubscriptionsThread *instance; }; SubscriptionsThread *SubscriptionsThread::instance; void SubscriptionsThread::addData(VFMessage *message) { message->callbackId = mtNewValue; _incomingRequests.newRequest(message); } void SubscriptionsThread::threadFunction() { ThreadMonitor &tm = ThreadMonitor::find(); initGetData(); typedef std::map< SocketInfo *, int > ListenerChannel; ListenerChannel listenerChannel; typedef std::map< std::string, VfForSymbol > Values; Values values; typedef std::pair< std::string, SocketInfo * > Listener; typedef std::set< Listener > Listeners; Listeners listeners; while (true) { while (Request *current = _incomingRequests.getRequest()) { switch (current->callbackId) { case mtNewValue: { static const std::string S_NEW_VALUE = "NEW VALUE"; tm.setState(S_NEW_VALUE); VFMessage *request = dynamic_cast< VFMessage * >(current); std::string const &symbol = request->symbol; VfForSymbol &value = values[symbol]; value.load(request); for (Listeners::const_iterator it = listeners.lower_bound(Listener(symbol, NULL)); (it != listeners.end()) && (it->first == symbol); it++) { SocketInfo *const socket = it->second; addToOutputQueue(socket, value.binaryMessage(), listenerChannel[socket]); } break; } case mtDumpListeners: { static const std::string S_DUMP_LISTENERS = "DUMP LISTENERS"; tm.setState(S_DUMP_LISTENERS); ExternalRequest *request = dynamic_cast(current); for (ListenerChannel::const_iterator it = listenerChannel.begin(); it != listenerChannel.end(); it++) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"listenerChannel"<second; LogFile::primary().sendString(msg, it->first); } for (Listeners::const_iterator it = listeners.begin(); it != listeners.end(); it++) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"listening"<first; LogFile::primary().sendString(msg, it->second); } addToOutputQueue(request->getSocketInfo(), "Done", request->getResponseMessageId()); break; } case mtDumpData: { static const std::string S_DUMP_DATA = "DUMP DATA"; tm.setState(S_DUMP_DATA); ExternalRequest *request = dynamic_cast(current); const bool verbose = request->getProperty("verbose") == "1"; for (Values::const_iterator it = values.begin(); it != values.end(); it++) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <first <second.textMessage(verbose); LogFile::primary().sendString(msg); } addToOutputQueue(request->getSocketInfo(), "Done", request->getResponseMessageId()); break; } case mtSnapshot: { static const std::string S_SNAPSHOT = "SNAPSHOT"; tm.setState(S_SNAPSHOT); ExternalRequest *request = dynamic_cast(current); const std::string symbol = request->getProperty("symbol"); const bool verbose = request->getProperty("verbose") == "1"; std::string message = symbol + ": "; if (values.count(symbol)) message += values[symbol].textMessage(verbose); else message += "Not found"; addToOutputQueue(request->getSocketInfo(), message, request->getResponseMessageId()); break; } case mtListen: { static const std::string S_LISTEN = "LISTEN"; tm.setState(S_LISTEN); ExternalRequest *request = dynamic_cast(current); listenerChannel[request->getSocketInfo()] = request->getResponseMessageId(); break; } case mtAdd: { static const std::string S_ADD = "ADD"; tm.setState(S_ADD); ExternalRequest *request = dynamic_cast(current); const std::string symbol = request->getProperty("symbol"); listeners.insert(Listener(symbol, request->getSocketInfo())); if (VfForSymbol const *value = getProperty(values, symbol)) { SocketInfo *const socket = request->getSocketInfo(); addToOutputQueue(socket, value->binaryMessage(), listenerChannel[socket]); } break; } case mtRemove: { static const std::string S_REMOVE = "REMOVE"; tm.setState(S_REMOVE); ExternalRequest *request = dynamic_cast(current); const std::string symbol = request->getProperty("symbol"); listeners.erase(Listener(symbol, request->getSocketInfo())); break; } case mtQuit: { delete current; return; } case DeleteSocketThread::callbackId: { static const std::string S_DELETE_SOCKET = "DELETE SOCKET"; tm.setState(S_DELETE_SOCKET); SocketInfo *socket = current->getSocketInfo(); listenerChannel.erase(socket); for (Listeners::iterator it = listeners.begin(); it != listeners.end(); ) { Listeners::iterator next = it; next++; if (it->second == socket) listeners.erase(it); it = next; } break; } } delete current; } _incomingRequests.waitForRequest(); } } SubscriptionsThread::SubscriptionsThread() : ThreadClass("SubscriptionsThread"), _incomingRequests(getName()) { CommandDispatcher *dispatcher = CommandDispatcher::getInstance(); // These are only for debugging and development. They take no arguments. // They report the internal state to the log file (because it might be // too long to see on the screen). They print a confirmation on the screen // so you know you typed it right. dump_data has one option: verbose=1 // means to include the computer friendly format, not just the user friendly // format. dispatcher->listenForCommand("dump_data", &_incomingRequests, mtDumpData); dispatcher->listenForCommand("dump_listeners", &_incomingRequests, mtDumpListeners); // This is only for debugging. It displays a user friendly dump of the // data for the given symbol. The argument is passed as symbol=DELL. // verbose=1 means to include the computer friendly format, not just the user //friendly format. This will display something, even if the symbol is not // valid. dispatcher->listenForCommand("snapshot", &_incomingRequests, mtSnapshot); // This is how you set the channel for all subscriptions. It works just like // the oddsmaker and top list channels. You open it once and listen to it // forever. You make specific requests with other commands. Aside from // message_id there are no arguments. dispatcher->listenForCommand("listen", &_incomingRequests, mtListen); // This subscribes to a symbol. The argument is passed as symbol=DELL. This // will not display any result. All results will come back from the listen // command. This will request an immediate snapshot, in attion to any and // all updates. A duplicate call to add will request another snapshot, but // is otherwise ignored. dispatcher->listenForCommand("add", &_incomingRequests, mtAdd); // This unsubscribes. A duplicate request to unsubscribe is ignored. There // are no responses from this command. Send the symbol as symbol=DELL dispatcher->listenForCommand("remove", &_incomingRequests, mtRemove); // Also, you can use the standard ping command, from ../shared/Ping.[Ch] startThread(); } SubscriptionsThread::~SubscriptionsThread() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incomingRequests.newRequest(r); waitForThread(); } ///////////////////////////////////////////////////////////////////// // VFMessage ///////////////////////////////////////////////////////////////////// VFMessage::VFMessage() : Request(NULL), buyVolume(0), sellVolume(0), expectedVelocity(0) { } void VFMessage::send() { SubscriptionsThread::instance->addData(this); } ///////////////////////////////////////////////////////////////////// // Initialization ///////////////////////////////////////////////////////////////////// void initSubscriptions() { assert(!SubscriptionsThread::instance); SubscriptionsThread::instance = new SubscriptionsThread(); }