#include "../shared/ThreadClass.h"
#include "../shared/GlobalConfigFile.h"
#include "../shared/SimpleLogFile.h"
#include "../shared/SelectableRequestQueue.h"
#include "../shared/CommandDispatcher.h"
#include "../shared/PollSet.h"
#include "../shared/ThreadMonitor.h"
#include "../shared/DatabaseWithRetry.h"
#include "../shared/ReplyToClient.h"
#include "../shared/TalkWithServer.h"

#include "FieldLists.h"
#include "AlertRecordMultiCast.h"
#include "RecordDispatcher.h"

/////////////////////////////////////////////////////////////////////
// RecordDispatcher
/////////////////////////////////////////////////////////////////////

// Note: this is an old style of creating threads and talking with other
// servers.  New code should consider using ../shared/ContainerThread.h rather
// than ../shared/ThreadClass.h and using ../shared/ServerConnection64.h
// rather than ../shared/TalkWithServer.h.
class RecordDispatcher : public IRecordDispatcher,
                         ThreadClass,
                         TalkWithServer::IMessageListener
{
private:
  // Kinds of request that arrive on _incoming.  The value is stored in the
  // request's callbackId and the dispatcher thread switches on it.
  enum { mtNewListener, mtExternalRecord, mtSendFakeRecord, mtDoIt };

  // A request that carries an arbitrary callback.  Posting one to _incoming
  // lets other threads run code in the dispatcher thread.
  class DoIt : public Request
  {
  public:
    using Callback = std::function< void() >;

    const Callback callback;

    DoIt(Callback callback, SocketInfo *socket = NULL) :
      Request(socket),
      callback(callback)
    {
      callbackId = mtDoIt;
    }
  };

  // Client id for messages from server.  Ping requests use ciFirstPing and
  // the values above it.
  enum { ciData, ciStatus, ciFirstPing };

  // Only valid while the dispatcher is active; it stays NULL otherwise.
  SelectableRequestQueue *_incoming;

  const std::string _base;
  const std::string _notifyString;        // ThreadMonitor counter name.
  const std::string _notifyStatusString;  // ThreadMonitor counter name.

  // Someone who asked to be notified, plus the callback id to stamp on every
  // request that we send to them.
  struct Listener
  {
    RequestListener *listener;
    int callbackId;
  };

  std::vector< Listener > _listeners;        // Want records.
  std::vector< Listener > _statusListeners;  // Want provider status.

  // Request used to register a Listener from another thread.  "status"
  // selects _statusListeners (true) or _listeners (false).  The sender fills
  // in "listener" after constructing the request.
  class NewListener : public Request
  {
  public:
    Listener listener;
    const bool status;

    NewListener(bool status) :
      Request(NULL),
      status(status)
    {
      callbackId = mtNewListener;
    }
  };

  LateRecordCounter _lateRecordCounter;

  // A new record arrived.  Share it with all listeners.
void notifyListeners(Record::Ref const &record) { ThreadMonitor::find().increment(_notifyString); _lateRecordCounter.add(record); for (std::vector< Listener >::const_iterator it = _listeners.begin(); it != _listeners.end(); it++) { NewRecord *request = new NewRecord(record); request->callbackId = it->callbackId; it->listener->newRequest(request); } } void notifyStatusListeners(std::string const &name) { ThreadMonitor::find().increment(_notifyStatusString); for (std::vector< Listener >::const_iterator it = _statusListeners.begin(); it != _statusListeners.end(); it++) { ProviderStatus *request = new ProviderStatus(name); request->callbackId = it->callbackId; it->listener->newRequest(request); } } bool _active; bool _thisIsMaster; bool _tcpSlave; // We are using TalkWithServer to read from the master. TalkWithServer *_talkWithServer; std::string _masterName; std::string _masterPort; int64_t _nextId; // If we are serving as the master, we need to assign id numbers. Usually // get get the id numbers from _nextId. However, when we first start we // need to know where to start from. In that case we go back to the // database. // // If you restart off hours this works perfectly. If you restart midday // it's possible that you will send alerts to other alert servers before // the same alert gets to the database, so we might reuse a small number of // id's. I explicitly do NOT want to hold up alerts until the database // acknowledges them. If we ever needed to be more certain about not // repeating an id, I'd use a flat file or something, where I had more // speed and control. void initNextId() { DatabaseWithRetry database(true, getName()); // Read only. 
_nextId = database.tryQueryUntilSuccess("SELECT MAX(id) FROM " + _base) ->getIntegerField(0,0) + 1; TclList msg; msg< command=alerts_send_fake_record&message_id=2 == MESSAGE 0 ========== 57 == {0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786418} command=alerts_send_fake_record&message_id=2&count=12 == MESSAGE 0 ========== 57 == {0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786454} command=alerts_send_fake_record&message_id=2&count=1299 == MESSAGE 0 ========== 57 == {0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786463} command=alerts_send_fake_record&message_id=2&count=129999 == MESSAGE 0 ========== 57 == {0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786504} command=alerts_send_fake_record&message_id=2&count=1299999 == MESSAGE 0 ========== 57 == {0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786570} command=alerts_send_fake_record&message_id=2&count=12999999 == MESSAGE 0 ========== 57 == {0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786600} ^] telnet> Connection closed. */ // You can verify from the alert server's log the it was receiving and // processing messages from the mast for several minutes solid and it never // tried to reconnect. void checkPing() { if (!_tcpSlave) return; // need better test because multicast is not master. checkConnection(); if (_nextPingTime < time(NULL)) { // Timer went off. if (_pingStatus == Ping::SomethingReceived) // We're receiving data, so but we haven't seen the ping response // yet. Assume the data is legit and the ping response is just // slow because we have too much data. Disconnecting now would // only make things slightly slower, and raise the chance that we // completely lose some data. ThreadMonitor::find().increment("ping late warn"); if (_pingStatus == Ping::Waiting) { // Nothing received recently. Looks like a network error. // Disconnect and try again. 
//TclList msg; //msg<disconnect(); ThreadMonitor::find().increment("ping failed DISCONNECT"); } else { // Time to send another ping _nextPingTime = time(NULL) + 5; if (!_firstPingTime) _firstPingTime = _nextPingTime; // Encode the _nextPingTime. So when we get a response we know: // a) Is this is the most recent ping request? // b) Otherwise, how old was this ping request? int64_t clientId = ciFirstPing + _nextPingTime - _firstPingTime; if ((int)clientId < ciFirstPing) { // We wrapped around. clientId is 32 bits, so this should // only happen if we keep running for over 68 years. I'll do the // right thing; I'll reset the connection which will cause // _firstPingTime to reset. Should be barely noticeable if at all. // But realistically, if we get here, it's probably a mistake, so log // it! And don't forget, the newer communication libraries are all // 64 bit. _talkWithServer->disconnect(); TclList msg; msg<disconnected()"<<"UNEXPECTED!!!" <<"_firstPingTime"<<_firstPingTime<sendMessage(message, this, clientId, true); _pingStatus = Ping::Waiting; } } } } // If we have to reconnect to the master, tell it where we left off. This // is the last thing we successfully received. This is null if we have // not received any records, yet. Record::Ref _lastRecord; void checkConnection(bool createIfRequired=true) { if (_talkWithServer && _talkWithServer->disconnected()) { TclList msg; msg<disconnected()"; sendToLogFile(msg); _talkWithServer->cancelAll(); delete _talkWithServer; _talkWithServer = NULL; _pingStatus = Ping::NotSent; } if (!createIfRequired) // We've put this object into a simpler state. Either _talkWithServer is // null or it's ready to go. return; if (_talkWithServer) // _talkWithServer was already ready to go. 
return; TclList msg; msg<connect"<<_masterName<<_masterPort; sendToLogFile(msg); _talkWithServer = new TalkWithServer("record dispatcher " + _base); _talkWithServer->connect(_masterName, _masterPort); if (_talkWithServer->disconnected()) // Unable to open the connection. return; TalkWithServer::Message message; message["command"] = getListenCommand(); if (_lastRecord) { // Let the master server know where we left off. The new data should // start seamlessly. const ValueBox idBox = _lastRecord->lookUpValue((FieldId)MainFields::id); bool valid; int64_t id; idBox.getInt(valid, id); if (!valid) sendToLogFile(TclList()<sendMessage(message, this, ciData, true); message["command"] = getStatusCommand(); _talkWithServer->sendMessage(message, this, ciStatus, true); // Request a new ping ASAP. _firstPingTime = _nextPingTime = 0; _pingStatus = Ping::NotSent; } // Received something from the master. virtual void onMessage(std::string bytes, int clientId, TalkWithServer::CancelId cancelId) { const auto somethingReceived = [this]() { // We received some data. This can be a good thing, or nothing at all. // If we were waiting for a ping, mark this as almost as good, a // warning instead of an error, and room to do even better when the // ping finally comes. If we were not waiting for anything, or we // already received a ping, ignore this, we are already in the best // possible state. if (_pingStatus == Ping::Waiting) _pingStatus = Ping::SomethingReceived; }; ThreadMonitor &tm = ThreadMonitor::find(); switch (clientId) { case ciData: { // Data received. Forward to the listeners. tm.increment("ciData"); auto record = Record::create(bytes); if (!record) tm.increment("UNABLE TO PARSE"); else { _lastRecord = record; somethingReceived(); notifyListeners(record); } break; } case ciStatus: { // Status message received. Forward to the listeners. tm.increment("ciStatus"); somethingReceived(); notifyStatusListeners(bytes); break; } default: { // Ping received. 
const time_t timeout = clientId + _firstPingTime - ciFirstPing; if (timeout == _nextPingTime) { // This response is from the most recent ping request. tm.increment("ping on time"); _pingStatus = Ping::PingReceived; _nextPingTime = 5 + time(NULL); } else { // This is an older response. somethingReceived(); const time_t age = time(NULL) - timeout; TclList msg; msg<getWaitHandle()); if (_talkWithServer) { if (_talkWithServer->wantsRead()) pollSet.addForRead(_talkWithServer->getHandle()); if (_talkWithServer->wantsWrite()) pollSet.addForWrite(_talkWithServer->getHandle()); } pollSet.setTimeoutMs(1000); pollSet.poll(); tm.setState("Read from queue"); _incoming->resetWaitHandle(); while (Request *current = _incoming->getRequest()) { switch (current->callbackId) { case mtNewListener: { // A new internal listener. // See FeedNextServer.C for external listeners. NewListener *request = dynamic_cast< NewListener * >(current); if (request->status) _statusListeners.push_back(request->listener); else _listeners.push_back(request->listener); // The following line reported 8 alert listeners and 0 status listeners, before I commented it out. // {Wed Aug 5 09:47:46 2020} 0 RecordDispatcher.C 457 threadFunction mtNewListener alerts _statusListeners.size() 0 _listeners.size() 8 //sendToLogFile(TclList()<status?"status":"alerts")<<"_statusListeners.size()"<<_statusListeners.size()<<"_listeners.size()"<<_listeners.size()); break; } case mtExternalRecord: { // This should only happen on the master. Someone is submitting // a new record for us to forward. (Or someone is making a related // administrative command.) ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); // Either supply the "record" argument, the "ping" argument, // or the "status" argument. (I.e. there is no sub-command field. // Which data the sender provides determines what we do here.) 
auto record = Record::create(request->getProperty("record")); if (!record) { if (request->getProperty("ping") == "1") // If you request "ping"="1", you will receive "ping" as the // response. This says that you are attached and the server // is listening for this type of data. More than likely if // the server is not listening for this type of data, you will // never get a response. addToOutputQueue(request->getSocketInfo(), "ping", request->getResponseMessageId()); else if (!request->getProperty("status").empty()) { //sendToLogFile(TclList()<getProperty("status")); notifyStatusListeners(request->getProperty("status")); } else tm.increment("UNABLE TO PARSE"); } else if (!record->update(ID_FIELD, _nextId)) { tm.increment("UNABLE TO SET ID"); } else { // "record" will try to add the record to the stream. If there // is a problem, it is logged locally. No message is sent back // to the client either way. _nextId++; notifyListeners(record); } break; } case mtSendFakeRecord: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); RecordBuilder rb; rb.append(MainFields::symbol, request->getProperty("symbol", "DELL")); rb.reserveInt(MainFields::id, strtolDefault(request->getProperty("id"), 1234)); rb.append(MainFields::timestamp, strtolDefault(request->getProperty("timestamp"), time(NULL))); Record::Ref record = Record::create(rb.exportAsString()); const int count = strtolDefault(request->getProperty("count"), 1); for (int i = 0; i < count; i++) notifyListeners(record); addToOutputQueue(request->getSocketInfo(), record->debugDump()); break; } case mtDoIt: { DoIt *request = dynamic_cast< DoIt * >(current); request->callback(); break; } } delete current; } tm.setState("ping"); checkPing(); if (_talkWithServer) { tm.setState("_talkWithServer"); _talkWithServer->wakeUp(); } if (_talkWithServer && _talkWithServer->pendingResponseCount()) { tm.setState("responses"); _talkWithServer->doResponses(); } } } public: virtual bool isActive() const { return _active; } 
virtual bool isMaster() const { return _thisIsMaster; } virtual std::string const &getBaseName() const { return _base; } virtual std::string getListenCommand() const { return _base + "_record_listen"; } virtual std::string getStatusCommand() const { return _base + "_status_listen"; } virtual void listenForRecords(RequestListener *listener, int callbackId) { if (_active) { NewListener *request = new NewListener(false); request->listener.listener = listener; request->listener.callbackId = callbackId; _incoming->newRequest(request); } } virtual void listenForStatus(RequestListener *listener, int callbackId) { if (_active) { NewListener *request = new NewListener(true); request->listener.listener = listener; request->listener.callbackId = callbackId; _incoming->newRequest(request); } } RecordDispatcher(std::string const &base) : ThreadClass("RecordDispatcher: " + base), _incoming(NULL), _base(base), _notifyString("notify " + base), _notifyStatusString("notify status " + base), _active(false), _thisIsMaster(false), _tcpSlave(false), _talkWithServer(NULL), _pingStatus(Ping::NotSent) { std::string input = getConfigItem(base + "_master_info"); if (input == "") // No source of data. Not worth creating the thread. // Note: We've added default values for the master info to // ../../live_server/config_common.txt, so now it's less common // to get here. Someone would have to explicitly set this to // the empty string to get here. It still happens, but it's no // longer the default. return; // Don't create _incoming unless we start the thread. Otherwise the // delete socket logic will fail. No sockets will ever get deleted. // If the deadman timer goes off, we will stop listening to the socket, // but we won't send the close message to the other end. // // Do not try to access _incoming unless _active is true. 
_incoming = new SelectableRequestQueue(getName()); _thisIsMaster = input == "me"; CommandDispatcher *cd = CommandDispatcher::getInstance(); cd->listenForCommand(base + "_send_fake_record", _incoming, mtSendFakeRecord); if (_thisIsMaster) { cd->listenForCommand(base + "_add_record", _incoming, mtExternalRecord); } else if (input == "multicast") { TclList msg; msg<isOkay()) // This test is NOT optional. If you try to call receiver->listen() // and the receiver is not okay, the program might crash. msg<<"FAILED"; else { // Each time the receiver receives a new event we want to move into // this thread and call notifyListeners(). DoIt is a class that lets // us run any code in this thread. (Newer code which uses // ContainerThread.h would call addLambdaToQueue() for this effect.) receiver->listen([=](Record::Ref const &record) { DoIt *request = new DoIt([=]() { notifyListeners(record); }); _incoming->newRequest(request); }); receiver->listen([=](std::string const &status) { DoIt *request = new DoIt([=]() { notifyStatusListeners(status); }); _incoming->newRequest(request); }); msg<<"success"; sendToLogFile(msg); } } else { std::vector< std::string > pieces = explode(":", input); if (pieces.size() != 2) { TclList msg; msg<lookUpValue(MainFields::timestamp); bool valid; time_t timestamp; timestampBox.getInt(valid, timestamp); if (!valid) _noTimeCount++; else { const int64_t now = getMicroTime(); const int64_t late = now - timestamp * 1000000; if (late <= 0) _earlyCount++; else { _totalTime += late; if (late > _worstTime) _worstTime = late; } } } std::string LateRecordCounter::getInfoForThreadMonitor() { if (!_totalCount) return ""; TclList result; result<<"LateRecordCounter" <<"_totalCount"<<_totalCount; auto const normalCount = _totalCount - _noTimeCount - _earlyCount; if (normalCount > 0) result<<"average late"<<(_totalTime/1000000.0/normalCount) <<"_worstTime"<<_worstTime/1000000.0; if (_noTimeCount) result<<"_noTimeCount"<<_noTimeCount; if (_earlyCount) 
result<<"_earlyCount"<<_earlyCount; _noTimeCount = _earlyCount = _totalCount = _totalTime = _worstTime = 0; return result; } LateRecordCounter::LateRecordCounter() : _noTimeCount(0), _earlyCount(0), _totalCount(0), _totalTime(0), _worstTime(0) { } ///////////////////////////////////////////////////////////////////// // IRecordDispatcher ///////////////////////////////////////////////////////////////////// IRecordDispatcher *IRecordDispatcher::_alerts = NULL; IRecordDispatcher *IRecordDispatcher::_topList = NULL; void IRecordDispatcher::init() { _alerts = new RecordDispatcher("alerts"); _topList = new RecordDispatcher("top_list"); if (!(_alerts->isActive() || _topList->isActive())) { // Assume this is a mistake. TclList msg; msg<