#ifndef __CandleDataNode_h_
#define __CandleDataNode_h_

#include <assert.h>
#include <stdint.h>

#include <functional>
#include <map>
#include <set>
#include <string>
#include <typeinfo>
#include <vector>

#include "../shared/Locker.h"
#include "../shared/ThreadSafeRefCount.h"
#include "../shared/ContainerThread.h"
#include "../generate_alerts/misc_framework/DataNodes.h"
#include "EpochCounter.h"
#include "SingleCandle.h"

/* This is a common interface used by both daily and intraday candles.
 *
 * The underlying implementation has to be different for these. The database
 * part is similar, but the realtime part is completely different. (For one
 * thing, we have to build all one minute candles ourselves based on the
 * individual prints. But the datafeed constantly tells us the right values
 * for today's open, high, low, and close. Not only would it be more work to
 * try to build the daily candle like a one minute candle, it would probably
 * be less accurate. The data feed knows about all sorts of corrections and
 * other special conditions.)
 *
 * This interface is implemented by OneMinuteCandles.h and DailyCandles.h.
 *
 * These are not really data nodes. They used to be, but the DataNode class
 * is awkward at best when providing data to multiple threads. The name
 * "CandleDataNode" is vestigial, but I kept it because that made things
 * easier in CVS.
 *
 * There are no callbacks. At one time we implemented a callback each time
 * the data changed just because it was easy, but no one used it.
 *
 * You can easily find or create these objects in any thread. And you can
 * decrement the reference count in any thread. We still have to send
 * certain requests to other threads, but that's an implementation detail
 * and you don't have to care if you are using one of these classes. All
 * public member functions are thread safe.
 *
 * In addition to the public interface, there's a lot hiding in this class.
 * We've got some support similar to what the DataNode class provides. Of
 * course we implement find() and decrementReferenceCount(). But we also
 * have a lot of code to help OneMinuteCandles, DailyCandles, and any
 * future classes access various DataNode objects. These are still the
 * source of the data. Many of these protected functions have the exact
 * same name as their DataNode counterparts. This made it easier to
 * convert OneMinuteCandles and DailyCandles, which used to descend from
 * DataNode.
 *
 * Also look at the GridDataProvider interface in GridReaderBase.h. That
 * is the most common user of the CandleDataNode class. */
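/* A minimal usage sketch of the public interface. The symbol, the start
 * variable, and the surrounding code are made up for illustration; only the
 * CandleDataNode calls come from this header.
 *
 *   // Grab (or create) the shared intraday candles for one symbol. This is
 *   // thread safe and returns immediately, even if no data has loaded yet.
 *   CandleDataNode *candles = CandleDataNode::find("IBM", true);
 *
 *   // Ask for a single five minute candle. start should be on an exact one
 *   // minute boundary.
 *   SingleCandle candle = candles->threadSafeGet(start, start + 5 * 60);
 *
 *   // Exactly one decrementReferenceCount() for the one call to find().
 *   // Don't touch the object after this.
 *   candles->decrementReferenceCount();
 *   candles = NULL;
 */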
class CandleDataNode :
  protected DataNodeListener,
  protected BroadcastMessageListener
{
private:
  const std::string _key;

  // Only accessed within the Manager's mutex.
  int _referenceCount;

  // Only access these in the data node thread.
  std::set< BroadcastRequest > _broadcastRequests;
  std::vector< DataNodeLink * > _automaticLinks;

protected:
  CandleDataNode(std::string const &key) : _key(key), _referenceCount(0) { }

  // Use a string to describe each item. Unlike DataNodeArgument, we never
  // try to parse this. However, it is usually human readable, at least for
  // a developer.
  template < class T, class... Args >
  static std::string makeKey(Args... args)
  { // Start with the name of the concrete class, then append each argument.
    TclList result;
    result<<typeid(T).name();
    int const expand[] = { 0, ((result<<args), 0)... };
    (void)expand;
    return result;
  }

  class Manager
  {
  private:
    static Manager *_instance;

    DataNodeManager &_dataNodeManager;

    // Maps a key, as built by makeKey(), to the one and only object with
    // that key.
    Locker< std::map< std::string, CandleDataNode * > > _nodes;

    IContainerThread &_thread;

    Manager(DataNodeManager &dataNodeManager) :
      _dataNodeManager(dataNodeManager),
      _thread(*IContainerThread::create("CandleDataNode::Manager"))
    { }

  public:
    // Thread safe. This is like findHelper() in the DataNode world.
    //
    // Look for an item with this key. If one is found, bump its reference
    // count and return it.
    //
    // If no object is found, call the create() command with no arguments.
    // Bump the reference count on that object to 1, store it in the table
    // for next time, and return it.
    //
    // Note that create() is called within a global lock. It should be
    // quick and simple. find() itself can be called in any thread.
    //
    // The object returned by create() and find() should be usable
    // immediately. Of course, it might not have any data yet. That's
    // always possible, even an hour or a day after we started, because we
    // are relying on other servers. But we should be ready to handle
    // a request immediately even if we are planning to do additional
    // initialization in a different thread.
    template< class Result = CandleDataNode, class Action >
    Result *find(std::string const &key, Action const &create)
    {
      auto nodes = _nodes.lock();
      CandleDataNode *&result = (*nodes)[key];
      if (!result)
        result = create();
      result->_referenceCount++;
      assert(result->_key == key);
      return dynamic_cast< Result * >(result);
    }
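    // A sketch of how an implementation class might call find(). The
    // OneMinuteCandles constructor arguments shown here are assumptions,
    // not taken from OneMinuteCandles.h:
    //
    //   const std::string key = makeKey< OneMinuteCandles >(symbol);
    //   OneMinuteCandles *node =
    //     Manager::getInstance().find< OneMinuteCandles >(key, [=]() {
    //       // This runs inside the global lock, so just construct the
    //       // object; defer any slow initialization to another thread.
    //       return new OneMinuteCandles(key, symbol);
    //     });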
    DataNodeManager &getDataNodeManager() const { return _dataNodeManager; }

    IContainerThread &getThread() { return _thread; }

    void decrementReferenceCount(CandleDataNode *candleDataNode);

    // For simplicity we have just one Manager.
    //
    // In principle you can have multiple DataNodeManager classes and
    // multiple DataNode threads. No one really does that.
    static Manager &getInstance() { return *_instance; }

    static void initialize(DataNodeManager &dataNodeManager);
  };

  Manager &getManager() { return Manager::getInstance(); }

  void addAutoLink(DataNodeLink *link);

  void registerForBroadcast(std::string const &channel, int msgId);

  typedef int64_t Integer;

  // We keep an ordered list of candles and times. You can use a simple
  // binary search (e.g. std::lower_bound()) to find what you want. We use
  // the reference count to easily share the read only data between threads.
  // When we update the cache, we make a new one of these, rather than trying
  // to modify the existing one.
  typedef TSRefCount< std::vector< SingleCandleWithTime > > CachedCandles;

  // If you want to use a local cache, create one of these objects.
  // It is completely thread safe.
  class CachedCandlesHolder : NoCopy, NoAssign
  {
  private:
    // We never return a null pointer. If the cache is empty you get an
    // empty list of candles. That includes the default state, before we
    // load any data, so it might be common.
    static const CachedCandles EMPTY_LIST;

    // This is the part of the data that is protected by a mutex.
    struct NeedsLock
    {
      // shutDown is false when this object is working. When you call
      // ~CachedCandlesHolder() we set shutDown to true. This helps
      // the worker thread know that it can abandon any outstanding jobs
      // for this object.
      bool shutDown;

      // This makes sure we never ask for the same data twice. If you
      // ask for data, and the cache is empty, we'll send a request to
      // another thread to fill the cache. But if you ask several more
      // times in a row, nothing else will happen. It's common for a
      // client to make several requests at the same time for similar
      // data, e.g. 1 minute, 5 minute, and 10 minute candles all for
      // the same stock. This will be cleared by the other thread
      // after it updates the data and the associated expiration date.
      bool newDataRequestPending;

      // This is how we know that the data is valid.
      time_t cacheExpires;

      // This is the data. When someone asks for data, we always return
      // what is in here. This might be null. get() checks for NULL
      // and will return EMPTY_LIST instead.
      CachedCandles candles;

      NeedsLock();
      ~NeedsLock();
    };

    // This includes almost all data in the CachedCandlesHolder object.
    // Making this separate helps us with some details of threading. You
    // can call ~CachedCandlesHolder() in almost any thread. It will
    // return immediately. There may be some worker threads still
    // accessing the Body, and we want to delete it later, after we're
    // sure everyone is done with it, probably in a different thread.
    struct Body
    {
      // getData is a function we can call to fill the cache. This will
      // be different for one minute vs daily candles. It will probably
      // be called in a worker thread and might be called after the call
      // to ~CachedCandlesHolder(). We try to minimize that, for
      // performance reasons, but we can't completely avoid it. So be
      // careful: if this is a lambda, don't capture anything by
      // reference, because that thing will stick around forever.
      std::function< CachedCandles() > getData;

      Locker< NeedsLock > needsLock;

      Body(std::function< CachedCandles() > gd) : getData(gd) { }
    };

    // This is null if the cache is disabled. In that case all calls
    // to get() will return EMPTY_LIST.
    Body *_body;

    void clearIfOld();

  public:
    // enabled should be false if you don't want any caching. In that
    // case all calls to get() will return EMPTY_LIST.
    // getData is a function which should load fresh data to be stored
    // in the cache.
    CachedCandlesHolder(bool enabled,
                        std::function< CachedCandles() > const &getData);
    ~CachedCandlesHolder();

    // Thread safe. Always returns quickly. Might return EMPTY_LIST;
    // even if there are no problems, we might not have loaded the
    // cache yet. Never returns null.
    CachedCandles get();

    std::string debugDump() const;
  };
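  // A sketch of how an implementation might use CachedCandlesHolder. The
  // variable name and loadFromDatabase() are made up for illustration:
  //
  //   CachedCandlesHolder recent(true, []() -> CachedCandles {
  //     // Runs in a worker thread, possibly after the holder is gone, so
  //     // capture nothing by reference.
  //     return loadFromDatabase();
  //   });
  //
  //   CachedCandles candles = recent.get();   // Never null, maybe empty.
  //   for (SingleCandleWithTime const &candle : *candles)
  //     ...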
public:
  // Generate a candle for the given time period. This might start by
  // grabbing multiple candles from the data store and joining them in the
  // natural way. Return an empty candle if there is no data in that time
  // period.
  //
  // start and end should both be on exact boundaries. If they are not, some
  // type of rounding will occur. Intraday candles always start and end on
  // exact one minute boundaries. Daily candles always start at the open and
  // end at the close.
  //
  // CandleDataNode offers a default version of this function which will
  // call the next version of threadSafeGet().
  virtual SingleCandle threadSafeGet(time_t start, time_t end);

  // This is similar to the previous version of threadSafeGet(). By making
  // several requests at once, we can often be more efficient.
  virtual void threadSafeGet(AllRowTimes const &requests,
                             SingleCandle::ByStartTime &destination) =0;

  // The last time we updated a consumer, epoch was current. We want to
  // update the consumer again. How much old data should we throw out and
  // recompute?
  virtual time_t threadSafeRestartAt(EpochCounter::Epoch epoch) =0;

  // Any request for data which starts at or after this date and time
  // will be satisfied completely in memory, no database access required.
  // This is just an optimization. The data works either way. And because
  // of caching and the like, this value can change at any moment without
  // notice.
  virtual time_t oldestFastTime() =0;

  // This is a convenient and efficient container for use with the
  // startTimeList() function. This is set up to be used in a ranged
  // for loop. If you iterate using begin() and end() you will start
  // with the most recent time (the largest integer) and will end with
  // the oldest time.
  class StartTimeList
  {
  private:
    // A smart pointer to a list of start times. When you iterate over the
    // StartTimeList, you start by iterating over this list, in order,
    // unmodified. This might be null.
    TSRefCount< std::vector< time_t > > _firstContainer;

    // Effectively _firstContainer->begin(). We use _first a lot and we
    // don't want to think about the smart pointer every time.
    std::vector< time_t >::const_iterator _first;

    // The number of items in _firstContainer. (*_firstContainer)[n] is
    // valid if ((n >= 0) && (n < _firstEnd)).
    int _firstEnd;

    // A smart pointer to a list of data. We are reusing CachedCandles
    // because there's probably a copy around already. And it was
    // specifically designed to be the right size for most requests.
    // CachedCandles includes the value of each candle, but we ignore that
    // and only look at the start time. CachedCandles is sorted with the
    // smallest values first, and we reverse that order on the fly as
    // people request the data.
    CachedCandles _secondContainer;

    // After iterating over the entire first container, we start with
    // *_second. The next item we report is *(_second-1). We continue this
    // way until we report the first item in _secondContainer. _second
    // might not point to the last item in _secondContainer. Sometimes we
    // skip the last part of _secondContainer because that part of the data
    // is a duplicate of something that was already reported.
    std::vector< SingleCandleWithTime >::const_iterator _second;

    // _end is the number of items in this StartTimeList. get(n) is valid
    // if ((n >= 0) && (n < _end)).
    int _end;

    // Typically this is the oldest (i.e. smallest and last) item in this
    // StartTimeList. Sometimes we block out additional space. The next
    // request should start before (i.e. not including) this time. Set this
    // to 0 when we are done and there should be no more requests.
    time_t _oldest;

  public:
    // times can be NULL. That would be a quick way to say we have no more
    // data. Otherwise times should be sorted with the largest/latest value
    // in index 0.
    StartTimeList(TSRefCount< std::vector< time_t > > const &times = NULL);

    // realTime cannot be NULL or empty. The values in realTime should be
    // sorted with the largest/latest value in index 0. This is the first
    // value we will give to a user when he iterates over this object.
    // cached may be NULL or empty. If realTime and cached overlap at all,
    // we will skip that part of cached.
    StartTimeList(TSRefCount< std::vector< time_t > > const &realTime,
                  CachedCandles const &cached);

    // times can be NULL. That is treated the same as an empty list.
    // startBefore tells us how much to block out.
    StartTimeList(time_t startBefore, CachedCandles const &times);

    // Access the data like an array. 0 is the first item. This will be
    // the largest number and the latest time in the list. size() - 1 is
    // the largest valid index and 0 is the smallest.
    time_t get(int index) const
    {
      assert((index >= 0) && (index < _end));
      if (index < _firstEnd)
        return *(_first + index);
      else
        return (_second - (index - _firstEnd))->startTime;
    }
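    // A sketch of how an implementation might use these constructors. The
    // variable names are made up; the real logic lives in OneMinuteCandles
    // and DailyCandles:
    //
    //   // First block: today's start times from the realtime data, then
    //   // whatever is already cached in memory. Any overlap between the
    //   // two is skipped automatically.
    //   return StartTimeList(todaysTimes, _cache.get());
    //
    //   // Later blocks: a chunk read from the database. queryStart is the
    //   // oldest time the query covered; it blocks out that whole range so
    //   // the next request knows where to resume, even if some candles in
    //   // that range were missing.
    //   return StartTimeList(queryStart, chunkFromDatabase);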
    // This provides an easy way to iterate over a StartTimeList. Just say
    // for (time_t time : myStartTimeList) { ... }.
    class Iterator
    {
    private:
      StartTimeList const *_data;
      int _index;
    public:
      Iterator(StartTimeList const *data, int index = 0) :
        _data(data),
        _index(index)
      { }
      Iterator &operator++() { _index++; return *this; }
      Iterator &operator--() { _index--; return *this; }
      bool operator ==(Iterator const &other) const
      { return (_data == other._data) && (_index == other._index); }
      bool operator !=(Iterator const &other) const
      { return !((*this) == other); }
      time_t operator *() const { return _data->get(_index); }
    };

    Iterator begin() { return Iterator(this, 0); }
    Iterator end() { return Iterator(this, _end); }

    // If we need more data, and we want to start where this StartTimeList
    // ended, start from here. Do NOT include this value in the next list.
    time_t restartBefore() const { return _oldest; }

    // If this is true it's reasonable to ask for more data. Call
    // CandleDataNode::startTimeList() with this StartTimeList to get the
    // next StartTimeList. It is acceptable to ask for more data even when
    // moreData() is false. But at some point you need to end your loop.
    bool moreData() const { return _oldest > 0; }

    // The number of items in this StartTimeList.
    int size() const { return _end; }

    // This is a nice end condition for a for loop. Stop as soon as this is
    // false. This returns true if there's data in this object, or there's
    // a reason to believe that there will be data in a future object.
    bool goodToGo() const { return _end || moreData(); }

    // After this goodToGo() will return false.
    void clear();
  };

  // This gives you access to the times of the individual candles that we
  // store. (E.g. minutes or days.) Most of the time you access a
  // CandleDataNode by asking for candles within a particular range. (E.g.
  // give me the candle starting at 9:45 am today and ending before 10:00.)
  // However, sometimes you want to see what data is available. In
  // particular, if you need a certain amount of real data, with no missing
  // candles, you can start here to see exactly what is available.
  //
  // The times are all listed from the most recent (the largest integer
  // value) to the oldest (the smallest integer value) with no duplicates.
  //
  // Each call to startTimeList() will give you one block of data. You
  // probably want to have an outer loop that calls startTimeList()
  // repeatedly to load new blocks of data, and an inner loop iterating
  // over the data in each StartTimeList.
  //
  // These blocks are an implementation detail. Typically the first call
  // will give you all data that we already have in memory. Each call after
  // that will grab an arbitrary amount of data from the database.
  virtual StartTimeList startTimeList(time_t startBefore) =0;
  virtual StartTimeList startTimeList(StartTimeList const &previous) =0;
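  // A sketch of the outer/inner loop described above. candles is a
  // CandleDataNode pointer as in the sketch near the top of this file; the
  // stopping condition (stop once we have at least 500 start times) and the
  // variable names are made up:
  //
  //   std::vector< time_t > startTimes;
  //   for (StartTimeList block = candles->startTimeList(time(NULL));
  //        block.goodToGo() && (startTimes.size() < 500);
  //        block = candles->startTimeList(block))
  //     for (time_t startTime : block)
  //       startTimes.push_back(startTime);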
  // Used by OneMinuteCandles when one server requests data from another.
  // Not used or implemented by DailyCandles.
  virtual std::string marshalTodaysCandles() { return ""; }

  virtual TclList debugDump() const;

  // Call this any time. But call this only once for each call to find().
  // Don't access this object at all after this call. The object might be
  // freed by this call. Whether that happens immediately depends on a lot
  // of things, including other parts of the code which might or might not
  // be sharing this object, and another thread which might be used to
  // finish the cleanup. I.e. chaotic, with plenty of room for a race
  // condition. Be careful!
  void decrementReferenceCount();

  static CandleDataNode *find(std::string const &symbol, bool intraday);

  static void initialize(DataNodeManager &dataNodeManager)
  { Manager::initialize(dataNodeManager); }
};

#endif