#include "../shared/GlobalConfigFile.h"
#include "../generate_alerts/data_framework/GenericTosData.h"
#include "../generate_alerts/ProxyData.h"
#include "../generate_alerts/NxCoreKafkaConsumer.h"

#include "CandleDataNode.h"
#include "DataNodeThread.h"

// Launches the worker thread and then waits on _initialized, which
// threadFunction() sets once its in-thread setup has been performed.  The
// constructor therefore does not return until that setup is done.
DataNodeThread::DataNodeThread() :
  ThreadClass("DataNodeThread"),
  _incoming(getName()),
  _dataNodeManager(&_incoming, mtDataNodeEvent)
{
  startThread();
  _initialized.waitForever();
}

// Posts an mtQuit request to the worker's queue and then waits for the
// thread.  threadFunction() returns when it sees that request.
DataNodeThread::~DataNodeThread()
{
  Request *quitRequest = new Request(NULL);
  quitRequest->callbackId = mtQuit;
  _incoming.newRequest(quitRequest);
  waitForThread();
}

// Body of the worker thread.  Performs the one-time setup, signals the
// constructor, and then alternates between draining _incoming and waiting
// for more requests until an mtQuit request arrives.
void DataNodeThread::threadFunction()
{
  typedef std::vector< DataNodeLink * > Links;
  Links links;

  _dataNodeManager.setAsDefault();
  // Left disabled, as in the original:
  // GenericL1DataNode::registerDebugger(&_dataNodeManager);
  GenericTosDataNode::registerDebugger(&_dataNodeManager);

  // Everything from here to _initialized.set() has to happen before the
  // constructor of this object is allowed to finish.
  initProxyDataInThread();
  if ("1" != getConfigItem("kafka_disable"))
    {
      initKafkaConsumer(&_dataNodeManager);
    }
  CandleDataNode::initialize(_dataNodeManager);
  links.push_back(SynchronizedTimer::find(this, 0, _synchronizedTimer));
  _initialized.set();

  for (;;)
    {
      for (Request *current = _incoming.getRequest();
           current;
           current = _incoming.getRequest())
        {
          // The request is deleted after it has been handled unless a
          // handler clears this pointer.
          Request *finished = current;
          switch (current->callbackId)
            {
            case mtDataNodeEvent:
              {
                DataNodeManager::EventQueueListener *listener =
                  dynamic_cast< DataNodeManager::EventQueueListener * >
                  (current);
                listener->onEventQueue();
                break;
              }
            case mtRequestTimer:
              {
                // Remember who asked; onWakeup() walks this list.
                InitializeTimeRequest *registration =
                  dynamic_cast< InitializeTimeRequest * >(current);
                _timerCallbacks.push_back(registration->timerCallback);
                break;
              }
            case mtDoInThread:
              {
                DoInThreadRequest *job =
                  dynamic_cast< DoInThreadRequest * >(current);
                // A true result means the request must not be deleted here
                // (presumably it stays owned elsewhere -- confirm against
                // DoInThreadRequest).
                if (job->inDataNodeThread())
                  finished = NULL;
                break;
              }
            case mtQuit:
              {
                // Shut down: dispose of the request, drop every link that
                // was acquired during setup, and leave the thread.
                delete finished;
                for (Links::size_type i = 0; i < links.size(); ++i)
                  links[i]->release();
                return;
              }
            case DeleteSocketThread::callbackId:
              {
                SocketClosedDataNode::doNotify(current->getSocketInfo());
                break;
              }
            }
          delete finished;
        }
      _incoming.waitForRequest();
    }
}

// Timer callback.  For each registered timer callback, creates a TimeRequest
// carrying the timer's start time and hands it to that callback's listener.
void DataNodeThread::onWakeup(int msgId)
{
  // The data node always fires twice in a row.  The old standard candles
  // make use of that, but it means nothing here: we report only once per
  // new candle, so every phase other than tpNotify is ignored.
  if (_synchronizedTimer->getTimePhase() == BarCounter::tpNotify)
    {
      TimerCallbacks::iterator subscriber = _timerCallbacks.begin();
      while (subscriber != _timerCallbacks.end())
        {
          TimeRequest *notice =
            new TimeRequest(_synchronizedTimer->getStartTime());
          notice->callbackId = subscriber->callbackId;
          subscriber->requestListener->newRequest(notice);
          ++subscriber;
        }
    }
}

// Asks the worker thread to add (requestListener, callbackId) to the list of
// timer callbacks.  The registration is queued on _incoming and is recorded
// by threadFunction() when it handles the request.
void DataNodeThread::requestTimerUpdates(RequestListener *requestListener,
                                         int callbackId)
{
  InitializeTimeRequest *registration = new InitializeTimeRequest;
  registration->timerCallback.callbackId = callbackId;
  registration->timerCallback.requestListener = requestListener;
  _incoming.newRequest(registration);
}

// Queues a DoInThreadRequest on _incoming.  threadFunction() calls its
// inDataNodeThread() when it handles the request.  The object is tagged
// mtDoInThread so the dispatch loop recognises it.
void DataNodeThread::submit(DoInThreadRequest *request)
{
  Request *const asRequest = dynamic_cast< Request * >(request);
  asRequest->callbackId = mtDoInThread;
  _incoming.newRequest(asRequest);
}