#ifndef __ProxyThread_h_ #define __ProxyThread_h_ #include #include #include "DataTypes.h" #include "../shared/ContainerThread.h" #include "../shared/ThreadSafeRefCount.h" #include "../shared/CommandDispatcher.h" class ProxyThread : public ForeverThreadUser { public: // The messages might get copied a lot. And they might be long strings. So // store them in a reference counted pointer for efficiency. typedef TSRefCount< std::string > BigData; private: enum { mtRequestFromProxy, mtCancelFromProxy, mtDebugDumpFromProxy }; // Keep this private. In many classes we make this public, but we might want // to split up the work over multiple threads. So the public methods will // be static. static ProxyThread *instance(); // All requests to subscribe or unsubscribe are sent through here. // Forwarding data has the highest priority. If a server reconnects in the // middle of the day we don't want it to slow down the other servers. We've // seen that happen in a bad way with the Delphi market data proxy. // // A disconnect will unfortunately be a high priority event. We have to deal // with disconnects immediately or it can cause other problems. FairRequestQueue _lowPriority; // A generic request suitable for pushing into _lowPriority. // // This is a new style of request. When we first started std::function did // not exist, nor did lambda expressions. This new style makes the code a // little cleaner. The code that is now stored in the std::function/lambda // used to be put where "->action()" is now. We used a switch statement // to decide which code to run on which request. // // The older code might have been slightly more efficient. We knew that // we were making one copy of the data right in the Request object. That // data was all passed around as a single pointer and never copied. // Sometimes we use smart pointers to hold data in a lambda. class DoIt : public Request { public: DoIt(SocketInfo *socket) : Request(socket) { } std::function< void() > action; template < typename Action > DoIt(SocketInfo *socket, Action a) : Request(socket), action(a) { } }; virtual void handleRequestInThread(Request *original); virtual void socketClosed(SocketInfo *socket); virtual void beforeSleep(IBeforeSleepCallbacks &callbacks); // Marshal the symbol and the data together. When the client subscribes to // a single symbol, we only send the data. When the client subscribes to // multiple symbols in one request, we have to tag each response with the // symbol. First we marshal() the symbol in the normal way. Then we append // the data as is. Presumably it was already the response from a previous // call to marshal(), but it doesn't matter either way. static std::string addSymbol(std::string const &symbol, BigData const &data); // Sending data downstream. void requestFromProxy(SocketInfo *socket, ExternalRequest::MessageId messageId, DataTypes::Internal type, std::string const &symbol, bool snapshot); void cancelFromProxy(SocketInfo *socket, DataTypes::Internal type, std::string const &symbol); // A list of all requests. // // If we were storing this in a MySQL database the table would look something // like this: // CREATE TABLE listeners ( // BIGINT socket_info, // BIGINT message_id, // VARCHAR(…) symbol, // ENUM(“L1”, “TOS”, …) data_type, // UNIQUE KEY for_deleting_listeners (socket_info, data_type, symbol), // KEY for_distributing_data (data_type, symbol); class CancelInfo { private: SocketInfo *_socketInfo; DataTypes::Internal _type; std::string _symbol; public: // Default constructor. All fields are empty. See isEmpty() CancelInfo() : _socketInfo(NULL), _type(DataTypes::UNKNOWN) { } // Start from here to find all items for this socket. This would be a // perfect input to std::map::lower_bound() or std::map::upper_bound(). CancelInfo(SocketInfo *socketInfo) : _socketInfo(socketInfo), _type(DataTypes::UNKNOWN) { } // A reasonable key to store in the table. CancelInfo(SocketInfo *socketInfo, DataTypes::Internal type, std::string const &symbol) : _socketInfo(socketInfo), _type(type), _symbol(symbol) { assert (socketInfo && (type != DataTypes::UNKNOWN)); } SocketInfo *getSocketInfo() const { return _socketInfo; } DataTypes::Internal getType() const { return _type; } std::string const &getSymbol() const { return _symbol; } bool isEmpty() const { return _type == DataTypes::UNKNOWN; } bool operator ==(CancelInfo const &other) const; bool operator <(CancelInfo const &other) const; std::string debugDump() const; }; std::set< CancelInfo > _requestsByCancelInfo; // When we get new data from upstream, this is how we find out who we have // to forward it to downstream. There are two types of request. If the // string is "" then we send all data to that destination. Otherwise the // string lists a specific symbol. // There are two ways to look up values: // 1) You know the data type and the symbol, so you get a list of items. // Each item is a SocketInfo * and an ExternalRequest::MessageId. Use // This when we receive data. // 2) You know the data type, the symbol and the socket. Use this to cancel // a request. Note that you only have to delete the matching item. If // we used std::map< std::string, map< SocketInfo *, // ExternalRequest::MessageId > > some things might be more obvious, but // when it's time to delete an item you'd have to check for a symbol // that was pointing to 0 requests. If you didn't delete those you could // have a memory leak. DataTypes::Container< std::unordered_map< std::string, std::unordered_map< SocketInfo *, ExternalRequest::MessageId > > > _requestsByContent; bool eraseFromRequestsByContent(DataTypes::Internal type, std::string const &symbol, SocketInfo *socket); // Receiving data from upstream. void addDataImpl(DataTypes::Internal type, std::string const &symbol, BigData const &streaming, BigData const &cached); void sendNotifications(DataTypes::Internal type, std::string const &symbol, BigData const &streaming); // A snapshot of all known data. Often when people first connect they want // to know the most recent version of this, plus whatever streaming data // comes next. DataTypes::Container< std::unordered_map< std::string, BigData > > _snapshots; ProxyThread(); public: // Thread safe and may be called multiple times. static void initialize(); // These are thread safe. // // In general you have 3 choices: // 1) You expose a RequestListener (a queue or something that looks like a // queue) and that is thread safe. That works well when the producer // (e.g. a CommandDispatcher) always outputs in a standard format. // 2) You say that the entire class is NOT thread safe. That's helpful when // you need an immediate response to functions. That's often used in // classes where you can make one or more instances of the class per // thread, like TalkWithServer64. NewWorkerCluster uses this option so it // can take care of the boilerplate code needed to move requests AND // RESPONSES to the correct thread, making that class easier to user. // 3) (Used here) All public methods are thread safe. Under the hood these // public methods will take care of putting the request into a queue. // Older code will do that explicitly, but newer code will use the // ContainerThread API which takes care of a lot of the details for you. static void addData(DataTypes::Internal type, std::string symbol, BigData streaming, BigData cached); static void addData(DataTypes::Internal type, std::string const &symbol, std::string const &streaming, std::string const &cached) { addData(type, symbol, new std::string(streaming), new std::string(cached)); } static void addData(DataTypes::Internal type, std::string const &symbol, std::string const &data) { BigData bigData(new std::string(data)); addData(type, symbol, bigData, bigData); } // EMPTY_STRING = "". The convention is that we send "" to any streaming // listeners, but anyone who asks for the cached value for this gets nothing. static const BigData EMPTY_STRING; static void clearData(DataTypes::Internal type, std::string const &symbol) { addData(type, symbol, EMPTY_STRING, NULL); } // For simplicity we say that "" in addData means the same thing as when // subscribing to data. Iterate over all symbols in the cache. static void clearAllData(DataTypes::Internal type) { clearData(type, ""); } }; #endif