/////////////////////////////////////////////////////////////////////
// Overview of this file (added during review)
//
// NOTE(review): this copy of the file looks damaged.  Line breaks appear
// to have been collapsed, and text between some '<' and '>' characters
// seems to be missing: the first #include has no header name,
// "template class DataNodeGenericWork" has no parameter list,
// "std::numeric_limits::quiet_NaN()" has no type argument, and several
// "<<" expressions in the debugDump / submit / cancelImpl area are
// truncated.  Compare against version control before building.  The
// notes below describe only what is still readable here.
//
// LimitAlertRequest
//   getNextId() returns __sync_add_and_fetch(&_lastId, 1).  _lastId
//   starts at 0, so the first id handed out is 1.  The constructor
//   passes socketInfo to Request and stores the id, symbol, price,
//   direction (built from isLong) and includeFormT.
//
// DataNodeGenericWork / addLambdaToDataNodeQueue
//   addLambdaToDataNodeQueue() allocates a DataNodeGenericWork holding a
//   copy of the action and passes it to manager->addToQueue().
//   onEventQueue() simply invokes the stored action.  Who deletes the
//   work item is not visible in this file -- presumably the queue does;
//   TODO confirm.
//
// LimitAlertManager::SymbolWatcher
//   * The constructor starts _currentPrice as a quiet NaN, calls
//     GenericTosDataNode::find(), and if _tosData is already valid takes
//     getLast().price when includeFormT is set, otherwise
//     getLast().todaysClose.  The destructor releases _link.
//   * insert() asserts that the request's direction and form T setting
//     match the watcher, stores (price, request) in _requests and then
//     calls checkNow().  Per the original comment, checkNow() might
//     delete "this", so nothing may touch the watcher afterwards.
//   * erase() removes the (price, request) pair from _requests.
//   * onWakeup() ignores invalid data, and ignores form T prints when
//     _includeFormT is false.  It always stores the new price, but only
//     calls checkNow() if the previous price was NaN or
//     _direction.moreReportable(newPrice, previousPrice) is true.
//   * getFirst() returns *_requests.begin() for a long direction and
//     *_requests.rbegin() otherwise.  It does not check for an empty
//     container; the only visible caller, checkNow(), checks first.
//   * checkNow() does nothing while _currentPrice is NaN.  Otherwise it
//     repeatedly takes getFirst() and stops as soon as
//     _direction.moreReportable(limitPrice, _currentPrice) is true.
//     Each request that does not stop the loop is first removed with
//     _owner->erase(request) and then fired with request->onFired().
//     That order is deliberate; see the comment inside the loop.
//
// LimitAlertManager
//   * A cleanup loop (its function header is missing from this copy)
//     uses _bySocket.lower_bound(make_pair(socket, NULL)) to find each
//     remaining request for one socket, then erases and deletes it.
//   * debugDump() walks the 2 x 2 _watchers array and skips empty maps.
//   * cancel(id) queues a lambda that calls cancelImpl(id).
//   * submitImpl() calls insert(newRequest); its logging is commented
//     out.
//   * removeWatcherSoon() copies the symbol, direction and form T flag
//     out of the request, then queues a lambda.  When the lambda runs it
//     looks the watcher up again and deletes it only if it still exists
//     and is still empty.
/////////////////////////////////////////////////////////////////////
#include #include "../shared/LogFile.h" #include "LimitAlert.h" ///////////////////////////////////////////////////////////////////// // LimitAlertRequest ///////////////////////////////////////////////////////////////////// uint64_t LimitAlertRequest::_lastId = 0; uint64_t LimitAlertRequest::getNextId() { return __sync_add_and_fetch(&_lastId, 1); } LimitAlertRequest::LimitAlertRequest(SocketInfo *socketInfo, std::string const &symbol, double price, bool isLong, bool includeFormT) : Request(socketInfo), _id(getNextId()), _symbol(symbol), _price(price), _direction(isLong), _includeFormT(includeFormT) { } ///////////////////////////////////////////////////////////////////// // addLambdaToDataNodeQueue // // TODO This was copied into the DataNodes library. See the version of // DataNodeManager::addToQueue() which accepts a lambda. It would be // easy to replace this code with a call to that function. I'm just // not in the mood to test that change yet. But it would be good to have // fewer copies of almost identical code. 
See revision 1.21 of DataNodes.C // and revision 1.30 of DataNodes.h ///////////////////////////////////////////////////////////////////// template class DataNodeGenericWork : public DataNodeManager::EventQueueListener { private: Action const _action; public: DataNodeGenericWork(SocketInfo *socket, Action action) : DataNodeManager::EventQueueListener(socket), _action(action) { } virtual void onEventQueue() { _action(); } }; template < class Action > void addLambdaToDataNodeQueue(DataNodeManager *manager, Action action, SocketInfo *socket = NULL) { manager->addToQueue(new DataNodeGenericWork< Action >(socket, action)); } ///////////////////////////////////////////////////////////////////// // LimitAlertManager::SymbolWatcher ///////////////////////////////////////////////////////////////////// LimitAlertManager::SymbolWatcher::SymbolWatcher(LimitAlertManager *owner, std::string const &symbol, TradeDirection direction, bool includeFormT) : _owner(owner), _direction(direction), _includeFormT(includeFormT), _currentPrice(std::numeric_limits::quiet_NaN()) { _link = GenericTosDataNode::find(this, 0, _tosData, symbol); if (_tosData->getValid()) { if (includeFormT) _currentPrice = _tosData->getLast().price; else _currentPrice = _tosData->getLast().todaysClose; } } LimitAlertManager::SymbolWatcher::~SymbolWatcher() { _link->release(); } void LimitAlertManager::SymbolWatcher::insert(LimitAlertRequest *newRequest) { assert(newRequest->getDirection() == _direction); assert(newRequest->getIncludeFormT() == _includeFormT); _requests.insert(std::make_pair(newRequest->getPrice(), newRequest)); checkNow(); // checkNow() might delete the "this" pointer! 
} void LimitAlertManager::SymbolWatcher::erase(LimitAlertRequest *request) { _requests.erase(std::make_pair(request->getPrice(), request)); } void LimitAlertManager::SymbolWatcher::onWakeup(int msgId) { if (!_tosData->getValid()) return; if (_tosData->getLast().formT && !_includeFormT) return; const double newPrice = _tosData->getLast().price; const bool needToCheck = std::isnan(_currentPrice) || _direction.moreReportable(newPrice, _currentPrice); _currentPrice = newPrice; if (needToCheck) checkNow(); } std::pair< double, LimitAlertRequest * > LimitAlertManager::SymbolWatcher::getFirst() const { if (_direction.isLong()) return *_requests.begin(); else return *_requests.rbegin(); } void LimitAlertManager::SymbolWatcher::checkNow() { if (std::isnan(_currentPrice)) return; while (!_requests.empty()) { std::pair< double, LimitAlertRequest * > pair = getFirst(); double limitPrice = pair.first; LimitAlertRequest *request = pair.second; if (_direction.moreReportable(limitPrice, _currentPrice)) break; // Notice the order of the next two lines. _owner->erase() and // this->erase() will only remove the pointer from our data structures. // They will read information from the request, but they will not modify // it. request->onFired() will make changes. It will probably send the // request to another thread where it will quickly be deleted. If we // swapped these two lines there would be a race condition and we might // try to read from the object after it was deleted. 
_owner->erase(request); request->onFired(); } } std::string LimitAlertManager::SymbolWatcher::debugDumpCategory() const { TclList result; result<<_direction; if (_includeFormT) result<<"form T"; else result<<"normal"; return result; } std::string LimitAlertManager::SymbolWatcher::debugDump() const { TclList result; result<<"_currentPrice"<first); SocketInfo *const socket = it->second->getSocketInfo(); request<getSocket(); while (true) { const auto it = _bySocket.lower_bound(std::make_pair(socket, (LimitAlertRequest *)NULL)); if (it == _bySocket.end()) break; if (it->first != socket) break; LimitAlertRequest *const request = it->second; erase(request); delete request; } } std::string LimitAlertManager::debugDump() const { TclList result; for (int i = 0; i < 2; i++) for (int j = 0; j < 2; j++) { std::map< std::string, SymbolWatcher * > const &watchers = _watchers[i][j]; if (!watchers.empty()) { TclList forCategory; forCategory<second->debugDumpCategory(); for (auto it = watchers.begin(); it != watchers.end(); it++) { TclList forWatcher; forWatcher<first; // Symbol forWatcher<second->debugDump(); // Watcher forCategory<addToQueue(new Submit(this, newRequest)); } void LimitAlertManager::cancel(uint64_t id) { addLambdaToDataNodeQueue(_manager, [=] { cancelImpl(id); }); } void LimitAlertManager::submitImpl(LimitAlertRequest *newRequest) { /* TclList msg; msg<getId() <<"symbol"<getSymbol() <<"price"<getPrice() <<"direction"<getDirection() <<"include form T"<<(newRequest->getIncludeFormT()?"yes":"no"); LogFile::primary().sendString(msg, newRequest->getSocketInfo()); */ insert(newRequest); } void LimitAlertManager::cancelImpl(uint64_t id) { LimitAlertRequest *request = erase(id); /* TclList msg; msg<getSymbol(), request->getDirection(), request->getIncludeFormT()); } void LimitAlertManager::removeWatcherSoon(LimitAlertRequest *request) { const std::string symbol = request->getSymbol(); const bool isLong = request->getDirection().isLong(); const bool includeFormT = 
request->getIncludeFormT(); addLambdaToDataNodeQueue(_manager, [=] { auto &map = _watchers[isLong][includeFormT]; // Check that the watcher (a) hasn't already been deleted and (b) is // still empty. Things might have changed since we queued up this // request. if (SymbolWatcher *const watcher = getPropertyDefault(map, symbol)) { if (watcher->empty()) { delete watcher; map.erase(symbol); } } }); } LimitAlertManager *LimitAlertManager::primary = NULL;