#include "../shared/SimpleLogFile.h" #include "../shared/GlobalConfigFile.h" #include "../shared/ThreadMonitor.h" #include "../shared/ServerConnection.h" #include "FeedMaster.h" ///////////////////////////////////////////////////////////////////// // FeedMasterServerConnection ///////////////////////////////////////////////////////////////////// class FeedMasterServerConnection : public ServerConnection { private: enum { ciCheckCommand }; const std::string _command; std::string _serverName; std::string _serverPort; std::vector< std::string >::size_type _nextInterestingQueueSize; std::vector< std::string > _queue; bool _done; bool _connectionConfirmed; IContainerThreadUser::Ref _keepMeAlive; TalkWithServer::Message _newRecordMessage; void send(std::string const &record) { // Start by calling getConnection(). This will check the status of the // connection, possibly setting _connectionConfirmed to false. Before this // call we can't be sure of the connection status. TalkWithServer *talkWithServer = getConnection(false); if (talkWithServer && _connectionConfirmed) { _newRecordMessage["record"] = record; talkWithServer->sendMessage(_newRecordMessage); } else { // TODO // Maybe throw records out if the queue gets too big. Consider the case // where we are generating top_list and alert records. If one of the // servers is down, we still want to send to the other server. We don't // want to crash. // We queue stuff in case of a temporary problem. I.e. the server // receiving the data from us is down, or we just can't connect to it // over the network. getConnection() will give us a connection at this // point, but it won't be any good. That's the normal operation because // in most cases you resend the same request if the server connection // restarts. However, we don't keep a copy of this data, so we need // to check if the server is really ready. _queue.push_back(record); if (_queue.size() >= _nextInterestingQueueSize) { _nextInterestingQueueSize = _queue.size() * 2; TclList msg; msg<sendMessage(message); ThreadMonitor::find().increment("update dead man timer"); } else { ThreadMonitor::find().increment("UNABLE TO UPDATE DEAD MAN TIMER"); } } protected: virtual std::string getServerName() { return _serverName; } virtual std::string getServerPort() { return _serverPort; } virtual bool shouldTryToConnect() { return !_done; } virtual void onNewConnection(TalkWithServer *talkWithServer) { _connectionConfirmed = false; TalkWithServer::Message message; message["command"] = _command; message["ping"] = "1"; talkWithServer->sendMessage(message, this, ciCheckCommand, false); } virtual void onMessage(std::string bytes, int clientId, TalkWithServer::CancelId cancelId) { if (clientId == ciCheckCommand) { if (bytes != "ping") { TclList msg; msg<::const_iterator it = _queue.begin(); it != _queue.end(); it++) { // We could possibly make this more efficient. Send first checks // the status then sends the record. We don't expect the status // to change between iterations of this loop. send(*it); } _queue.clear(); } } else ServerConnection::onMessage(bytes, clientId, cancelId); } class SendRequest : public IContainerThread::Work { private: IContainerThreadUser::Ref _worker; const std::string _record; public: SendRequest(IContainerThreadUser::Ref const &worker, std::string const &record) : _worker(worker), _record(record) { } virtual void inThread() { dynamic_cast< FeedMasterServerConnection & >(*_worker).send(_record); } }; class SendDeadManRequest : public IContainerThread::Work { private: IContainerThreadUser::Ref _worker; const std::string _name; public: SendDeadManRequest(IContainerThreadUser::Ref const &worker, std::string const &name) : _worker(worker), _name(name) { } virtual void inThread() { dynamic_cast< FeedMasterServerConnection & >(*_worker) .updateDeadManTimerNow(_name); } }; public: FeedMasterServerConnection(std::string baseName) : ServerConnection("feed master " + baseName), _command(baseName + "_add_record"), _nextInterestingQueueSize(100), _connectionConfirmed(false), _keepMeAlive(this) { // This is the same config item that a slave uses to listen to a master. std::string input = getConfigItem(baseName + "_master_info"); std::vector< std::string > pieces = explode(":", input); if (pieces.size() != 2) { TclList msg; msg<addUser(_keepMeAlive); TclList msg; msg<removeUser(_keepMeAlive); _keepMeAlive = NULL; } void sendRecord(std::string const &record) { if (_done) return; SendRequest *sendRequest = new SendRequest(_keepMeAlive, record); getContainer()->addToQueue(sendRequest); } void updateDeadManTimer(std::string const &name) { if (_done) return; SendDeadManRequest *sendDeadManRequest = new SendDeadManRequest(_keepMeAlive, name); getContainer()->addToQueue(sendDeadManRequest); } }; ///////////////////////////////////////////////////////////////////// // FeedMaster ///////////////////////////////////////////////////////////////////// FeedMaster::FeedMaster(bool alerts) { _serverConnection = new FeedMasterServerConnection(alerts?"alerts":"top_list"); } FeedMaster::~FeedMaster() { _serverConnection->done(); } void FeedMaster::sendRecord(std::string const &record) { _serverConnection->sendRecord(record); } void FeedMaster::updateDeadManTimer(std::string const &name) { _serverConnection->updateDeadManTimer(name); }