#include "../../shared/ReplyToClient.h" #include "../../shared/GlobalConfigFile.h" #include "../data_framework/GenericL1Data.h" #include "../data_framework/GenericTosData.h" #include "../data_framework/StandardCandles.h" #include "../ProxyData.h" #include "../misc_framework/TimerDataNode.h" #include "../NxCoreKafkaConsumer.h" #include "TestAlerts.h" #include "ArthurIlyayev.h" #include "VitoGarfi.h" #include "WestonAlerts.h" #include "KoyoteAlerts.h" #include "MiscAlerts.h" #include "ShaneBird.h" #include "SmbRadar.h" #include "JoeFavaloro.h" #include "BigBear.h" #include "ChrisPavelic.h" #include "UserInfo.h" #include "DataNodeThread.h" DataNodeThread::DataNodeThread() : ThreadClass("DataNodeThread"), _incoming(getName()), _dataNodeManager(&_incoming, mtDataNodeEvent), _westonDatabaseMonitor(&_dataNodeManager) { initProxyData(&_dataNodeManager); initKafkaConsumer(&_dataNodeManager); startThread(); CommandDispatcher::getInstance() ->listenForCommand("tcl", &_incoming, mtTclCommand); } DataNodeThread::~DataNodeThread() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incoming.newRequest(r); waitForThread(); } void DataNodeThread::initAlerts() { Request *r = new Request(NULL); r->callbackId = mtInitAlerts; _incoming.newRequest(r); } void DataNodeThread::threadFunction() { _dataNodeManager.setAsDefault(); GenericL1DataNode::registerDebugger(&_dataNodeManager); GenericTosDataNode::registerDebugger(&_dataNodeManager); StandardCandles::registerDebugger(&_dataNodeManager); DataNodeLink *timerLink = TimerDataNode::find(this, 0); // Prove that we can connect! This is where we often hang. DataNodeLink *connectTestLink; { GenericTosDataNode *dataNode; connectTestLink = GenericTosDataNode::find(dataNode, "$TIME"); } // Another test. This sends periodic alerts to the client. initTestAlerts(); const int tclThreadCount = strtolDefault(getConfigItem("tcl_thread_count"), 1); for (int i = 0; i < tclThreadCount; i++) { TclThread *t = new TclThread; t->executeScript("set tiServer::info(thread_count) " + ntoa(tclThreadCount), "thread_count"); t->executeScript("set tiServer::info(thread_id) " + ntoa(i), "thread_id"); _tclThreads.push_back(t); } while (true) { while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtTclCommand: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); SocketInfo *const s = request->getSocketInfo(); if (!UserInfoThread::getInstance().hasAdminPermission(s)) { addToOutputQueue(s, "FAIL", request->getResponseMessageId()); } else { std::string const &script = request->getProperty("script"); const ExternalRequest::MessageId msgId = request->getResponseMessageId(); const int thread = strtolDefault(request->getProperty("thread"), -1); if (thread < 0) for (std::vector< TclThread * >::iterator it = _tclThreads.begin(); it != _tclThreads.end(); it++) (*it)->externalScript(script, s, msgId); else if (thread < (int)_tclThreads.size()) _tclThreads[thread]->externalScript(script, s, msgId); } break; } case mtDataNodeEvent: { DataNodeManager::EventQueueListener *request = dynamic_cast< DataNodeManager::EventQueueListener * > (current); request->onEventQueue(); break; } case mtInitAlerts: { initArthurIlyayevAlerts(); initVitoGarfiAlerts(); initWestinAlerts(); initKoyoteAlerts(); initMiscAlerts(); initShaneBird(); //initSmbRadar(); initJoeFavaloro(); initBigBear(); initChrisPavelic(); break; } case mtQuit: { timerLink->release(); connectTestLink->release(); for (std::vector< TclThread * >::iterator it = _tclThreads.begin(); it != _tclThreads.end(); it++) { delete *it; } _tclThreads.clear(); delete current; return; } case DeleteSocketThread::callbackId: { SocketClosedDataNode::doNotify(current->getSocketInfo()); break; } } delete current; } _incoming.waitForRequest(); } } void DataNodeThread::onWakeup(int msgId) { // This is from the dead man timer. _westonDatabaseMonitor.updateDeadManTimer(); }