#ifndef __AlertRecordMultiCast_h_ #define __AlertRecordMultiCast_h_ #include #include "../shared/ContainerThread.h" #include "../shared/ServerConnection64.h" #include "DataFormat.h" // Things shared by the sender and the receiver. Like the name of the // config item for the multicast channel. And the name of the command // for for the fill in server. class AlertRecordMultiCastBase { private: const std::string _baseName; std::string _group; std::string _configInput; int _port; bool _okay; protected: // "alerts_multicast" or "alerts_multicast" std::string const &getBaseName() const { return _baseName; } // The location of the multicast address in the config file. std::string configKey() const { return _baseName + "_addr"; } // The multicast address. Something like "239.0.1.0:2000" std::string configInput() const { return _configInput; } // The group associated with the multicast address. // Something like "239.0.1.0". char const *group() const { return _group.c_str(); } // The port associated with the multicast address. int port() const { return _port; } // True if we were able to find and parse the multicast address and it looks // reasonable. bool okay() const { return _okay; } // The master will periodically post a TCP/IP address and port number // to use if a slave misses a multicast packet and needs it repeated. // After connecting to that address, use this command name to make the // request. std::string commandName() const { return _baseName + "_fill_in"; } // Basename is expected to be "alerts" or "top_list". However, this // class doesn't case. You could add a new stream without changing // this class. AlertRecordMultiCastBase(std::string const &baseName); }; class AlertRecordMultiCastSender : AlertRecordMultiCastBase, ForeverThreadUser, ThreadMonitor::Extra { private: enum { mtFillIn, mtDebug }; int _fd; struct sockaddr_in _addr; const int64_t _sessionId; int64_t _lastPacketId; std::string _packetBeingBuilt; std::deque< SmarterCP< std::string > > _packetsToSend; // This is normally 0. You can use this simulate dropped packets. If you // set this to N, we will not send the next N packets over the multicast. // However, we will still assign it an id and it will be available from the // fill in command. int _debugSkipCount; static std::string _fillInAddress; struct PacketAndTime { int64_t submitted; SmarterCP< std::string > packet; PacketAndTime(int64_t submitted, SmarterCP< std::string > const &packet) : submitted(submitted), packet(packet) { } }; std::map< int64_t, PacketAndTime > _recentData; int64_t _maxFillInAgeMicroSeconds; static SmarterCP< std::string > compress(std::string const &plaintext); void pushPacketBeingBuilt(); void trySendNow(); enum class Sent { Success, ShouldRetry, FailForGood }; Sent trySendNow(std::string const &packet) const; void sendInThread(std::string const &bytes); time_t _nextLinkStatusTime; void sendLinkStatusMessageNow(); // For IContainerThreadUser virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) override; virtual void initializeInThread() override; virtual void handleRequestInThread(Request *original) override; // For ThreadMonitor::Extra virtual std::string getInfoForThreadMonitor() override; public: // This sets _fillInAddress static void setListenPort(int port); void send(Record::Ref const &record); void send(std::string status); AlertRecordMultiCastSender(std::string const &baseName, IContainerThread *thread = NULL); bool isOkay() const; }; class AlertRecordMultiCastReceiver : AlertRecordMultiCastBase, ForeverThreadUser, ThreadMonitor::Extra, ServerConnection64::IMessageListener { private: bool _isOkay; int64_t _lastPacketIdDelivered; class ParsedPacket { private: AlertRecordMultiCastReceiver *_owner; int64_t _sessionId; int64_t _packetId; std::vector< std::function< void() > > _actions; public: ParsedPacket(std::string &bytes, AlertRecordMultiCastReceiver *owner); // If isOkay() is false, none of the other methods make sense. // getSessionId() and getPacketId() may return uninitialized data, // and deliverPayload() will likely fail an assertion. // // Initially isOkay() is true if we parsed the data okay, false if // we could not parse it. deliverPayload() will change this to // false so you don't accidentally call deliverPayload() more than // once. bool isOkay() const { return _owner; } int64_t getSessionId() const { return _sessionId; } int64_t getPacketId() const { return _packetId; } // Do everything at the end, only after we've successfully decoded // everything. It's like a transaction. Everything succeeds or // everything fails. If the first half of the message looks good, // but the second half is corrupt, assume it's all corrupt. // Do everything at the end, only after we've successfully decoded // everything. It's like a transaction. Everything succeeds or // everything fails. If the first half of the message looks good, // but the second half is corrupt, assume it's all corrupt. // // Also, when packets come out of order, we rearrange them before // trying to deliver them. void deliverPayload(); typedef SmarterP< ParsedPacket > Ref; }; // This is an opaque handle created by the master. If it changes, we know // there is a new master and we should reset a lot of our state. int64_t _sessionId; // This is the id of the next packet we were expecting to send out. // o If we receive this packet, all is good, we immediately forward the // packet to our listeners and increment this value. // o 0 is a wild card. If this is 0 then it always matches the next // packet, as described abbove. // o If we receive a packet that came before this one, throw it out. // o If we receive a packet that comes after this one, save it it // _deferredPackets. int64_t _nextPacketId; // When packets come out of order, store them here. We always process // them in order. std::map< int64_t, ParsedPacket::Ref > _deferredPackets; // Store the time when we receive relevant packets. This only applies // to packets currently in _deferredPackets. (I.e. the key set of // _arrivalTime should be a subset of the key set of _deferredPackets.) // This is used to decide when to request a second copy of a packet // and/or when to skip the packet and move on. // Maps from packet id number to a timestamp with microseconds. std::map< int64_t, int64_t > _arrivalTime; // We don't want to send duplicate requests. If we didn't get the // multicast, then we send a fill in request. We don't repeat that // request unless the ServerConnection64 said there was a delivery // problem. We send out the fill in requests in order, so if we want // to avoid duplicates, all we really need to know is the last one // we've sent so far. int64_t _highestFillInRequested; std::vector< std::function< void(Record::Ref const &) > > _recordListeners; std::vector< std::function< void(std::string const &) > > _statusListeners; class FillInConnection : ServerConnection64 { private: AlertRecordMultiCastReceiver *const _owner; // From ServerConnection64 virtual bool shouldTryToConnect() override; public: FillInConnection(AlertRecordMultiCastReceiver *owner); // This will check if the serverName or port has changed, and will reset // the connection exactly when necessary. Calling clearAddress() will // return to the default state where we don't try to connect to anything. void setAddress(std::string const &serverName, // I.e. "fogell", "1234" std::string const &serverPort); void setAddress(std::string const &configString); // I.e. "fogell:1234" void clearAddress(); void sendRequest(int64_t id); void start(); } _fillInConnection; class LinkStatus { private: AlertRecordMultiCastReceiver &_owner; std::string _fillInAddress; int _delayCount; int64_t _totalDelay; int64_t _smallestDelay; int64_t _biggestDelay; void resetDelay(); std::string const &getBaseName() const { return _owner.getBaseName(); } public: void reset(); LinkStatus(AlertRecordMultiCastReceiver &owner); void update(std::string const &message); std::string forThreadMonitor(); } _linkStatus; bool fillInIsStillRelevant(int64_t id); // For ServerConnection64 from _fillInConnection virtual void onMessage(std::string bytes, int64_t clientId, ServerConnection64::MessageId messageId) override; virtual void onAbort(int64_t clientId, ServerConnection64::MessageId messageId) override; static void decompress(std::string &data); void onMessageReceived(std::string &bytes, int64_t fillInId = 0); // For ForeverThreadUser virtual void initializeInThread() override; virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) override; // For ThreadMonitor::Extra virtual std::string getInfoForThreadMonitor() override; public: // All public members are thread safe. bool isOkay() const; // This object will create a new thread and all callbacks will come from // that thread. Do NOT call these unless the object isOkay(). void listen(std::function< void(Record::Ref const &) > const &f); void listen(std::function< void(std::string const &) > const &f); // baseName would typically be "alerts" or "top_list". This gets used in // lookups to the config file and in logging. Aside from the address and // port number, alerts and top list entries really look the same to this // class. AlertRecordMultiCastReceiver(std::string const &baseName); }; #endif