// NOTE(review): the header name of the first #include below is missing in
// this copy of the file (an angle-bracket header was presumably stripped
// during extraction).  std::ifstream is used further down, so it was
// presumably <fstream> -- confirm against the repository copy.
#include
#include "../../shared/LogFile.h"
#include "../../shared/GlobalConfigFile.h"
#include "../../shared/ReplyToClient.h"
#include "../data_framework/SimpleMarketData.h"
#include "../data_framework/GenericL1Data.h"
#include "../data_framework/GenericTosData.h"
#include "../data_framework/GenericHaltData.h"
#include "../data_framework/StandardCandles.h"
#include "../../shared/MarketHours.h"
#include "../misc_framework/CsvFileDataNodes.h"
#include "../misc_framework/TimerDataNode.h"
#include "../ProxyData.h"
#include "../NxCoreKafkaConsumer.h"
#include "../nasdaq_vf/StockTwitsDataNode.h"
#include "AlertMainControl.h"

////////////////////////////////////////////////////////////////////
// TestDataNode
//
// This is a rudimentary test to see if the market data is working.
// We try to connect and request data for one symbol.  It doesn't
// sound very interesting, but this fails more often than it should.
// I'd rather have this happen at a time of my choosing, rather than
// when we first start the alerts at or around 4:20 in the morning.
////////////////////////////////////////////////////////////////////

class TestDataNode : public DataNode
{
  // The constructor is private; DataNode is a friend, presumably so that
  // findHelper() can construct the node -- confirm in DataNode.
  TestDataNode(DataNodeArgument const &args);
  friend class DataNode;
public:
  // Returns a link to the test node.  See the definition below.
  static DataNodeLink *find();
};

// Asks findHelper() for the node using a default-constructed
// DataNodeArgument and returns the resulting link.  The local "unused" is
// only passed to findHelper(); its value is never read in this function.
DataNodeLink *TestDataNode::find()
{
  TestDataNode *unused;
  return findHelper(NULL, 0, unused, DataNodeArgument());
}

// Requests TOS data for the fixed symbol "$TIME" and hands the returned link
// to addAutoLink().  Neither "args" nor the "tosData" out-value is used
// afterwards.
TestDataNode::TestDataNode(DataNodeArgument const &args)
{
  static const std::string SYMBOL = "$TIME";
  GenericTosDataNode *tosData;
  addAutoLink(GenericTosDataNode::find(tosData, SYMBOL));
}

////////////////////////////////////////////////////////////////////
// NormalStartMessage
//
// A Request that carries the two file names given to
// AlertMainControl::normalStart().  The mtNormalStart case in the thread's
// message switch reads them back through the getters.
////////////////////////////////////////////////////////////////////

class NormalStartMessage : public Request
{
private:
  const std::string _configFileName;
  const std::string _symbolListFileName;
public:
  // Stores copies of both names.  The base Request is constructed with NULL.
  NormalStartMessage(std::string const &configFileName,
                     std::string const &symbolListFileName) :
    Request(NULL),
    _configFileName(configFileName),
    _symbolListFileName(symbolListFileName)
  { }
  std::string const &getConfigFileName() const
  { return _configFileName; }
  std::string const &getSymbolListFileName() const
  { return _symbolListFileName; }
};

////////////////////////////////////////////////////////////////////
// AlertMainControl
////////////////////////////////////////////////////////////////////

// Reads fileName one line at a time.  Each line is passed through
// trim(symbol, " \r") and strtoupper(); lines that are empty after that are
// skipped and the rest are appended to the result in file order.  If the
// stream is not readable from the start the loop never runs and the result
// is empty.
//
// TODO this should probably go in MiscSupport.[Ch]
static std::vector< std::string > loadSymbolList(std::string const &fileName)
{
  std::vector< std::string > result;
  std::ifstream stream(fileName.c_str(), std::ios_base::in);
  // TODO check for errors and report them to the log.
  while(stream)
    {
      // A fresh string each time around, so a getline() that reads nothing
      // leaves it empty and the empty() test below discards it.
      std::string symbol;
      std::getline(stream, symbol);
      symbol = strtoupper(trim(symbol, " \r"));
      if (!symbol.empty())
        result.push_back(symbol);
    }
  // TODO if there was an error part way through (not counting end of file as
  // an error) we should probably clear the result in addition to reporting to
  // the log.  That's consistent with the rest of our code.
  return result;
}

// Initializes the members, passes the data node manager to initProxyData()
// and initKafkaConsumer(), and then calls startThread().
AlertMainControl::AlertMainControl() :
  ThreadClass("AlertMainControl"),
  _incoming(getName()),
  _pushAlertsThread("_pushAlertsThread"),
  _dataNodeManager(&_incoming, mtDataNodeEvent)
{
  initProxyData(&_dataNodeManager);
  initKafkaConsumer(&_dataNodeManager);
  startThread();
}

// Returns the AlertMatrix stored under "name", creating it on first use.
// _matrix[name] inserts a null entry when the key is missing; that entry is
// then filled in through the reference.  The matrices are deleted by the
// mtQuit case of the message switch.
AlertMatrix *AlertMainControl::findMatrix(std::string const &name)
{
  AlertMatrix *&matrix = _matrix[name];
  if (!matrix)
    matrix = new AlertMatrix(&_pushAlertsThread);
  return matrix;
}

// Queues an mtQuit request and then calls waitForThread().
AlertMainControl::~AlertMainControl()
{
  Request *r = new Request(NULL);
  r->callbackId = mtQuit;
  _incoming.newRequest(r);
  waitForThread();
}

// Queues an mtNormalStart request carrying both file names.  The work itself
// is done by normalStartImpl(), called from the mtNormalStart case.
void AlertMainControl::normalStart(std::string const &configFileName,
                                   std::string const &symbolListFileName)
{
  Request *r = new NormalStartMessage(configFileName, symbolListFileName);
  r->callbackId = mtNormalStart;
  _incoming.newRequest(r);
}

// Queues an mtStartTest request.  See the mtStartTest case below.
void AlertMainControl::startTest()
{
  Request *r = new Request(NULL);
  r->callbackId = mtStartTest;
  _incoming.newRequest(r);
}

// Walks the row headers of _config.  For every row whose "Requirements"
// column equals "requirements", looks up the factory named in the row's
// "Factory Name" column and, if it is found, adds an AlertCategory (alert
// type = the row header) to "matrix".  A missing factory is reported through
// a TclList message instead.
void AlertMainControl::loadAlertsTypes(AlertMatrix *matrix,
                                       std::string const &requirements)
{
  for (TwoDArray::StringList::const_iterator it =
         _config.getRowHeaders().begin();
       it != _config.getRowHeaders().end();
       it++)
    {
      if (_config.get("Requirements", *it) == requirements)
        {
          const std::string factoryName = _config.get("Factory Name", *it);
          AlertCategory category;
          category.alertType = *it;
          category.factory =
            GenericDataNodeFactory::findFactory(factoryName, true);
          if (category.factory)
            matrix->add(category);
          else
            { // findFactory() is not capable of printing a good error message.
              TclList msg;
              msg<<"AlertMainControl.C"
                 <<"AlertMainControl::loadAlertsTypes()"
                 <<"Can't find factory by name."
                 // NOTE(review): text is missing from this copy of the file
                 // starting right after "<<*it <" on the next line.  The
                 // rest of loadAlertsTypes(), all of initializePreLoad()
                 // (called from the mtStartTest case) and the start of what
                 // is presumably scheduleNextPreLoadStep() are gone; only
                 // the tail of that last function survives.  Restore from
                 // the repository copy before building.
                 <<*it <callbackId = mtPreload;
  _incoming.newRequest(r);
}
}

// Handles one symbol of the pre-load list: takes the last entry off
// _toPreload, calls find() on the TOS, L1 and halt data nodes for it, then
// does the same through each of the four named generic factories after
// substituting the symbol for symbolPlaceholder.  The values returned by the
// find() calls are not kept here.  Finishes by calling
// scheduleNextPreLoadStep().
//
// NOTE(review): back() is called without an emptiness check in this
// function; any guard would be in scheduleNextPreLoadStep(), whose beginning
// is missing from this copy -- confirm.
void AlertMainControl::doNextPreLoadStep()
{
  const std::string symbol = _toPreload.back();
  _toPreload.pop_back();
  GenericTosDataNode *tosData;
  GenericTosDataNode::find(tosData, symbol);
  GenericL1DataNode *l1Data;
  GenericL1DataNode::find(l1Data, symbol);
  GenericHaltDataNode *haltData;
  GenericHaltDataNode::find(haltData, symbol);
  char const *byName[] = { "PutVolume", "CallVolume", "LimitUp", "LimitDown" };
  for (char const *name : byName)
    {
      DataNodeArgument factory = GenericDataNodeFactory::findFactory(name);
      factory = factory.replace(symbolPlaceholder, symbol);
      GenericDataNode *genericData;
      factory.getValue< GenericDataNodeFactory >().find(genericData);
    }
  // TODO are we concerned with the regional data?  NyseBid and NyseAsk
  scheduleNextPreLoadStep();
}

// Returns the config item "symbol_list_file_name" (default "Symbols.txt")
// after passing it through FileOwnerDataNode::findDataFile().
std::string AlertMainControl::defaultSymbolListFile()
{
  return FileOwnerDataNode::findDataFile(getConfigItem("symbol_list_file_name",
                                                       "Symbols.txt"));
}

// Does the work requested by normalStart().  Both arguments are taken by
// value because an empty name is replaced by a default in place:
//   - configFileName: config item "alert_config_file" (default
//     "AlertList.csv"), passed through FileOwnerDataNode::findDataFile().
//   - symbolListFileName: defaultSymbolListFile().
// It then loads the alert config, fills the matrices with alert categories
// and adds every symbol from the symbol list to them.
void AlertMainControl::normalStartImpl(std::string configFileName,
                                       std::string symbolListFileName)
{
  if (configFileName.empty())
    {
      configFileName = getConfigItem("alert_config_file", "AlertList.csv");
      configFileName = FileOwnerDataNode::findDataFile(configFileName);
    }
  if (symbolListFileName.empty())
    symbolListFileName = defaultSymbolListFile();
  TclList msg;
  msg<<"AlertMainControl.C"
     <<"normalStartImpl"
     <<"configFileName"
     // NOTE(review): text is missing from this copy starting right after
     // "<" on the next line: the end of this log statement, the
     // declarations of misc, lowVolume, volumeConfirmed and topList
     // (presumably AlertMatrix pointers from findMatrix() -- confirm), and
     // the opening of the "already running" test.  What survives aborts
     // unless all four matrices report empty().
     <empty() && lowVolume->empty()
        && volumeConfirmed->empty() && topList->empty()))
    { // This case is not precisely defined.
      msg.clear();
      msg<<"AlertMainControl.C"
         <<"normalStartImpl"
         <<"Aborted!"
         <<"Already running.";
      LogFile::primary().sendString(msg);
      return;
    }
  _config.loadFromCSV(configFileName);
  // The second argument is matched against the "Requirements" column of the
  // config; see loadAlertsTypes().
  loadAlertsTypes(misc, "Misc");
  loadAlertsTypes(lowVolume, "LowVolumeOnly");
  loadAlertsTypes(volumeConfirmed, "VolumeConfirmed");
  {
    // The top list matrix gets a single category built from the
    // "FastHeartbeat" factory; its alertType is not set here.
    AlertCategory topListCategory;
    topListCategory.factory =
      GenericDataNodeFactory::findFactory("FastHeartbeat");
    topList->add(topListCategory);
  }
  int symbolCount = 0;
  for (std::string const &symbol : loadSymbolList(symbolListFileName))
    {
      // This msg is local to the loop body and hides the function-level msg.
      TclList msg;
      symbolCount++;
      msg<<__FILE__<<__LINE__<<__FUNCTION__
         // NOTE(review): text is missing from this copy right after
         // "adding" on the next line (presumably the rest of the log
         // statement and the object that add() is called on -- confirm).
         <<"adding"<add(symbol);
      topList->add(symbol);
      if (!symbolIsIndex(symbol))
        {
          // The two tests are independent, so a symbol whose average daily
          // volume is between 150000 and 3000000 inclusive is added to both
          // matrices.
          const DataNode::Integer volume = getAverageDailyVolume(symbol);
          if (volume >= 150000)
            volumeConfirmed->add(symbol);
          if (volume <= 3000000)
            lowVolume->add(symbol);
        }
    }
  msg.clear();
  msg<<"AlertMainControl.C"
     <<"normalStartImpl"
     <<"success"
     // NOTE(review): text is missing from this copy starting right after
     // "<" on the next line: the end of normalStartImpl() and the beginning
     // of the thread's message loop (presumably threadFunction(), including
     // the declarations of timerLink, testDataNodeLink and current and the
     // opening of the loops and of the switch -- confirm).  Everything from
     // "callbackId)" down to the closing braces before onWakeup() is the
     // body of that switch and the tail of that loop.
     <callbackId)
            {
            case mtDataNodeEvent:
              {
                // NOTE(review): the result of the dynamic_cast is used
                // without a null check.
                DataNodeManager::EventQueueListener *request =
                  dynamic_cast< DataNodeManager::EventQueueListener * >
                  (current);
                request->onEventQueue();
                break;
              }
            case mtStartTest:
              {
                if (testDataNodeLink)
                  { // Duplicate request.  Silently ignore it.
                  }
                else
                  {
                    initializePreLoad();
                    testDataNodeLink = TestDataNode::find();
                    // If there's a problem, we typically would not get to
                    // here.  The program would hang or, more often, fail an
                    // assertion.
                    TclList msg;
                    msg<<__FILE__<<__LINE__<<__FUNCTION__
                       <<"test data started";
                    LogFile::primary().sendString(msg);
                  }
                break;
              }
            case mtPreload:
              {
                doNextPreLoadStep();
                break;
              }
            case mtNormalStart:
              {
                // NOTE(review): as above, the cast result is used without a
                // null check.  normalStart() is the only visible producer of
                // mtNormalStart and it always sends a NormalStartMessage.
                NormalStartMessage *request =
                  dynamic_cast< NormalStartMessage * >(current);
                normalStartImpl(request->getConfigFileName(),
                                request->getSymbolListFileName());
                break;
              }
            case mtQuit:
              {
                // Release the links, delete every matrix created by
                // findMatrix(), and leave the function.  "current" is
                // deleted here because the return skips the delete that
                // follows the switch.
                timerLink->release();
                if (testDataNodeLink)
                  testDataNodeLink->release();
                for (std::map< std::string, AlertMatrix * >::iterator it =
                       _matrix.begin();
                     it != _matrix.end();
                     it++)
                  {
                    delete it->second;
                  }
                _matrix.clear();
                delete current;
                return;
              }
            case DeleteSocketThread::callbackId:
              {
                SocketClosedDataNode::doNotify(current->getSocketInfo());
                break;
              }
            }
          // Every case that breaks out of the switch ends up here.
          delete current;
        }
      _incoming.waitForRequest();
    }
}

// Calls updateDeadManTimer() on the push alerts thread.  msgId is not used.
void AlertMainControl::onWakeup(int msgId)
{ // This is from the dead man timer.
  _pushAlertsThread.updateDeadManTimer();
}