#ifndef __DataNodes_h_ #define __DataNodes_h_ #include #include #include #include #include #include #include #include "../../shared/MiscSupport.h" #include "../../shared/Messages.h" #include "Timers.h" // Anyone who listens to a data node must implement this callback. In Delphi // we used a TThreadMethod for the callback. In C++, instead, we offer an // object of this type, and a messageId. When appropriate we will call // onWakeup on the given object with the given msgId. The msgId makes it easy // for one object to listen for messages from multiple data nodes. This should // allow us to keep the general feel of the original Delphi code. class DataNodeListener { public: virtual void onWakeup(int msgId) =0; // We add this to avoid a compiler warning. The down side is that it is // almost impossible for a class to inherit from this class without // publicly inheriting from this class. virtual ~DataNodeListener() { } }; class DataNode; class BroadcastMessage; // This serves the same general purpose as DataNodeListener. The format is // slightly different because there is a broadcast message as the payload. // Presumably only datanodes will need to be broadcast listeners. But it // seemed like some parts of the code would be simpler if I explicitly created // this simple interface. class BroadcastMessageListener { public: virtual void onBroadcast(BroadcastMessage &message, int msgId) =0; virtual ~BroadcastMessageListener() { } }; class BroadcastRequest { public: std::string _channel; BroadcastMessageListener *_caller; int _id; bool operator ==(BroadcastRequest const &other) const; bool operator <(BroadcastRequest const &other) const; BroadcastRequest(std::string channel, BroadcastMessageListener *caller, int id) : _channel(channel), _caller(caller), _id(id) { } }; class DataNodeManager : NoCopy, NoAssign { private: RequestListener * const _queue; const int _callbackId; TimerThread _timerThread; std::set< BroadcastRequest > _broadcastRequests; std::map< std::string, DataNode * > _registered; std::map< DataNode *, std::string > _registeredByNode; time_t _submitTime; public: class EventQueueListener : public Request { // TO DO: This object should call setEventTime. public: virtual void onEventQueue() =0; EventQueueListener(SocketInfo *socket = NULL) : Request(socket) { } }; DataNodeManager(RequestListener *queue, int callbackId); // You can call this from another thread. In fact, you will probably // call this from another thread. You can also use this to break up big // jobs into smaller jobs, but you don't want to do that too much or things // will become less effecient. Everything else in this unit is limited to // one thread. Although you can create multiple data node managers in // different queues. void addToQueue(EventQueueListener *listener); // A wrapper around addToQueue() and EventQueueListener. This is the simple // way to execute a lambda (or similar) in the DataNode thread. socket has // it's normal meaning. If the socket gets deleted then the request might // get deleted before it is executed. void addToQueue(std::function< void() > const &action, SocketInfo *socket = NULL); // For simplicity of implementation we only allow one copy of listener, // channel and msgId at a time. A duplicate request may be ignored, or it // may cause an error. Don't do it. void subscribe(BroadcastRequest request); // This is safe even if you are not subscribed. void unsubscribe(BroadcastRequest request); void broadcastNow(BroadcastMessage *message); // This tells you which DataNodeManager is active in this thread. This is // aimed at implementors in this thread. If you are trying to send a // message, you are probably sending to another thread, and you need another // way to find the appropriate data node manager. static DataNodeManager *getDefault(); // This is how we store the DataNodeManager, for use in the preceding // function. Presumably we will create the DataNodeManager in the // constructor for a thread, so the DataNodeManager will immediately be // available to other threads. You will call the following function shortly // after the thread starts. void setAsDefault(); static std::string nextOwnerChannel(); // This is aimed at understanding performance. The current implementation // counts the total number of data nodes ever created, not the ones that // currently exist. That's typically the same. static int64_t dataNodeCount(); void registerNode(std::string const &name, DataNode *node); void unregisterNode(DataNode *node); // This will only find the data node if it has been registered. Otherwise // it will return NULL. This should only be called by a data node or data // node factory. Those will take care of any necessary linking. DataNode *findDataNode(std::string const &name); // This is the official time of the event. This is when we received the // event. If it sits in a queue before we process it, we still use this // time. We use the same time everywhere to be consistent. For example, // the timestamp you see on an alert comes from getSubmitTime(). // // This is always local time. Not datafeed time. // ../data_framework/SynchronizedTimers.[Ch] is the only serious place where // we really care about the time reported by the datafeed. That code tells // the candle alerts a little later if the data feed is behind. // // Example 1: We are running reasonably well. At 10:00:00am according to // our local clock we receive data from the datafeed. The data is also // marked 10:00:00. SynchronizedTimers notifies anyone who's interested // that it's time to check the end of candle alerts. Some alerts will fire, // all with the timestamp 10:00:00. // // Example 2: The datafeed is slow but the rest of the system is running // fast. Imagine it's 2004 and the internet is slower and we have a less // capable data provider. It's 10:00:03 according to our local clock. // We receive a new message from the data provider. The message is finally // up to 10:00:00. SynchronizedTimers notifies everyone who cares about // the end of candles. Some alerts will fire. These are alerts which // ideally should have fired at 10:00:00, but are firing now, at 10:00:03. // The timestamp on these alerts will be 10:00:03. // // Example 3: The datafeed is in good shape, but this process is running // slowly. Items from the datafeed are time stamped as soon as they come in, // but they might sit in a queue for up to 3 seconds. We start processing // a message in the alerts thread. Local time is 10:00:03. The print was // from 10:00:00, as is the timestamp reported by getSubmitTime(). We // report a lot of alerts because of this message. The user will see // 10:00:00 as the timestamp in TI Pro. // // That is to say, if the delay is within our server, or if the delay is // between our server and the client, it will look the same. The alert // will say 10:00:00 and it will contain data from 10:00:00. The user will // just see it 3 seconds late. And if the user is looking at history, he // won't see any problems. time_t getSubmitTime() const { return _submitTime; } // This should only be used by BroadcastMessage. void setSubmitTime(time_t value) { _submitTime = value; } TimerThread &getTimerThread() { return _timerThread; } void gracefulShutdown(); static std::string gracefulShutdownChannel() { return "DataNodeManager.gracefulShutdown"; } // This is really aimed at DataNode::getDebugInfo(). That function is better // because it can be overridden. This is available as the default case. std::string getDebugInfo(DataNode const *node) const; }; // This has to be public inheritance, or else // ‘static void* FixedMalloc::operator new(size_t)’ is inaccessible class BroadcastMessage : public DataNodeManager::EventQueueListener { private: const std::string _channel; DataNodeManager *_dataNodeManager; time_t _submitTime; void onEventQueue(); public: BroadcastMessage(std::string channel, SocketInfo *socket = NULL) : DataNodeManager::EventQueueListener(socket), _channel(channel), _dataNodeManager(NULL), _submitTime(time(NULL)) { } std::string getChannel() const { return _channel; } // Sending the message will add it into the appropriate queue. When its // time comes up, it will deliver itself to any and all appropriate // listeners. Then it will delete itself. This can be called from any // thread. void send(DataNodeManager *dataNodeManager); // This was originally aimed at the debug interface. It hasn't been used // in a long time, and doesn't make much sense in production. I'm removing // it to avoid any problems. //void setSubmitTime(time_t submitTime) { _submitTime = submitTime; } }; class DataNodeLink; class DataNodeArgument; class DataNode : NoCopy, NoAssign, protected DataNodeListener, protected BroadcastMessageListener { private: // The people we may have to notify. std::set< DataNodeLink * > _listeners; // Links to us which have no callbacks. These were seperated from _listeners //strictly as an optimization. int_fast32_t _otherOwners; std::string _byLink; DataNodeManager * const _manager; // This is a channel dedicated to us. No one else will use it. Use this // to send a message back to yourself from another thread. This replaces // FListenerId, although the mechanism is slightly different here than in // Delphi. const std::string _ownerChannel; // These are the broadcast channels we are currently listening to. We keep // track of these to prevent someone from subscribing twice (which would // break some data structures and to automatically remove all open // connections when we shut down. std::set< BroadcastRequest > _broadcastRequests; // One more listener. void addLink(DataNodeLink *link); // This is true if there are no listeners or other reasons to live. bool readyForRelease(); // We no longer need the Release procedure. That was required when there // were multiple threads. We sent the release request to the approprate // thread, and that procedure double checked to make sure we were ready // for release. // A placeholder so we match the interface. void onBroadcast(BroadcastMessage &message, int msgId) { } // This is a good default, appropriate for many simple data nodes. There's // no good reason to make this do nothing. If we didn't want to do anything, // we would not have requested a wakeup. You can own a data node without // listening to it. void onWakeup(int msgId) { notifyListeners(); } public: // This severs the connection to the given link. It should only be called // by DataNodeLink. If this is the last link, it will cause the data // node to release its resources. After calling this you should be hands off // because this object could be gone. void removeLink(DataNodeLink *link); protected: // This notifies all active listeners. This should only be called in the // data node thread. It is safe to call this method after this object has // been shut down (or while it is in the process of being shut down). In // that case, there are no listeners, so this is a no-op. The standard // shutdown FIRST removes all listeners, THEN removes any subordinate // data nodes. void notifyListeners(); // This makes the given data node a listener to the given channel. // Attempting to subscribe the same data node to the same channel more // than once is an error. However, it is acceptable to unsubscribe then // resubscribe. void registerForBroadcast(std::string channel, int msgId); // This removes the subscription. This should only be called from the data // node thread. This can safely be called even if there is no such // subscription. All subscriptions are automatically removed as part of the // standard shutdown. void unregisterForBroadcast(std::string channel, int msgId); // This is a unique string which will never be reused. Register for this // to broadcast a message to yourself. If you need more than one channel, // start with this, then add a . then add more. const std::string &getOwnerChannel() const { return _ownerChannel; } DataNodeManager *getManager() const { return _manager; } DataNode(); // This creates a new listener. It returns a data node link as the recipt. // The recipt can be used to terminate the connection to this data node. The // listener is called whenever the data in this data node changes, except // when the link is terminated. DataNodeLink *createLink(DataNodeListener *listener, int msgId); // This version will not request a callback, but will still maintain the // reference count. DataNodeLink *createLink() { return createLink(NULL, 0); } time_t getSubmitTime() const { return _manager->getSubmitTime(); } // In Delphi the bulk of the data nodes descended from either // TGenericDataNode or TDataNodeWithStringKey. This covers the later case. // A large number of data nodes really had exactly a single string as an // input. (Usually that was the stock symbol.) However, others had to // marshal more complicated arguments into and out from a string. That was // done in a haphazard, impromptu manner. This function makes it very easy // to pass a string as the key. But you can also use other types of data // node arguments. template< class T > static DataNodeLink *findHelper(DataNodeListener *listener, int msgId, T *&node, DataNodeArgument const &args); private: // Automatic links create simipler code. They are not strictly required, but // they help. All items in this list will be released as soon as the last // link to this data node is released std::vector< DataNodeLink * > _automaticLinks; protected: void addAutoLink(DataNodeLink *link) { _automaticLinks.push_back(link); } // Only DataNode::removeLink() should call this. I don't understand why // this had to be protected, not private. virtual ~DataNode(); public: typedef int64_t Integer; virtual std::string getDebugInfo() const; }; class DataNodeLink { private: friend class DataNode; DataNode *const _source; DataNodeListener *const _listener; const int _msgId; DataNodeLink(DataNode *source, DataNodeListener *listener, int msgId) : _source(source), _listener(listener), _msgId(msgId) { } void notifyListener(); void registerNode(std::string name, DataNode *dataNode); void unregisterNode(std::string name); public: void release(); }; // This allows a data node to listen for a DeleteSocketThread::callbackId // message. This is aimed at a very few low level data nodes which (a) get // their data from a socket and (b) need to know when that socket closes. // Note that this does not automatically grab that message. The data node // thread needed to listen for that message and then call // SocketClosedDataNode::doNotify(). This class was specifically added to // support the proxy data source, but this is very generic and it could be // used by other classes. class SocketClosedDataNode : public DataNode { private: static __thread SocketClosedDataNode *instance; SocketInfo *_socket; SocketClosedDataNode(DataNodeArgument const &args); friend class DataNode; ~SocketClosedDataNode(); public: SocketInfo *getSocket() const { return _socket; } static DataNodeLink *find(DataNodeListener *listener, int msgId, SocketClosedDataNode *&node); // This must be called from the correct thread. This will work whether // this data node exists or not. static void doNotify(SocketInfo *socket); }; // We used a variant, or similar, in Delphi to store the arguments in the // facotries. C++ doesn't have a variant, so bascially i'm building one // myself. // // In Delphi we had two major types of data nodes, those with a single // string as a key, and those with a factory as a key. Most of the time that // worked, but sometimes we abused the single string, and marshaled all types // of data into and out of that string. Now I plan to have only one set of // marshalling routines used in both cases. We will still have "generic" // data nodes which are spawned from factories, and other data nodes being // created by simpler interfaces, but under the hood they will both use the // same routines to make a string key, and they will all be stored in the same // table. class DataNodeArgument; class DataNodeArgumentBase { public: static std::string quoteString(std::string value); virtual std::string asString() const =0; // This fills in a placeholder with a value. It can be recursive. Objects // of this class are intended to be immutable. We create a new object, if // necessary. If nothing changes, we return NULL. We expect a higher level // to convert that null back into a pointer to us. Because this ia a smart // pointer, we can't just return "this". virtual DataNodeArgument replace(std::string const &name, DataNodeArgument const &value) const; // Do we need an accept all defaults command? I've always liked the idea of // a default, but I don't think i've ever used one. virtual ~DataNodeArgumentBase() { } }; class DataNodeArgumentInt : public DataNodeArgumentBase { private: const DataNode::Integer _value; public: DataNode::Integer getValue() const { return _value; } std::string asString() const; DataNodeArgumentInt(DataNode::Integer value) : _value(value) { } }; class DataNodeArgumentString : public DataNodeArgumentBase { private: const std::string _value; public: std::string const &getValue() const { return _value; } std::string asString() const; DataNodeArgumentString(std::string value) : _value(value) { } }; class DataNodeArgumentDouble : public DataNodeArgumentBase { private: const double _value; public: double getValue() const { return _value; } std::string asString() const; DataNodeArgumentDouble(double value) : _value(value) { } }; class DataNodeArgumentPlaceHolder : public DataNodeArgumentBase { private: const std::string _name; // Do we need a default? I like the idea, but i don't think i've ever // used it in Delphi. public: std::string asString() const; DataNodeArgument replace(std::string const &name, DataNodeArgument const &value) const; DataNodeArgumentPlaceHolder(std::string name) : _name(name) { } }; // This was called TParamList in Delphi. typedef std::vector< DataNodeArgument > DataNodeArgumentVector; // Presumably factory will be a subclass of this. class DataNodeArgumentList : public DataNodeArgumentBase { private: const DataNodeArgumentVector _value; public: std::string asString() const; DataNodeArgumentVector const &getValue() const { return _value; } DataNodeArgument replace(std::string const &name, DataNodeArgument const &value) const; DataNodeArgumentList(DataNodeArgumentVector const &items) : _value(items) { } }; class DataNodeArgument { private: // This class is primarily inteneded to be used as a key in a table. And we // expect a lot of sharing of these keys and peices of keys. So the // underlying object are all constant. However, DataNodeArgument can be // modified with the default assignment operator. This is required to // implement a DataNodeArgumentVector. Note that a DataNodeArgumentVector // can be modified in all the normal ways. However, DataNodeArgumentList // makes a *copy* of this vector, and that copy is constant. RefCount< DataNodeArgumentBase >_value; public: // These objects are immutable. This might return a new object. Or, if no // changes were necessary, it might return the original object. And it's // possible that parts of the original object will be reused, and other // parts will be replaced. DataNodeArgument replace(std::string const &name, DataNodeArgument const &value) const; DataNodeArgument(DataNode::Integer value); DataNodeArgument(std::string value); DataNodeArgument(char const *value); DataNodeArgument(double value); DataNodeArgument(DataNodeArgumentVector const &value); DataNodeArgument(); DataNodeArgument(DataNodeArgumentBase *value); // These are synonyms for DataNode::Integer. C++ probably would have figured // that out on it's own, but I didn't want to leave anything to chance. DataNodeArgument(bool value); DataNodeArgument(int value); // Note: This returns false for something created by the default // constructor, and true for anything else. It will return true for any // number, even 0 or 0.0. It will return true for any string, including // the empty string. This is completely different from getBooleanValue(). operator bool() const { return _value; } // Convert this into a string. If a and b are both of type DataNodeArgument // then a.asString() == b.asString() should be true if and only if a == b. // And a.asString() should never be a proper substring of b.asString(). // In principal you could convert back. We never really need to do that, // but sometimes the strings might be visible to a developer, and it would // be nice if he could read into the string some. std::string asString() const; // This will automatically throw an exception if the type is wrong. Note // that there is no attempt to do any automatic conversions. If you want // a double, but someone stored an integer, things will fail. The assumption // is that you know exactly what should be in here. You stored it in a // common format to help the base classes do their thing. But when you // go back to retrieve it, you know exactly what the format should be. RTTI // is used only as an assertion. template < class T > T const &getValue() const { return dynamic_cast< T const & >(*_value); } std::string const &getStringValue() const { return getValue< DataNodeArgumentString >().getValue(); } DataNodeArgumentVector const &getListValue() const { return getValue< DataNodeArgumentList >().getValue(); } DataNode::Integer getIntValue() const { return getValue< DataNodeArgumentInt >().getValue(); } double getDoubleValue() const { return getValue< DataNodeArgumentDouble >().getValue(); } bool getBooleanValue() const { return getIntValue(); } }; inline DataNodeArgument argList(DataNodeArgument const &a, DataNodeArgument const &b) { DataNodeArgumentVector list; list.push_back(a); list.push_back(b); return list; } inline DataNodeArgument argList(DataNodeArgument const &a, DataNodeArgument const &b, DataNodeArgument const &c) { DataNodeArgumentVector list; list.push_back(a); list.push_back(b); list.push_back(c); return list; } inline DataNodeArgument argList(DataNodeArgument const &a, DataNodeArgument const &b, DataNodeArgument const &c, DataNodeArgument const &d) { DataNodeArgumentVector list; list.push_back(a); list.push_back(b); list.push_back(c); list.push_back(d); return list; } inline DataNodeArgument argList(DataNodeArgument const &a, DataNodeArgument const &b, DataNodeArgument const &c, DataNodeArgument const &d, DataNodeArgument const &e) { DataNodeArgumentVector list; list.push_back(a); list.push_back(b); list.push_back(c); list.push_back(d); list.push_back(e); return list; } //#include template< class T > DataNodeLink *DataNode::findHelper(DataNodeListener *listener, int msgId, T *&node, DataNodeArgument const &args) { // THE PREFIX * IS RESERVED HERE. std::string name = "*"; name += DataNodeArgumentBase::quoteString(typeid(T).name()); name += args.asString(); DataNodeManager *const manager = DataNodeManager::getDefault(); if (DataNode *prev = manager->findDataNode(name)) { node = dynamic_cast< T * >(prev); // How could we get here with node == NULL? // The commented out code below has always shown that the object in // prev was exactly the object I was looking for, and it was always // the right type. So why would dynamic_cast() return NULL? // 1) Potentially two different types could have the same name. // Presumably that would cause other C++ problems. I've never // actually run into that problem. // 2) If a class inherits from DataNode but forgets to say "public", // that will cause a NULL here. // 3) A long time ago I had some problems that I could never completely // track down. Rearranging the order of the files in the build // solved it! Strange. //if (!node) //{ // std::cerr<registerNode(name, node); } return node->createLink(listener, msgId); } #endif