#include "../shared/ThreadMonitor.h"
#include "../shared/ReplyToClient.h"
#include "../shared/ThreadClass.h"
#include "../shared/LogFile.h"
#include "../shared/CommandDispatcher.h"
#include "../shared/GlobalConfigFile.h"

#include "RecordDispatcher.h"
#include "FieldLists.h"

#include "FeedNextServer.h"

/* This works a lot like MySQL replication.  One server is the master.  It
 * listens for records from many places.  And there can be any number of
 * slaves.  Slaves only listen for records from the master.  This is the thread
 * which makes records in this server available to a slave server. */

class FeedNextServer : ThreadClass, private ThreadMonitor::Extra
{
private:
  // Callback ids attached to the requests that arrive on _incoming.
  enum { mtNewRecord,        /* Received a new record.  Pass it on to the
                              * listeners. */
         mtNewStatus,        /* Received a status message.  Pass it on to the
                              * listeners. */
         mtNewListener,      /* An external process wants data.  Immediately
                              * send him some data, depending on the request,
                              * then store the request, so all new records
                              * will go to this listener. */
         mtNewStatusListener /* An external process wants to listen for status
                              * messages. */
  };

  // Every event for this object arrives here and is handled in
  // threadFunction().
  RequestQueue _incoming;

  // The record field that holds the record's id.
  static const FieldId ID_FIELD = (FieldId)MainFields::id;

  // We store a small number of records.  If the listener gets disconnected
  // briefly, we try to fill in the missing data.  This says how many records
  // to keep.
  const int _maxCount;

  // We store a small number of records.  If the listener gets disconnected
  // briefly, we try to fill in the missing data.  These are the records,
  // ordered by id.
  std::map< int64_t, Record::Ref > _records;

  // Who is listening.  Includes the socket and the message id.  Assume that
  // a single server (or at least a single socket) will only have one open
  // connection at a time.  Otherwise we'd need a different data structure.
  std::map< SocketInfo *, ExternalRequest::MessageId > _listeners;

  // Same as above but for status messages.  Usually people ask for both at
  // the same time, but each will have a different response id.
  std::map< SocketInfo *, ExternalRequest::MessageId > _statusListeners;

  // for ThreadMonitor::Extra
  // Reports the current size of each container as a name/value list.
  virtual std::string getInfoForThreadMonitor()
  {
    TclList result;
    result<<"_records"<<_records.size()
          <<"_listeners"<<_listeners.size()
          <<"_statusListeners"<<_statusListeners.size();
    return result;
  }

  // Forward one record to every record listener, then remember it so that a
  // listener who connects later can be caught up.  Records without a valid
  // id are counted and dropped.
  void newRecord(Record::Ref const &record)
  {
    int64_t id;
    bool valid;
    record->lookUpValue(ID_FIELD).getInt(valid, id);
    if (!valid)
      {
        ThreadMonitor::find().increment("INVALID record id");
        // Tempting to make this an assertion.  Currently we enforce this in
        // the master, but not in the slaves.  So this could be a bad value
        // from another process, not this process.
        return;
      }
    // Encode once; every listener receives the same bytes.
    const std::string body = record->getEncoded();
    for (auto const &listener : _listeners)
      addToOutputQueue(listener.first, body, listener.second);
    _records[id] = record;
    // At most one entry is added per call, so dropping the entry with the
    // lowest id is enough to respect the limit.
    if (((int)_records.size()) > _maxCount)
      _records.erase(_records.begin());
  }

  // Forward a status message to every status listener.
  void newStatus(std::string const &name)
  {
    for (auto const &listener : _statusListeners)
      addToOutputQueue(listener.first, name, listener.second);
  }

  // Register a record listener and immediately replay the stored records
  // it asked for.
  void addListener(SocketInfo *socket,
                   ExternalRequest::MessageId messageId,
                   int64_t startAt)
  {
    // Send everything greater than or equal to startAt right away.  1 means
    // send everything we have.  0, as a special case, means send nothing.
    //
    // We only look at startAt right now.  We don't save it.  If you ask to
    // start at record 100, and the highest we have right now is 95, you will
    // get nothing now, but you will get record 96 when it comes in.
    _listeners[socket] = messageId;
    // NOTE(review): the comment above says 0 sends nothing, but this test
    // admits 0, and lower_bound(0) then starts at the first stored record
    // whose id is >= 0.  Confirm whether "> 0" was intended.  Behavior is
    // deliberately left unchanged here.
    if (startAt >= 0)
      for (auto it = _records.lower_bound(startAt);
           it != _records.end();
           ++it)
        addToOutputQueue(socket, it->second->getEncoded(), messageId);
    TclList msg;
    // NOTE(review): the text of this statement was truncated in the copy
    // under review; only the two id operands survived.  Restore any leading
    // fields from version control.
    // The range is only reported when there is one: dereferencing begin() /
    // rbegin() of an empty map is undefined behavior, and a listener can
    // connect before the first record has arrived.
    if (!_records.empty())
      msg<<(_records.begin()->first)<<(_records.rbegin()->first);
    LogFile::primary().sendString((std::string)msg, socket);
  }

  // Register a status listener.  Nothing is replayed for status messages.
  void addStatusListener(SocketInfo *socket,
                         ExternalRequest::MessageId messageId)
  {
    _statusListeners[socket] = messageId;
  }

protected:
  // Thread body.  Drains _incoming, dispatching each request on its callback
  // id, then waits for more.  The loop has no exit.
  void threadFunction()
  {
    ThreadMonitor &tm = ThreadMonitor::find();
    tm.add(this);
    const std::string sWorking = "working";
    // Counted when a request's dynamic type does not match its callback id.
    // Previously the failed dynamic_cast was dereferenced.
    static const std::string sBadType = "INVALID request type";
    tm.setState(sWorking);
    while (true)
      {
        while (Request *current = _incoming.getRequest())
          {
            switch (current->callbackId)
              {
              case mtNewRecord:
                {
                  static const std::string state = "mtNewRecord";
                  tm.setState(state);
                  tm.increment(state);
                  if (NewRecord *request =
                      dynamic_cast< NewRecord * >(current))
                    newRecord(request->record);
                  else
                    tm.increment(sBadType);
                  break;
                }
              case mtNewStatus:
                {
                  static const std::string state = "mtNewStatus";
                  tm.setState(state);
                  tm.increment(state);
                  if (ProviderStatus *request =
                      dynamic_cast< ProviderStatus * >(current))
                    newStatus(request->name);
                  else
                    tm.increment(sBadType);
                  break;
                }
              case mtNewListener:
                {
                  static const std::string state = "mtNewListener";
                  tm.setState(state);
                  tm.increment(state);
                  if (ExternalRequest *request =
                      dynamic_cast< ExternalRequest * >(current))
                    {
                      // A missing or unparsable start_at falls back to 0.
                      const int64_t startAt =
                        strtolDefault(request->getProperty("start_at"), 0);
                      addListener(request->getSocketInfo(),
                                  request->getResponseMessageId(),
                                  startAt);
                    }
                  else
                    tm.increment(sBadType);
                  break;
                }
              case mtNewStatusListener:
                {
                  static const std::string state = "mtNewStatusListener";
                  tm.setState(state);
                  tm.increment(state);
                  if (ExternalRequest *request =
                      dynamic_cast< ExternalRequest * >(current))
                    addStatusListener(request->getSocketInfo(),
                                      request->getResponseMessageId());
                  else
                    tm.increment(sBadType);
                  break;
                }
              case DeleteSocketThread::callbackId:
                // The socket is going away.  Forget it in both maps so we
                // never queue output for it again.
                _listeners.erase(current->getSocketInfo());
                _statusListeners.erase(current->getSocketInfo());
                break;
              }
            tm.setState(sWorking);
            delete current;
          }
        _incoming.waitForRequest();
      }
  }

  // Reads the history size from the config, subscribes to the source's
  // listen/status commands and to its records and status messages, logs the
  // setup and starts the thread.  Only reachable through init().
  FeedNextServer(IRecordDispatcher *source) :
    ThreadClass("FeedNextServer " + source->getBaseName()),
    _incoming(getName()),
    // getConfigItem: alerts_queueSize & top_list_queueSize
    // getConfigItem: How far behind should this thread get before it reports
    // getConfigItem: something to the log?  By default a thread's event
    // getConfigItem: queue first reports when it has 100 unread items.
    // getConfigItem: Alerts and top list items often come in large groups at
    // getConfigItem: once, so the log would be overwhelmed if we didn't
    // getConfigItem: change that.
    _maxCount(strtolDefault(getConfigItem(source->getBaseName()
                                          + "_queueSize"), 3000))
  {
    CommandDispatcher *cd = CommandDispatcher::getInstance();
    cd->listenForCommand(source->getListenCommand(), &_incoming,
                         mtNewListener);
    cd->listenForCommand(source->getStatusCommand(), &_incoming,
                         mtNewStatusListener);
    source->listenForRecords(&_incoming, mtNewRecord);
    source->listenForStatus(&_incoming, mtNewStatus);
    TclList msg;
    // NOTE(review): the text of this statement was truncated in the copy
    // under review.  Fields that preceded the listen command, if any, were
    // lost; restore them from version control.
    msg<<source->getListenCommand()<<_maxCount;
    LogFile::primary().sendString(msg);
    startThread();
  }

public:
  // Create a FeedNextServer for this source if the configuration asks for
  // one.  The object is never deleted in the code visible here; its thread
  // loop has no exit.
  static void init(IRecordDispatcher *source)
  {
    // getConfigItem: alerts_feed_next_server & top_list_feed_next_server
    // getConfigItem: By default, the master will listen for slaves, but slaves
    // getConfigItem: will not listen for other slaves.  But the config item
    // getConfigItem: can override this in either direction.
    // getConfigItem: 1 means to listen for slaves and forward the data they
    // getConfigItem: request.  0 disables this feature, mostly to save
    // getConfigItem: resources.
    bool create = source->isMaster();
    const std::string value =
      getConfigItem(source->getBaseName() + "_feed_next_server");
    // Any value other than "1" or "0" keeps the master/slave default.
    if (value == "1")
      create = true;
    else if (value == "0")
      create = false;
    if (create)
      new FeedNextServer(source);
  }
};

// Set up one feed for the alerts dispatcher and one for the top list
// dispatcher, each subject to its own config override.
void initFeedNextServer()
{
  FeedNextServer::init(IRecordDispatcher::getAlerts());
  FeedNextServer::init(IRecordDispatcher::getTopList());
}

/* Test code:

Start this right away.
killall fast_alert_search ./fast_alert_search -i alerts_master_info=me -i top_list_master_info=me -i database_RO=drama >& /tmp/fast_alert_search.txt %'; Initially nothing should change in the database. Start this later. ./fast_alert_search -i copy_alerts_to_database=1 -i copy_top_list_to_database=1 -i real_database=mydb -i database_noreplicate=drama::no_replicate -i alerts_master_info=localhost:7897 -i top_list_master_info=localhost:7897 -i database_RO=drama -i listen_port=7899 >& /tmp/slave_server.txt