#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include <zlib.h>

#include "../shared/GlobalConfigFile.h"
#include "../shared/SimpleLogFile.h"
#include "../shared/Marshal.h"
#include "../shared/MultiCast.h"
#include "../shared/CommandDispatcher.h"
#include "../shared/ReplyToClient.h"

#include "AlertRecordMultiCast.h"

// TODO set socket options, non blocking!
// I did some.  I need to double check to make sure I got it all.

/////////////////////////////////////////////////////////////////////
// AlertRecordMultiCastListener
//
// This is a helper for AlertRecordMultiCastReceiver.  We want to
// grab packets from the network as quickly as possible and move them
// into our own queue before they get dropped.
//
// Originally we were reading the packets and parsing them in the
// same thread.  We spent a lot of time uncompressing and otherwise
// processing the data before handing it off to the various
// listeners.
//
// Hopefully this new version will be more responsive and will lose
// fewer packets.
/////////////////////////////////////////////////////////////////////

static const std::string s_X_peek = "X peek";
static const std::string s_X_read = "X read";
static const std::string s_X_process = "X process";

class AlertRecordMultiCastListener : private ForeverThreadUser {
public:
    typedef SmarterP< std::string > Data;
    typedef std::function< void(Data const &) > Callback;

private:
    const int _fd;
    const Callback _callback;

    std::string baseName() { return getContainer()->getThreadName(); }

    // For IContainerThreadUser
    virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) override {
        ThreadMonitor &tm = ThreadMonitor::find();
        int count = 0;
        while (true) {
            tm.setState(s_X_peek);
            //sendToLogFile(TclList()<<baseName()<<__LINE__);
            // Ask the kernel for the size of the next datagram without
            // consuming it.  (Assumption:  the exact flags were lost from
            // this copy of the file; MSG_PEEK|MSG_TRUNC|MSG_DONTWAIT is the
            // standard way to do this on Linux.)
            ssize_t nbytes = recv(_fd, NULL, 0,
                                  MSG_PEEK|MSG_TRUNC|MSG_DONTWAIT);
            if (nbytes < 0) { // Typically EAGAIN:  the socket is drained.
                tm.increment("EAGAIN");
                break;
            }
            tm.setState(s_X_read);
            Data bytes(NULL, nbytes, 'P');
            nbytes = recv(_fd, &(*bytes)[0], nbytes, MSG_TRUNC);
            if (nbytes != (ssize_t)bytes->length()) {
                // Surprising:  the datagram changed between peek and read.
                TclList msg;
                msg<<baseName()<<__LINE__<<"expected"<<bytes->length();
                if (nbytes < 0)
                    msg<<"errno"<<errno;
                sendToLogFile(msg);
            }
            tm.setState(s_X_process);
            _callback(bytes);
            count++;
        }
        tm.increment("X count", count);
        callbacks.waitForRead(_fd);
    }

public:
    AlertRecordMultiCastListener(int fd,
                                 Callback const &callback,
                                 IContainerThread *thread) :
        ForeverThreadUser(thread),
        _fd(fd),
        _callback(callback)
    {
        assert(_fd >= 0);
        start();
    }
};
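// The peek-then-read pattern above lost some text in this copy of the file,
// so here is a minimal standalone sketch of the same idea, assuming a
// non-blocking UDP socket on Linux.  (readOneDatagram is an illustrative
// name, not part of this codebase.)
#if 0
#include <string>
#include <sys/socket.h>

// Returns false when the socket is drained (EAGAIN/EWOULDBLOCK) or on error.
static bool readOneDatagram(int fd, std::string &out) {
    // With MSG_TRUNC, Linux returns the real datagram length even though we
    // supplied no buffer.  MSG_PEEK leaves the datagram in the queue.
    ssize_t size = recv(fd, NULL, 0, MSG_PEEK | MSG_TRUNC | MSG_DONTWAIT);
    if (size < 0)
        return false;                 // Typically EAGAIN:  nothing waiting.
    out.assign((size_t)size, '\0');
    // Now consume the datagram into a buffer of exactly the right size.
    ssize_t got = recv(fd, &out[0], out.size(), MSG_TRUNC | MSG_DONTWAIT);
    return got == size;
}
#endif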
/////////////////////////////////////////////////////////////////////
// AlertRecordMultiCastBase
/////////////////////////////////////////////////////////////////////

AlertRecordMultiCastBase::AlertRecordMultiCastBase
(std::string const &baseName) :
    _baseName(baseName + "_multicast"),
    _port(-1),
    _okay(false)
{
    // getConfigItem: alerts_multicast_addr=239.0.1.0:2000
    // getConfigItem: top_list_multicast_addr=239.0.2.0:2000
    // getConfigItem: This setting is used by both the master and the slaves
    // getConfigItem: so they know where to get data.
    // getConfigItem: Use other settings to turn the data on or off.
    // getConfigItem: Consider alerts_multicast_feed_next_server=1 or
    // getConfigItem: top_list_master_info=multicast
    // getConfigItem: Recommend using values in the range 239.0.0.0 -
    // getConfigItem: 239.255.255.255.  This is called "Organization-Local
    // getConfigItem: Scope".
    // getConfigItem: https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xhtml
    // getConfigItem: Ideally all the groups would be different even if you
    // getConfigItem: ignored the least significant 5 bits.  An unmanaged
    // getConfigItem: switch ignores those 5 bits when delivering messages so
    // getConfigItem: it might send you lots of data you don't need.
    _configInput = getConfigItem(configKey());
    if (_configInput != "") {
        std::vector< std::string > pieces = explode(":", _configInput);
        if (pieces.size() != 2) {
            TclList msg;
            msg<<__FILE__<<__LINE__<<configKey()<<"invalid"<<_configInput;
            sendToLogFile(msg);
        } else {
            // Assumption:  the success branch was lost from this copy of
            // the file; this is the obvious reconstruction.
            _group = pieces[0];
            _port = strtolDefault(pieces[1], -1);
            _okay = _port > 0;
        }
    }
}

/////////////////////////////////////////////////////////////////////
// No class
/////////////////////////////////////////////////////////////////////

enum class ItemType : unsigned char {
    // A single alert or top list row.
    Record,

    // This comes from each alert server to say it is alive.  It gets passed
    // along a similar route to the live data, as a test.  Currently the C++
    // side doesn't use this.  We send these records to the database, and the
    // database side somehow tickles the monitor_alive table.  (I don't
    // remember the details, but when the database was down and the C++
    // stuff was all up, all the entries like "fogell:1202 -> top_list" and
    // "jules:1203 -> alerts" went down in the back office.)
    Status,

    // This is the periodic message that we send.  It is internal to
    // AlertRecordMultiCast.  Among other things, it's a constant ping, so
    // it can provide status 24/7.
    LinkStatus,

    // We tried to decode the item type and failed.
    Error
};

inline void add(std::string &bytes, ItemType itemType) {
    bytes += (char)itemType;
}

inline ItemType takeItemType(std::string &bytes) {
    if (bytes.empty())
        return ItemType::Error;
    else {
        const ItemType result = (ItemType)bytes[bytes.size() - 1];
        bytes.resize(bytes.size() - 1);
        if (result > ItemType::Error)
            return ItemType::Error;
        else
            return result;
    }
}
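// A tiny round trip under the two helpers above.  Note that the tag lives
// at the *end* of the string:  a reader pops the type byte first and then
// hands the remaining bytes to the right parser.  (tagRoundTrip is an
// illustrative name, not part of this codebase.)
#if 0
#include <cassert>
#include <string>

static void tagRoundTrip() {
    std::string bytes = "payload";
    add(bytes, ItemType::Record);      // Appends exactly one tag byte.
    assert(takeItemType(bytes) == ItemType::Record);
    assert(bytes == "payload");        // The payload itself is unchanged.
}
#endif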
static SmarterP< std::string > compressString(std::string const &input) {
    ThreadMonitor::SetState tm("compress");
    tm.increment("compress");
    size_t compressedLength = compressBound(input.length());
    SmarterP< std::string > result(NULL, compressedLength, '\0');
    const int error = compress((unsigned char *)&(*result)[0],
                               &compressedLength,
                               (unsigned char const *)&input[0],
                               input.length());
    if (error) {
        // This should never happen.  Effectively an assertion failed.
        TclList msg;
        msg<<__FILE__<<__LINE__<<__FUNCTION__<<"compress error"<<error;
        sendToLogFile(msg);
    }
    result->resize(compressedLength);
    tm.increment("compress in", input.size());
    tm.increment("compress out", compressedLength);
    return result;
}

// Given the compressed data and the size of the original uncompressed data,
// recreate the original uncompressed data.  If there is any problem return
// the empty string and record the details in ThreadMonitor.
static std::string uncompressString(std::string const &compressed,
                                    size_t uncompressedSize) {
    ThreadMonitor::SetState tm("uncompress");
    tm.increment("uncompress");
    std::string result(uncompressedSize, '\0');
    size_t destLen = uncompressedSize;
    const int error = uncompress((unsigned char *)&result[0], &destLen,
                                 (unsigned char const *)&compressed[0],
                                 compressed.length());
    switch (error) {
    case Z_OK:
        if (destLen != uncompressedSize) {
            tm.increment("uncompress size mismatch");
            return "";
        } else
            return result;
    case Z_MEM_ERROR:
        // if there was not enough memory, maybe an internal error?
        tm.increment("Z_MEM_ERROR");
        return "";
    case Z_BUF_ERROR:
        // if there was not enough room in the output buffer
        // So basically the same as uncompress size mismatch.
        tm.increment("Z_BUF_ERROR");
        return "";
    case Z_DATA_ERROR:
        // if the input data was corrupted or incomplete
        tm.increment("Z_DATA_ERROR");
        return "";
    default:
        tm.increment("unexpected uncompress error " + ntoa(error));
        return "";
    }
}

// You can uncompress data without knowing the uncompressedSize in advance.
// But it's easier if you know the size.  And in this context we know we can
// store the size in 2 bytes.  So every packet contains the compressed data
// followed by two bytes indicating the size of the original data.
static SmarterCP< std::string > compressPacket(std::string const &original) {
    SmarterP< std::string > result = compressString(original);
    marshal(*result, (uint16_t)original.length());
    // It's tempting to skip the following line.  We'll use more memory but
    // save time.  Why are we bothering with smart pointers in the first
    // place if not to avoid one extra copy?
    //result->shrink_to_fit();
    return result;
}

// Returns true on success and puts the uncompressed data into packet.
// Returns false on failure and packet is in an unknown state.
static bool uncompressPacket(std::string &packet) {
    uint16_t size;
    try {
        mPop(packet, size);
    } catch (MarshallingException &ex) {
        return false;
    }
    packet = uncompressString(packet, size);
    return packet.size() == size;
}
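// A hedged round-trip sketch tying the helpers above together.  It assumes
// only what the code shows:  compressPacket() appends the 16 bit plaintext
// size, uncompressPacket() pops it and inflates in place, and SmarterCP
// dereferences like an ordinary smart pointer.  (packetRoundTrip is an
// illustrative name, not part of this codebase.)
#if 0
#include <cassert>
#include <string>

static void packetRoundTrip() {
    const std::string original = "some alert data, hopefully compressible";
    SmarterCP< std::string > packet = compressPacket(original);
    std::string onTheWire = *packet;    // What we would hand to sendto().
    if (uncompressPacket(onTheWire))    // False would mean corruption.
        assert(onTheWire == original);
}
#endif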
static std::string s_STATUS_TIME = "time";
static std::string s_STATUS_FILL_IN = "fill in";
static std::string s_ID = "id";

/////////////////////////////////////////////////////////////////////
// AlertRecordMultiCastSender
/////////////////////////////////////////////////////////////////////

void AlertRecordMultiCastSender::handleRequestInThread(Request *original) {
    switch (original->callbackId) {
    case mtFillIn: {
        ThreadMonitor::SetState tm("mtFillIn");
        ExternalRequest *current =
            dynamic_cast< ExternalRequest * >(original);
        const int64_t id = strtolDefault(current->getProperty(s_ID), -1);
        if (PacketAndTime *packetAndTime = getProperty(_recentData, id)) {
            addToOutputQueue(current->getSocketInfo(), packetAndTime->packet,
                             current->getResponseMessageId());
            tm.increment("mtFillIn good");
        } else {
            addToOutputQueue(current->getSocketInfo(), "",
                             current->getResponseMessageId());
            tm.increment("mtFillIn bad");
        }
        /*
        TclList msg;
        msg<<__FILE__<<__LINE__<<__FUNCTION__
           <<"requested id"<<id
           <<"oldest"<<_recentData.begin()->first
           <<"newest"<<_recentData.rbegin()->first;
        sendToLogFile(msg);
        */
        // To test this code from the command line:
        // 1) Uncomment the logging code above.
        // 2) telnet 127.0.0.1 7899
        // 3) command=top_list_multicast_fill_in&message_id=12
        // 4) command=top_list_multicast_fill_in&id=294&message_id=12
        // 5) command=top_list_multicast_fill_in&id=304&message_id=12
        // I was watching the log file from this process while I was
        // telnetting in.  The first time I didn't even specify an id and I
        // saw this in the log:
        //   {Sun Aug 16 19:59:35 2020} 0 AlertRecordMultiCast.C 345 handleRequestInThread {requested id} -1 oldest 292 newest 293
        // The second time I wasn't fast enough and I saw this:
        //   {Sun Aug 16 20:00:11 2020} 0 AlertRecordMultiCast.C 345 handleRequestInThread {requested id} 294 oldest 296 newest 303
        // The third time I saw a good response on telnet and nothing in the
        // log.
        break;
    }
    case mtDebug: {
        ThreadMonitor::SetState tm("mtDebug");
        ExternalRequest *current =
            dynamic_cast< ExternalRequest * >(original);
        TclList msg;
        msg<<__FILE__<<__LINE__<<__FUNCTION__<<"mtDebug";
        const int64_t skip =
            strtolDefault(current->getProperty("skip"), -1);
        if (skip >= 0) {
            msg<<(TclList()<<"old skip"<<_debugSkipCount
                  <<"new skip"<<skip);
            _debugSkipCount = skip;
        } else
            msg<<(TclList()<<"skip"<<_debugSkipCount);
        const std::string replay = current->getProperty("replay");
        if (!replay.empty()) {
            std::vector< std::string > pieces = explode(",", replay);
            TclList r;
            r<<"replay";
            for (std::string const &piece : pieces) {
                bool found = false;
                if (const int64_t id = strtolDefault(piece, 0))
                    if (PacketAndTime *const packetAndTime =
                        getProperty(_recentData, id)) {
                        _packetsToSend.push_back(packetAndTime->packet);
                        r<<id;
                        found = true;
                    }
                if (!found)
                    r<<(TclList()<<"Not found"<<piece);
            }
            msg<<r;
        }
        const int64_t maxAge =
            strtolDefault(current->getProperty("max_age"), -1);
        if (maxAge >= 0) {
            msg<<(TclList()<<"_maxFillInAgeMicroSeconds"
                  <<"old"<<_maxFillInAgeMicroSeconds<<"new"<<maxAge);
            _maxFillInAgeMicroSeconds = maxAge;
        } else
            msg<<(TclList()<<"_maxFillInAgeMicroSeconds"
                  <<_maxFillInAgeMicroSeconds);
        msg<<(TclList()<<"_lastPacketId"<<_lastPacketId);
        // (The _recentData oldest/newest summary that appears in the
        // transcript below was built here; that code was lost from this
        // copy of the file.)
        addToOutputQueue(current->getSocketInfo(), msg,
                         current->getResponseMessageId());
        break;

        /* Here's a transcript of a recent debugging session.  I started by
         * logging into the forwarding server, setting max age to 1,000
         * seconds, and telling it to skip the next 2,000 normal outgoing
         * messages:
         *
         * [phil@becca Logs]$ telnet joey-mousepad 7899
         * Trying 192.168.1.238...
         * Connected to joey-mousepad.
         * Escape character is '^]'.
         * command=alerts_multicast_debug_send&max_age=1000000000&skip=2000
         * == MESSAGE 0 ========== 299 == AlertRecordMultiCast.C 377 handleRequestInThread mtDebug {{old skip} 0 {new skip} 2000} {_lastPacketId 335} {_maxFillInAgeMicroSeconds old 15000000 new 1000000000} {_recentData oldest {id 334 {age μs} 10988091 {expires in μs} 989011909} newest {id 335 {age μs} 977975 {expires in μs} 999022025}}
         *
         * I saw that the highest id number was 335, so I should send test
         * packets starting with 336 if I wanted to keep things in order.  I
         * asked for a status update to see if new packets had arrived yet.
         * You can't send packets just any time you want.  You can only send
         * existing packets, so you have to wait for some external event.
         *
         * command=alerts_multicast_debug_send
         * == MESSAGE 0 ========== 264 == AlertRecordMultiCast.C 377 handleRequestInThread mtDebug {skip 1991} {_lastPacketId 344} {_maxFillInAgeMicroSeconds 1000000000} {_recentData oldest {id 334 {age μs} 56193974 {expires in μs} 943806026} newest {id 344 {age μs} 6202243 {expires in μs} 993797757}}
         *
         * We have data up to 344, so there are 9 new packets that the
         * slaves haven't seen yet.  (Alternatively you could have looked at
         * the skip count to see that the server has skipped sending 9
         * packets.)  Send the next 4 packets, but slightly out of order.
         *
         * command=alerts_multicast_debug_send&replay=337,339,336,338
         * == MESSAGE 0 ========== 290 == AlertRecordMultiCast.C 377 handleRequestInThread mtDebug {skip 1968} {replay 337 339 336 338} {_lastPacketId 367} {_maxFillInAgeMicroSeconds 1000000000} {_recentData oldest {id 334 {age μs} 162487871 {expires in μs} 837512129} newest {id 367 {age μs} 1505373 {expires in μs} 998494627}}
         *
         * The forwarding server shows success.  I was also doing a tail -f
         * on a top list slave server's log.  This is what I saw:
         *
         * {Mon Aug 17 21:58:07 2020} 0 AlertRecordMultiCast.C 1084 onMessageReceived {WRONG PACKET ORDER} alerts_multicast expected 336 found 337 skipped 1
         * {Mon Aug 17 21:58:07 2020} 0 AlertRecordMultiCast.C 1084 onMessageReceived {WRONG PACKET ORDER} alerts_multicast expected 336 found 339 skipped 3
         *
         * This is exactly what I expected.  Notice that the top list server
         * received packets in the wrong order, but it put them back in
         * order before processing them.  There were no warnings; that's how
         * I knew it worked.  Finally I set skip to 0 to return to normal
         * operations.
         *
         * command=alerts_multicast_debug_send&skip=0
         * == MESSAGE 0 ========== 284 == AlertRecordMultiCast.C 377 handleRequestInThread mtDebug {{old skip} 1964 {new skip} 0} {_lastPacketId 371} {_maxFillInAgeMicroSeconds 1000000000} {_recentData oldest {id 334 {age μs} 205732059 {expires in μs} 794267941} newest {id 371 {age μs} 5211941 {expires in μs} 994788059}}
         *
         * Soon after I sent that command I saw the following in the top
         * list server's log file:
         *
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 1084 onMessageReceived {WRONG PACKET ORDER} alerts_multicast expected 340 found 372 skipped 32
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 938 sendRequest {{command alerts_multicast_fill_in} {id 340}}
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 1294 beforeSleep sendFillInRequest(340) {after 1501μs}
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 938 sendRequest {{command alerts_multicast_fill_in} {id 341}}
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 1294 beforeSleep sendFillInRequest(341) {after 1501μs}
         * ...
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 938 sendRequest {{command alerts_multicast_fill_in} {id 371}}
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 1294 beforeSleep sendFillInRequest(371) {after 1501μs}
         * {Mon Aug 17 21:58:55 2020} 0 ZLibMalloc.C 70 fixedMalloc reserving 5944 in alerts_multicast_parser
         * {Mon Aug 17 21:58:55 2020} 0 ZLibMalloc.C 70 fixedMalloc reserving 65544 in alerts_multicast_parser
         * {Mon Aug 17 21:58:55 2020} 0 ZLibMalloc.C 70 fixedMalloc reserving 7160 in alerts_multicast_parser
         * {Mon Aug 17 21:58:55 2020} 0 ZLibMalloc.C 70 fixedMalloc reserving 32776 in alerts_multicast_parser
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 977 onMessage {fill in received} 340 use
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 977 onMessage {fill in received} 341 use
         * ...
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 977 onMessage {fill in received} 370 use
         * {Mon Aug 17 21:58:55 2020} 0 AlertRecordMultiCast.C 977 onMessage {fill in received} 371 use
         *
         * In the previous test the top list server only had to reorder the
         * packets I gave it.  In this test there were missing packets, so
         * it had to request them from the fill in server and merge them in
         * with the live data.  As with the previous test, everything went
         * according to plan.  Everything was recovered and put back into
         * the right order.
         *
         * By the way, this is what an error looks like.  I wasn't typing
         * fast enough.  I fixed this by raising the max_age and trying
         * again.
         *
         * command=alerts_multicast_debug_send&replay=394,393,392,391,390
         * == MESSAGE 0 ========== 334 == AlertRecordMultiCast.C 377 handleRequestInThread mtDebug {skip 460} {replay 394 393 {{Not found} 392} {{Not found} 391} {{Not found} 390}} {_lastPacketId 397} {_maxFillInAgeMicroSeconds 1000000000} {_recentData oldest {id 393 {age μs} 42123006 {expires in μs} 957876994} newest {id 397 {age μs} 2656770 {expires in μs} 997343230}}
         */
    }
    default:
        abort();
    }
}
void AlertRecordMultiCastSender::initializeInThread() {
    ThreadMonitor::find().add(this);
}

SmarterCP< std::string >
AlertRecordMultiCastSender::compress(std::string const &plaintext) {
    return compressPacket(plaintext);
}

AlertRecordMultiCastSender::Sent
AlertRecordMultiCastSender::trySendNow(std::string const &packet) const {
    ThreadMonitor::SetState tm("trySendNow");
    if (_fd == -1)
        return Sent::FailForGood;
    int nbytes = sendto(_fd, packet.c_str(), packet.length(), MSG_DONTWAIT,
                        (struct sockaddr *)&_addr, sizeof(_addr));
    if (nbytes >= 0) {
        tm.increment("sendto Success");
        return Sent::Success;
    } else if ((errno == EAGAIN) || (errno == EWOULDBLOCK)
               || (errno == EINTR)) {
        tm.increment("sendto Retry");
        return Sent::ShouldRetry;
    } else {
        tm.increment("sendto Fail");
        return Sent::FailForGood;
    }
}

void AlertRecordMultiCastSender::trySendNow() {
    const auto initial = _packetsToSend.size();
    if (initial) {
        while (!_packetsToSend.empty()) {
            std::string const &packet = *_packetsToSend.front();
            const Sent sent = trySendNow(packet);
            if (sent == Sent::ShouldRetry)
                break;
            _packetsToSend.pop_front();
        }
        const auto sent = initial - _packetsToSend.size();
        std::string counter;
        switch (sent) {
        case 0:
            counter = "trySendNow 0";
            break;
        case 1:
            counter = "trySendNow 1";
            break;
        default:
            counter = "trySendNow " + ntoa(sent);
            break;
        }
        ThreadMonitor::find().increment(counter);
    }
}
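// trySendNow() treats EAGAIN as "retry later" rather than blocking.  The
// other half of that contract is asking the event loop to wake us when the
// socket becomes writable again; beforeSleep() below does that through
// IBeforeSleepCallbacks::waitForWrite().  A minimal poll() sketch of the
// same handshake, outside that framework (waitUntilWritable is an
// illustrative name, not part of this codebase):
#if 0
#include <poll.h>

static void waitUntilWritable(int fd) {
    struct pollfd p;
    p.fd = fd;
    p.events = POLLOUT;    // Wake when the send buffer has room again.
    p.revents = 0;
    poll(&p, 1, -1);       // The real thread never blocks on one fd like
                           // this; its container multiplexes many of them.
}
#endif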
ThreadMonitor::find().increment("packet partial"); pushPacketBeingBuilt(); trySendNow(); } if (!_packetsToSend.empty()) callbacks.waitForWrite(_fd); } void AlertRecordMultiCastSender::pushPacketBeingBuilt() { auto const packet = compress(_packetBeingBuilt); _packetBeingBuilt.clear(); if (_debugSkipCount) { _debugSkipCount--; ThreadMonitor::find().increment("pushPacketBeingBuilt() SKIP"); } else _packetsToSend.push_back(packet); const int64_t now = getMicroTime(); _recentData.emplace(_lastPacketId, PacketAndTime(now, packet)); const int64_t cutOff = now - _maxFillInAgeMicroSeconds; while (_recentData.begin()->second.submitted < cutOff) _recentData.erase(_recentData.begin()); } void AlertRecordMultiCastSender::sendInThread(std::string const &bytes) { static const size_t MAX_SINGLE_ITEM_SIZE = 7500; static const size_t PACKET_BREAK_SIZE = 10000; if (bytes.length() > MAX_SINGLE_ITEM_SIZE) { sendToLogFile(TclList()< PACKET_BREAK_SIZE) { tm.increment("packet full"); pushPacketBeingBuilt(); if (const auto initialSize = _packetsToSend.size()) { trySendNow(); // {pf Q progress} 244 {pf Q stuck} 2829 {pf new queue} 1 {pf new send} 194829 // This is a sample of the data we see from the increment()'s below. // Most of the time we see only "pf new send", i.e. nothing queued, // everything processed immediately. And when we do see something, // "pf new queue" is never higher than 3 and is typically 1. I.e. // one burst of time between when we got behind and when we caught up. // The example above is from when the server first started. When we // start with a Q, we typically see progress a little over 10% of the // time, so it's good that we are checking and sending data a little // sooner. const bool progress = _packetsToSend.size() < initialSize; if (initialSize == 1) // The queue was empty before we received this packet. We // immediately tried to send the packet. if (progress) // There was only one item in the queue and we sent it. tm.increment("pf new send"); else // This new packet will sit in the queue. This is the start // of a delay. We know this happens sometimes. tm.increment("pf new queue"); else // There were already packets in the queue before we added a new // one. So at least one call to sendto() failed but we plan to // retry. Is there value in calling trySendNow() again, // or should we have skipped it and waited for beforeSleep()? // That's what the following two statuses are trying to prove. if (progress) // Good, we sent more to the o/s. Data is getting out sooner. tm.increment("pf Q progress"); else // This last call to trySendNow() was unnecessary, and possibly // wasteful. tm.increment("pf Q stuck"); } } else if (!_packetBeingBuilt.empty()) tm.increment("packet share"); if (_packetBeingBuilt.empty()) { // Starting a new packet. Print a short header. marshal(_packetBeingBuilt, _sessionId); _lastPacketId++; marshal(_packetBeingBuilt, _lastPacketId); } marshal(_packetBeingBuilt, bytes); // Request a wakup soon. This call is advertised as cheap, so don't // overthink it. Call this any time we get a valid request. 
void AlertRecordMultiCastSender::sendLinkStatusMessageNow() {
    std::map< std::string, std::string > map;
    map[s_STATUS_TIME] = marshal(getMicroTime());
    if (!_fillInAddress.empty())
        map[s_STATUS_FILL_IN] = _fillInAddress;
    /*
    const std::string command = _baseName + "_fill_in";
    sendToLogFile(TclList()<<__FILE__<<__LINE__<<__FUNCTION__<<command);
    */
    // Assumption:  the tail of this function was lost from this copy of the
    // file.  Presumably it marshals the map, tags it, queues it, and picks
    // the next send time, something like this:
    std::string message = marshal(map);
    add(message, ItemType::LinkStatus);
    sendInThread(message);
    _nextLinkStatusTime = time(NULL) + 1;  // Interval not recovered; a short
                                           // one fits the "constant ping"
                                           // description above.
}

void AlertRecordMultiCastSender::send(Record::Ref const &record) {
    std::string encoded = record->getEncoded();
    add(encoded, ItemType::Record);
    invokeIfRequired([=](){ sendInThread(encoded); });
}

void AlertRecordMultiCastSender::send(std::string status) {
    add(status, ItemType::Status);
    invokeIfRequired([=](){ sendInThread(status); });
}

bool AlertRecordMultiCastSender::isOkay() const {
    return _fd >= 0;
}

std::string AlertRecordMultiCastSender::_fillInAddress;

void AlertRecordMultiCastSender::setListenPort(int port) {
    _fillInAddress = getShortHostName() + ':' + ntoa(port);
}

std::string AlertRecordMultiCastSender::getInfoForThreadMonitor() {
    TclList msg;
    msg<<"_lastPacketId"<<_lastPacketId<<"_recentData"<<_recentData.size();
    return msg;
}

AlertRecordMultiCastSender::AlertRecordMultiCastSender
(std::string const &baseName, IContainerThread *thread) :
    AlertRecordMultiCastBase(baseName),
    ForeverThreadUser(thread),
    _fd(-1),
    _sessionId(getMicroTime()),
    _lastPacketId(0),
    _debugSkipCount(0),
    _maxFillInAgeMicroSeconds(15000000L),
    _nextLinkStatusTime(0)
{
    if (okay()) {
        memset(&_addr, 0, sizeof(_addr));
        _addr.sin_family = AF_INET;
        _addr.sin_addr.s_addr = inet_addr(group());
        _addr.sin_port = htons(port());
        _fd = socket(AF_INET, SOCK_DGRAM, 0);
        if (_fd < 0) {
            TclList msg;
            msg<<__FILE__<<__LINE__<<__FUNCTION__<<"socket failed"<<errno;
            sendToLogFile(msg);
        }
        // Assumption:  the name of the CommandDispatcher accessor was lost
        // from this copy of the file.
        CommandDispatcher *const cd = CommandDispatcher::getInstance();
        cd->listenForCommand(commandName(), this, mtFillIn);
        const std::string debugCommand = getBaseName() + "_debug_send";
        cd->listenForCommand(debugCommand, this, mtDebug);
        start();
    }
}
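// The constructor above lost some text in this copy of the file.  For
// reference, a standalone sketch of a typical multicast *sender* socket
// like the one being configured there.  The TTL option is an assumption:
// nothing in the surviving code shows which options the real constructor
// sets.  (openMulticastSender is an illustrative name.)
#if 0
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <cstring>

static int openMulticastSender(const char *group, int port,
                               sockaddr_in &addr) {
    std::memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr(group);   // e.g. "239.0.1.0"
    addr.sin_port = htons(port);
    const int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0)
        return -1;
    // Keep datagrams inside the local network; 1 is also the default.
    unsigned char ttl = 1;
    setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));
    return fd;  // Use with sendto(fd, ..., (sockaddr *)&addr, sizeof(addr)).
}
#endif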
/////////////////////////////////////////////////////////////////////
// AlertRecordMultiCastReceiver::ParsedPacket
/////////////////////////////////////////////////////////////////////

AlertRecordMultiCastReceiver::ParsedPacket::ParsedPacket
(std::string &bytes, AlertRecordMultiCastReceiver *owner) :
    _owner(NULL)
{
    ThreadMonitor::SetState tm("ParsedPacket");
    try {
        decompress(bytes);
        size_t offset = 0;
        unmarshal(bytes, offset, _sessionId);
        unmarshal(bytes, offset, _packetId);
        //TclList msg;
        //msg<<__FUNCTION__<<"_sessionId"<<_sessionId<<"_packetId"<<_packetId;
        while (offset < bytes.length()) {
            std::string item;
            unmarshal(bytes, offset, item);
            switch (takeItemType(item)) {
            case ItemType::Status:
                _actions.push_back([=]() {
                    for (const auto &callback : owner->_statusListeners)
                        callback(item);
                });
                break;
            case ItemType::Record: {
                Record::Ref record = Record::create(item);
                if (!record)
                    throw MarshallingException("Corrupt data (create).");
                else
                    _actions.push_back([=]() {
                        for (const auto &callback : owner->_recordListeners)
                            callback(record);
                    });
                break;
            }
            case ItemType::LinkStatus:
                _actions.push_back([=]() {
                    owner->_linkStatus.update(item);
                });
                break;
            default:
                // Interesting.  New item types will cause us to throw out
                // the entire packet.  It might make more sense to ignore
                // any unknown item types and assume that was part of a
                // future upgrade.
                throw MarshallingException("Corrupt data (itemType).");
            }
        }
        //sendToLogFile(msg);
        // Success!
        _owner = owner;
    } catch (MarshallingException &ex) {
        sendToLogFile(TclList()<<__FILE__<<__LINE__<<__FUNCTION__
                      <<ex.what());
    }
}

void AlertRecordMultiCastReceiver::ParsedPacket::deliverPayload() {
    if (_owner->_lastPacketIdDelivered + 1 != _packetId) {
        TclList msg;
        msg<<__FILE__<<__LINE__<<__FUNCTION__;
        if (_owner->_lastPacketIdDelivered == 0)
            msg<<"joining session in progress";
        else
            msg<<"DELIVERED OUT OF ORDER";
        msg<<"_lastPacketIdDelivered"<<_owner->_lastPacketIdDelivered
           <<"_packetId"<<_packetId;
        sendToLogFile(msg);
    }
    _owner->_lastPacketIdDelivered = _packetId;
    _owner = NULL;  // Don't call deliverPayload() twice!
    // Do everything at the end, only after we've successfully decoded
    // everything.  It's like a transaction.  Everything succeeds or
    // everything fails.  If the first half of the message looks good, but
    // the second half is corrupt, assume it's all corrupt.
    for (auto &action : _actions)
        action();
    ThreadMonitor::find().increment("action", _actions.size());
}
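// ParsedPacket above decodes everything into deferred std::function actions
// and only runs them once the whole packet parses cleanly.  The pattern in
// isolation, with hypothetical parse/deliver callbacks standing in for the
// real unmarshal()/Record::create() calls:
#if 0
#include <functional>
#include <string>
#include <vector>

static bool parseAllThenCommit(
    std::vector< std::string > const &items,
    std::function< bool(std::string const &) > parse,
    std::function< void(std::string const &) > deliver) {
    std::vector< std::function< void() > > actions;
    for (std::string const &item : items) {
        if (!parse(item))
            return false;               // Abort:  nothing was delivered.
        actions.push_back([&deliver, item]() { deliver(item); });
    }
    for (auto &action : actions)        // Commit:  every item parsed, so
        action();                       // deliver them all, in order.
    return true;
}
#endif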
/////////////////////////////////////////////////////////////////////
// AlertRecordMultiCastReceiver::LinkStatus
/////////////////////////////////////////////////////////////////////

void AlertRecordMultiCastReceiver::LinkStatus::resetDelay() {
    _delayCount = 0;
    _totalDelay = 0;
    maximize(_smallestDelay);
    minimize(_biggestDelay);
}

void AlertRecordMultiCastReceiver::LinkStatus::reset() {
    _fillInAddress.clear();
    _owner._fillInConnection.clearAddress();
    resetDelay();
}

AlertRecordMultiCastReceiver::LinkStatus::LinkStatus
(AlertRecordMultiCastReceiver &owner) :
    _owner(owner)
{
    reset();
}

void AlertRecordMultiCastReceiver::LinkStatus::update
(std::string const &message) {
    // ThreadMonitor::find().increment("LinkStatus");
    try {
        std::map< std::string, std::string > map;
        unmarshal(message, map);
        std::string const &newFillInAddress = map[s_STATUS_FILL_IN];
        if (_fillInAddress != newFillInAddress) {
            sendToLogFile(TclList()<<__FILE__<<__LINE__<<__FUNCTION__
                          <<"new fill in address"<<newFillInAddress);
            _fillInAddress = newFillInAddress;
            _owner._fillInConnection.setAddress(_fillInAddress);
        }
        // The delay bookkeeping implied by _delayCount, _totalDelay,
        // _smallestDelay and _biggestDelay presumably updates here from
        // map[s_STATUS_TIME]; that code was lost from this copy of the
        // file.
    } catch (MarshallingException &ex) {
        // (Recovery code lost from this copy of the file.)
    }
}

/////////////////////////////////////////////////////////////////////
// AlertRecordMultiCastReceiver::FillInConnection
/////////////////////////////////////////////////////////////////////

bool AlertRecordMultiCastReceiver::FillInConnection::shouldTryToConnect() {
    // Only bother connecting if we have somewhere to connect to and at
    // least one outstanding fill in request.
    return _owner->_highestFillInRequested
        && !(_serverName.empty() || _serverPort.empty());
}

AlertRecordMultiCastReceiver::FillInConnection::FillInConnection
(AlertRecordMultiCastReceiver *owner) :
    ServerConnection64(owner->commandName(), owner->getContainer()),
    _owner(owner)
{
}

void AlertRecordMultiCastReceiver::FillInConnection::setAddress
(std::string const &serverName, std::string const &serverPort) {
    if ((serverName != _serverName) || (serverPort != _serverPort)) {
        _serverName = serverName;
        _serverPort = serverPort;
        reset();
    }
}

void AlertRecordMultiCastReceiver::FillInConnection::setAddress
(std::string const &configString) {
    // TODO Why is this parsing code copied everywhere?  I got it from
    // ServerConnection64::getAndParseAddress() but I know that's not the
    // only other place it exists.
    const std::vector< std::string > pieces = explode(":", configString);
    const bool success = pieces.size() == 2;
    if (!success)
        clearAddress();
    else
        setAddress(pieces[0], pieces[1]);
}

void AlertRecordMultiCastReceiver::FillInConnection::clearAddress() {
    setAddress("", "");
}

void AlertRecordMultiCastReceiver::FillInConnection::sendRequest(int64_t id) {
    Message message;
    message["command"] = _owner->commandName();
    message[s_ID] = ntoa(id);
    sendMessage(message, _owner, id);
    //sendToLogFile(TclList()<<__FILE__<<__LINE__<<__FUNCTION__<<message);
}

bool AlertRecordMultiCastReceiver::fillInIsStillRelevant(int64_t id) {
    if (id > _highestFillInRequested)
        // Unexpected.  Maybe from a previous session?
        return false;
    else if (_deferredPackets.count(id))
        // We already have a copy of this.  Perhaps we had given up on the
        // original, so we requested a duplicate, then the original finally
        // arrived, so the duplicate is no longer relevant.
        return false;
    else
        return true;
}

void AlertRecordMultiCastReceiver::onMessage
(std::string bytes, int64_t clientId,
 ServerConnection64::MessageId messageId) {
    ThreadMonitor::SetState tm("fill in recv");
    const bool useIt = fillInIsStillRelevant(clientId);
    sendToLogFile(TclList()<<__FILE__<<__LINE__<<__FUNCTION__
                  <<"fill in received"<<clientId
                  <<(useIt?"use":"ignore"));
    if (useIt)
        onMessageReceived(bytes, clientId);
}

void AlertRecordMultiCastReceiver::onMessageReceived(std::string bytes,
                                                     int64_t fillInId) {
    ThreadMonitor::SetState
        tm("AlertRecordMultiCastReceiver::onMessageReceived");
    SmarterP< ParsedPacket > parsed(NULL, bytes, this);
    if (!parsed->isOkay())
        // Note:  This is what we expect to happen if we ask for fill in
        // data which has already expired.  Of course, we probably won't be
        // asking for expired data, but we should not panic if we see this.
        return;
    auto const packetSessionId = parsed->getSessionId();
    auto const packetId = parsed->getPacketId();
    if (fillInId) {
        if (packetSessionId != _sessionId)
            // Fill in data from a previous session.  Toss it!
            return;
        if (packetId != fillInId)
            // Something strange.  We got a valid packet but not the one we
            // were expecting.  Looks like a bug.
            // TODO report this!
            return;
    } else if (packetSessionId != _sessionId) {
        // A multicast packet has a new session id, so we discard everything
        // from the previous session.
        _sessionId = packetSessionId;
        tm.increment("new session " + ntoa(_sessionId));
        _linkStatus.reset();
        _nextPacketId = 0;  // Wildcard!
        _arrivalTime.clear();
        _highestFillInRequested = 0;  // No requests sent this server session.
        for (auto const &kvp : _deferredPackets)
            kvp.second->deliverPayload();
        _deferredPackets.clear();
    } else if (packetId != _nextPacketId) {
        if ((!_deferredPackets.empty())
            && (_deferredPackets.rbegin()->first + 1 == packetId))
            // One or more packets went missing.  Then a bunch more came in
            // a row.  For example:  1, 3, 4, 5, 6, 2, 7
            // When we see #3 we report lots of details with
            // sendToLogFile().  When we see 4, 5 and 6 we just increment()
            // this counter.
            tm.increment("WPO more");
        else {
            tm.increment("WPO log");
            TclList msg;
            msg<<__FILE__<<__LINE__<<__FUNCTION__
               <<"WRONG PACKET ORDER"<<getBaseName()
               <<"expected"<<_nextPacketId<<"found"<<packetId;
            if (packetId > _nextPacketId)
                msg<<"skipped"<<(packetId - _nextPacketId);
            else
                msg<<"duplicated"<<(_nextPacketId - packetId);
            msg<<"_deferredPackets"<<_deferredPackets.size()
               <<"_arrivalTime"<<_arrivalTime
               <<"now"<<getMicroTime();
            sendToLogFile(msg);
        }
    }
    // If we receive a packet with an id >= what we expect, store that with
    // the other packets.  We might have unblocked some messages, so check
    // for that now or soon.
    //
    // In the map we also record the precise time when the packet arrived.
    // Based on how old the packet is we might want to (a) request another
    // copy of any packets blocking this packet or (b) skip the missing
    // packets and move on.  (If we see a missing packet our first thought
    // should be out of order, not dead.  Give it a little time before
    // asking for another copy.)
    //
    // If we are holding packets then we need to check on them periodically.
    // Check on them when we add a new packet, or when some time has passed.
    // Here's the algorithm:
    //  o Look at the lowest numbered packet we are holding and see if it
    //    matches the one we are looking for.
    //  o If so, send it, remove it from the map, update the expected value,
    //    and jump to the top of the loop.
    //
    // Clear the list of packets when we start a new session.
    //
    // Maybe a different data structure to say when to request more data.
    //  o If you are adding the first new packet, you need all the packets
    //    >= next expected packet id but < the id of the packet we just
    //    received.
    //  o If you are adding a new packet, and we already have at least one
    //    packet with a higher id, ignore it.
    //  o If you are adding a new packet, and it has the highest id value,
    //    and there is at least one missing id number between the previous
    //    highest id number
    //  o When the timer goes off be sure to check again.  Some or all of
    //    the packets that were originally missing might have arrived.
    //  o We need to keep a map from times to ranges.  (The lowest and
    //    highest ids that we might request.)  Right before we send the
    //    request we'll review that range and give a specific list of id
    //    numbers.
    //  o Seems like this data structure would also be an efficient place to
    //    find the give up time, described above.  In fact, the map
    //    described above should only store the packets themselves, not the
    //    arrival time.
    if (packetId < _nextPacketId)
        tm.increment("duplicate ignored");
    else if (((packetId == _nextPacketId) || (_nextPacketId == 0))
             && _deferredPackets.empty()) {
        // This is an optimization for the common case.
        _nextPacketId = packetId + 1;
        parsed->deliverPayload();
    } else if (_deferredPackets.count(packetId))
        tm.increment("duplicate ignored");
    else {
        tm.increment(_deferredPackets.empty()?"pause start":"pause more");
        _deferredPackets[packetId] = parsed;
        while ((!_deferredPackets.empty())
               && (_deferredPackets.begin()->first == _nextPacketId)) {
            _arrivalTime.erase(_nextPacketId);
            _deferredPackets.begin()->second->deliverPayload();
            _deferredPackets.erase(_deferredPackets.begin());
            _nextPacketId++;
            tm.increment("RO release");  // reorder
        }
        if ((!_deferredPackets.empty())
            && (_deferredPackets.rbegin()->first == packetId)) {
            // The new packet has the highest id number of any packet we
            // have deferred so far.
            bool skippedNewPackets;
            if (_deferredPackets.size() == 1)
                // The new packet is the only packet that was deferred.  So
                // we know we skipped something or we wouldn't have gotten
                // here.
                skippedNewPackets = true;
            else {
                auto it = _deferredPackets.rbegin();
                it++;  // The entry right before the current packet.  (This
                       // was it-- in the damaged copy, but stepping a
                       // reverse iterator toward smaller keys is ++.)
                const auto previousPacketPresentId = it->first;
                // There is space between the current packet and the packet
                // with the second highest id.  So we are missing at least
                // one packet.
                skippedNewPackets = previousPacketPresentId + 1 < packetId;
            }
            if (skippedNewPackets) {
                // The number of times, not the number of packets!  This is
                // basically the number of holes in the data.
                tm.increment("skipped new packets");
                _arrivalTime[packetId] = getMicroTime();
            }
        }
    }
}
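// The core of onMessageReceived() is an ordered map used as a reorder
// buffer.  A stripped-down sketch of just that mechanism, with the session,
// fill in and timeout machinery removed (Reorder is an illustrative name,
// not part of this codebase):
#if 0
#include <cstdint>
#include <map>
#include <string>
#include <utility>

struct Reorder {
    int64_t next = 1;                       // Next packet id we owe the user.
    std::map< int64_t, std::string > held;  // Early arrivals, keyed by id.

    template < typename Deliver >
    void accept(int64_t id, std::string packet, Deliver deliver) {
        if (id < next || held.count(id))
            return;                         // Duplicate:  ignore it.
        held[id] = std::move(packet);
        // Release the longest in-order run starting at `next`.
        while (!held.empty() && held.begin()->first == next) {
            deliver(held.begin()->second);
            held.erase(held.begin());
            ++next;
        }
    }
};
#endif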
void AlertRecordMultiCastReceiver::beforeSleep(IBeforeSleepCallbacks &callbacks) {
    class ConfigInfo {
    public:
        int64_t requestFillInAfter;
        int64_t abandonAfter;
        ConfigInfo() :
            // getConfigItem: multicast_receiver_fill_in_after:
            // getConfigItem: Wait this many μs after noticing a missing
            // getConfigItem: packet before asking the server to send a
            // getConfigItem: second copy.  We don't do this immediately
            // getConfigItem: because sometimes the packets come out of
            // getConfigItem: order.  You'll get packet #1, then packet #3,
            // getConfigItem: then after a short pause, you finally get
            // getConfigItem: packet #2.  If this number is too low we send
            // getConfigItem: extra requests to the master server.  If this
            // getConfigItem: number is too high we wait too long to send
            // getConfigItem: the replacement request and lots of packets
            // getConfigItem: are all held up a little longer.
            requestFillInAfter(strtolDefault(
                getConfigItem("multicast_receiver_fill_in_after"), 3000)),
            // getConfigItem: multicast_receiver_abandon_after:
            // getConfigItem: Wait this many μs after noticing a missing
            // getConfigItem: packet before giving up on it.  At this point
            // getConfigItem: we just skip some data and process the rest in
            // getConfigItem: order.  We never use the missing data, even if
            // getConfigItem: it eventually shows up, because it would be
            // getConfigItem: out of order.
            // getConfigItem: Make this too small and you might lose some
            // getConfigItem: data.  Make this too big and you might cause
            // getConfigItem: all the data to be held up for a while.
            // getConfigItem: The default is currently 1.5 seconds.  We keep
            // getConfigItem: raising that value.  Becca missed two packets
            // getConfigItem: at 5 seconds after the close.  They came in
            // getConfigItem: after 1.135231 seconds, shortly after we'd
            // getConfigItem: given up on them.
            abandonAfter(strtolDefault(
                getConfigItem("multicast_receiver_abandon_after"), 1500000))
        {
            const bool invalid =
                (requestFillInAfter < 0) || (abandonAfter < 0);
            if (invalid) {
                // Complain, but keep going with what we have.
                TclList msg;
                msg<<__FILE__<<__LINE__<<"invalid config"
                   <<"requestFillInAfter"<<requestFillInAfter
                   <<"abandonAfter"<<abandonAfter;
                sendToLogFile(msg);
            }
        }
    };
    static const ConfigInfo configInfo;

    ThreadMonitor &tm = ThreadMonitor::find();
    const int64_t now = getMicroTime();

    // Give up on packets which have been missing too long.
    const auto abandonCutOff = now - configInfo.abandonAfter;
    int64_t abandonCutOffId = 0;
    while (!_arrivalTime.empty()) {
        auto const begin = _arrivalTime.begin();
        auto const time = begin->second;
        if (time > abandonCutOff)
            break;
        auto const id = begin->first;
        abandonCutOffId = id;
        _arrivalTime.erase(begin);
    }
    if (abandonCutOffId)
        while (!_deferredPackets.empty()) {
            auto const begin = _deferredPackets.begin();
            auto const id = begin->first;
            if (id > abandonCutOffId)
                break;
            begin->second->deliverPayload();
            _deferredPackets.erase(begin);
            _nextPacketId = id + 1;
            tm.increment("TO release 1");  // timeout
        }
    while ((!_deferredPackets.empty())
           && (_deferredPackets.begin()->first == _nextPacketId)) {
        _arrivalTime.erase(_nextPacketId);
        _deferredPackets.begin()->second->deliverPayload();
        _deferredPackets.erase(_deferredPackets.begin());
        _nextPacketId++;
        tm.increment("TO release 2");  // time out
    }

    // See what's been missing long enough that we should send out a fill in
    // request.
    const auto requestFillInCutOff = now - configInfo.requestFillInAfter;
    int64_t requestFillInCutOffId = 0;
    for (auto &kvp : _arrivalTime) {
        int64_t const &id = kvp.first;
        int64_t const &time = kvp.second;
        if (time < requestFillInCutOff)
            // Everything requested before this id is old enough that we
            // should send a duplicate request, if we have not already.
            requestFillInCutOffId = id;
        else {
            // We're past all of the old requests.  We'll deal with this
            // next request soon.
            callbacks.wakeAfterMicroSeconds(time - requestFillInCutOff);
            break;
        }
    }
    if (_highestFillInRequested < requestFillInCutOffId) {
        // Should we set _highestFillInRequested =
        // std::max(1, _highestFillInRequested) here?  Does
        // FillInConnection::shouldTryToConnect() get called before we
        // change _highestFillInRequested, below?
        for (int64_t id = std::max(_highestFillInRequested + 1,
                                   _nextPacketId);
             id < requestFillInCutOffId;
             id++)
            if (!_deferredPackets.count(id)) {
                _fillInConnection.sendRequest(id);
                tm.increment("fill in req");
                // TODO this next sendToLogFile() is very noisy!
                const auto flagged = _arrivalTime[requestFillInCutOffId];
                sendToLogFile(TclList()<<__FILE__<<__LINE__<<__FUNCTION__
                              <<"sendFillInRequest(" + ntoa(id) + ")"
                              <<(TclList()<<"after"
                                 <<ntoa(now - flagged) + "μs"));
            }
        _highestFillInRequested = requestFillInCutOffId;
        if (!_arrivalTime.empty()) {
            // Check back on the oldest outstanding request soon.
            auto const oldestTime = _arrivalTime.begin()->second;
            auto const wakeupTime = oldestTime + 10000;
            auto const usUntilWake = wakeupTime - getMicroTime();
            callbacks.wakeAfterMicroSeconds(usUntilWake);
        }
    }
    if (_arrivalTime.empty())
        // _arrivalTime.empty() => Nothing is queued up and we are not
        //   waiting for any fill in response.
        // (!_arrivalTime.empty()) && _highestFillInRequested =>
        //   Something is queued up and we are waiting for at least one fill
        //   in response.
        // (!_arrivalTime.empty()) && (!_highestFillInRequested) =>
        //   Something is queued up but we do not yet have any outstanding
        //   fill in requests.
        _highestFillInRequested = 0;
}
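// Putting the two receiver tuning knobs together with the group settings
// documented in AlertRecordMultiCastBase, a config file for a typical slave
// might contain lines like these.  (The addresses are the examples from the
// getConfigItem notes; the two timeouts are the compiled-in defaults.)
//
//   alerts_multicast_addr=239.0.1.0:2000
//   top_list_multicast_addr=239.0.2.0:2000
//   multicast_receiver_fill_in_after=3000        (μs, see above)
//   multicast_receiver_abandon_after=1500000     (μs, see above)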
void AlertRecordMultiCastReceiver::listen
(std::function< void(Record::Ref const &) > const &f) {
    invokeIfRequired([=](){ _recordListeners.push_back(f); });
}

void AlertRecordMultiCastReceiver::listen
(std::function< void(std::string const &) > const &f) {
    invokeIfRequired([=](){ _statusListeners.push_back(f); });
}

AlertRecordMultiCastReceiver::AlertRecordMultiCastReceiver
(std::string const &baseName) :
    AlertRecordMultiCastBase(baseName),
    ForeverThreadUser(IContainerThread::create(getBaseName() + "_parser")),
    _isOkay(false),
    _lastPacketIdDelivered(0),
    _sessionId(0),
    _nextPacketId(0),
    _highestFillInRequested(0),
    _fillInConnection(this),
    _linkStatus(*this)
{
    if(okay()) {
        int fd = socket(AF_INET, SOCK_DGRAM, 0);
        if (fd < 0)
            sendToLogFile(TclList()<<__FILE__<<__LINE__<<__FUNCTION__
                          <<"socket failed"<<errno);
        else {
            // The multicast group membership and bind() setup were lost
            // from this copy of the file; see ../shared/MultiCast.h.
            // Presumably _isOkay is also set here on success.
            IContainerThread *const listenerThread =
                IContainerThread::create(getBaseName() + "_listener");
            assert(getContainer()->getThreadName()
                   == getBaseName() + "_parser");
            // The listener thread only knows about the parser thread via
            // this callback.  The listener only deals in strings of bytes.
            // The listener does not try to interpret or change those bytes.
            auto const callback = [=](SmarterP< std::string > data) {
                doWorkInThread([=]() { onMessageReceived(*data); });
            };
            // And we start listening now.
            new AlertRecordMultiCastListener(fd, callback, listenerThread);
            start();
            _fillInConnection.start();
        }
    }
}
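// The constructor's socket setup lost text in this copy of the file.  For
// reference, a standalone sketch of a typical multicast *receiver* socket.
// SO_REUSEADDR and the bind-to-port step are assumptions based on standard
// practice, not recovered from the original code; the real setup likely
// lives in ../shared/MultiCast.h.  (openMulticastReceiver is an
// illustrative name.)
#if 0
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <cstring>

static int openMulticastReceiver(const char *group, int port) {
    const int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0)
        return -1;
    int yes = 1;  // Let several processes on this box share the group.
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    sockaddr_in addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);
    if (bind(fd, (sockaddr *)&addr, sizeof(addr)) < 0)
        return -1;
    ip_mreq mreq;                 // Ask the kernel to join the group.
    mreq.imr_multiaddr.s_addr = inet_addr(group);
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
    setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
    return fd;
}
#endif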
/* Test results based on the last 3,000 alerts from Friday night, 8/7/2020.
 * (A bug / feature of the current setup:  each time I restart my forwarding
 * server it copies the last 3000 records from the server and forwards
 * those.  I have outstanding TODO items to look at that, but for now that's
 * just the fastest, easiest way to get test data on the weekend.)
 *
 * These were delivered in 322 packets.  0 packet loss.
 *
 * uncompress:  0.029919855 seconds
 * AlertRecordMultiCastReceiver::onMessageReceived:  0.010738825 seconds
 *   -- For each packet:  unmarshal and examine id numbers, loop through the
 *   individual messages, copy/extract the body of each individual record or
 *   status message, create and verify records, forward records and status
 *   messages to listeners.  Listeners can provide arbitrary callbacks; in
 *   fact RecordDispatcher is the only listener and it just sends a message
 *   to its own thread.
 * {X read} 0.0040441777 seconds total.
 * {X peek} 0.0012810036 seconds total.
 * {X null} 0.00013001421 seconds total, 130 microseconds total, 404
 *   nanoseconds per call to ThreadMonitor::increment(); (plus other minor
 *   overhead)
 *
 * Total bytes before compression:  3,029,473
 * Total bytes after compression:  1,111,450
 * compression rate:  63.3%  (100% is ideal, 0% is none, like gzip)
 *
 * Top list results were similar.  Only 281 packets were required for 3000
 * records.
 *
 * uncompress:  0.030547251 seconds
 * AlertRecordMultiCastReceiver::onMessageReceived:  0.008254791 seconds
 * {X read} 0.004672368 seconds total.
 * {X peek} 0.0009258363 seconds total.
 * {X null} 0.00011800197 seconds total, 118 microseconds total, 420
 *   nanoseconds per call to ThreadMonitor::increment();
 *
 * Total bytes before compression:  2,688,922
 * Total bytes after compression:  1,485,014
 * compression rate:  44.8%
 *
 * The timing differences between the alerts and the top lists are all
 * within the margin of error and don't mean much.  Looking at the logs for
 * a similar process I see that the numbers for these two types of data are
 * closer than the examples shown here.  I.e. the difference between one
 * test run and the next is bigger than the difference between alerts data
 * and top list data.
 *
 * The difference in compression is real!  The alert server often generates
 * several alerts from the same print all at once.  All but a few fields
 * will be identical.
 */

/* Here are the results from the same test after I broke
 * AlertRecordMultiCastReceiver into two threads.  This is just the alerts
 * from just one instance of the server.
 *
 * The results seem generally similar to before.  Presumably the differences
 * are just measurement error.
 *
 * The listener thread shows some interesting results.  The "X process"
 * phase takes slightly more time than the reading.  And some of the normal
 * overhead functions are getting close.  I was looking at optimizing the
 * system calls, but my normal library functions seem to be a better place
 * to look for improvements.
 *
 * {Sun Aug 9 11:04:28 2020} 0 ThreadMonitor.C alerts_multicast_listener total_time 300.5 states { poll 100% {X process} 0.001057% - 0.003176285 seconds (Send to alerts_multicast_parser) {X read} 0.001044% - 0.00313722 seconds {X peek} 0.0004369% - 0.0013128845 seconds {poll() finish} 0.0003248% - 0.000976024 seconds counters {EAGAIN 353 {X count} 352} {RequestQueue {read count} 1 {average time μs} 44 {worst time μs} 44}
 *
 * {Sun Aug 9 11:04:28 2020} 0 ThreadMonitor.C alerts_multicast_parser total_time 300.5 states { poll 99.98% uncompress 0.01041% - 0.03128205 seconds AlertRecordMultiCastReceiver::onMessageReceived 0.004369% - 0.013128845 seconds {Read from queue} 0.000769% - 0.002310845 seconds {poll() finish} 0.0002316% - 0.000695958 seconds counters {action 3030 {new session 1596996108088761} 1 uncompress 352} {RequestQueue {read count} 354 {average time μs} 33.0056497175141 {worst time μs} 740}
 */