#include "../shared/ThreadMonitor.h"
#include "../shared/SimpleLogFile.h"
#include "../shared/GlobalConfigFile.h"
#include "../shared/ContainerThread.h"
#include "RecordDispatcher.h"
#include "FieldLists.h"

#include "DelayedAlerts.h"

// Receives alert records from IRecordDispatcher::getAlerts(), holds each one
// until _delayInSeconds have passed since the record's own timestamp field,
// and then forwards it to every registered listener.
//
// Records wait in _queue, a std::map ordered by (timestamp, id), so the
// record that becomes due first is always _queue.begin().  A record whose
// (timestamp, id) pair is already present replaces the earlier entry.
//
// listenForRecords() never touches _listeners directly; it posts a
// NewListener request and the list is updated in handleRequestInThread().
class DelayedAlerts : public ForeverThreadUser, private ThreadMonitor::Extra
{
private:
  // Values used as Request::callbackId for the requests this class handles.
  enum { mtNewRecord, mtNewListener };

  // How long, in seconds, a record is held past its timestamp.  Read once
  // from the "delayed_alerts_delay" config item (default 1200).
  int _delayInSeconds;

  // Sort key for the queue:  primarily the record's timestamp, with the
  // record's id as a tie breaker.
  struct Key
  {
    time_t timestamp;
    int64_t id;
    bool operator ==(Key const &other) const
    {
      return (timestamp == other.timestamp) && (id == other.id);
    }
    bool operator <(Key const &other) const
    {
      return (timestamp < other.timestamp)
        || ((timestamp == other.timestamp) && (id < other.id));
    }
  };

  typedef std::map< Key, Record::Ref > Queue;
  Queue _queue;

  // Where to deliver a released record, and the callbackId to stamp on the
  // request that carries it.
  struct Listener
  {
    RequestListener *listener;
    int callbackId;
  };
  std::vector< Listener > _listeners;

  // Request used to hand a new Listener to handleRequestInThread().
  class NewListener : public Request
  {
  public:
    Listener listener;
    NewListener() : Request(NULL) { callbackId = mtNewListener; }
  };

  // Dispatches on the request's callbackId:
  //  - mtNewRecord:   reads the timestamp and id fields from the record and
  //                   stores the record in _queue under that key.
  //  - mtNewListener: appends the listener to _listeners.
  // A request of an unexpected concrete type, or a record whose timestamp or
  // id cannot be read as an integer, is counted in the ThreadMonitor and
  // otherwise ignored.
  virtual void handleRequestInThread(Request *original)
  {
    ThreadMonitor::SetState setState("DelayedAlerts.C handleRequestInThread");
    switch (original->callbackId)
    {
    case mtNewRecord:
      {
        ThreadMonitor::find().increment("DelayedAlerts receive");
        NewRecord *newRecord = dynamic_cast< NewRecord * >(original);
        if (!newRecord)
        {
          // The callbackId said "record" but the object is something else.
          // Dereferencing the failed cast would be undefined behavior.
          ThreadMonitor::find().increment("DelayedAlerts bad request");
          return;
        }
        Record::Ref const &record = newRecord->record;
        Key key;
        bool valid;
        const ValueBox timestamp =
          record->lookUpValue((FieldId)MainFields::timestamp);
        timestamp.getInt(valid, key.timestamp);
        if (!valid)
        {
          // Without a timestamp there is no way to know when to release it.
          ThreadMonitor::find().increment("DelayedAlerts invalid timestamp");
          return;
        }
        const ValueBox id = record->lookUpValue((FieldId)MainFields::id);
        id.getInt(valid, key.id);
        if (!valid)
        {
          ThreadMonitor::find().increment("DelayedAlerts invalid id");
          return;
        }
        _queue[key] = record;
        break;
      }
    case mtNewListener:
      {
        NewListener *newListener = dynamic_cast< NewListener * >(original);
        if (!newListener)
        {
          ThreadMonitor::find().increment("DelayedAlerts bad request");
          return;
        }
        _listeners.push_back(newListener->listener);
        break;
      }
    }
  }

  // Registers this object with the ThreadMonitor so that
  // getInfoForThreadMonitor() can be queried.
  virtual void initializeInThread()
  {
    ThreadMonitor::find().add(this);
  }

  // Reports the number of records currently waiting in the queue.
  virtual std::string getInfoForThreadMonitor()
  {
    TclList result;
    result<<"records in _queue"<<_queue.size();
    return result;
  }

  // Sends the record to every registered listener.  Each listener gets its
  // own newly allocated NewRecord request, tagged with the callbackId that
  // the listener supplied when it registered.
  void notifyListeners(Record::Ref const &record) const
  {
    ThreadMonitor::find().increment("DelayedAlerts send");
    for (std::vector< Listener >::const_iterator it = _listeners.begin();
         it != _listeners.end();
         ++it)
    {
      NewRecord *request = new NewRecord(record);
      request->callbackId = it->callbackId;
      it->listener->newRequest(request);
    }
  }

  // Releases every record that is already due, oldest first.  When the
  // oldest remaining record is not due yet, asks to be woken when it will be
  // and stops.  Does nothing if the queue is empty.
  virtual void beforeSleep(IBeforeSleepCallbacks &callbacks)
  {
    ThreadMonitor::SetState setState("DelayedAlerts.C beforeSleep");
    // Largest number of whole seconds whose value in milliseconds still fits
    // in a 32 bit signed int (INT_MAX / 1000).  A wait longer than this is
    // shortened to this; presumably beforeSleep() runs again after the wake
    // up and requests the remainder -- TODO confirm against the container
    // thread.
    const int64_t maxSleepSeconds = 2147483;
    const time_t now = time(NULL);
    while (!_queue.empty())
    {
      const Queue::iterator oldest = _queue.begin();
      // Do the arithmetic in 64 bits.  The timestamp comes from the record,
      // so it cannot be trusted to be anywhere near "now", and squeezing the
      // difference into an int could wrap around.
      const int64_t secondsUntilDue =
        static_cast< int64_t >(oldest->first.timestamp)
        - static_cast< int64_t >(now)
        + _delayInSeconds;
      if (secondsUntilDue > 0)
      {
        const int64_t sleepSeconds =
          (secondsUntilDue < maxSleepSeconds)
          ? secondsUntilDue
          : maxSleepSeconds;
        callbacks.wakeAfterMs(static_cast< int >(sleepSeconds * 1000));
        break;
      }
      notifyListeners(oldest->second);
      _queue.erase(oldest);
    }
  }

public:
  // Reads the delay from the "delayed_alerts_delay" config item (default
  // 1200 seconds), subscribes to the alerts record dispatcher, and calls
  // start().
  DelayedAlerts() :
    ForeverThreadUser(IContainerThread::primary()),
    _delayInSeconds(strtolDefault(getConfigItem("delayed_alerts_delay"),
                                  1200))
  {
    IRecordDispatcher::getAlerts()->listenForRecords(this, mtNewRecord);
    start();
  }

  // Asks for delayed records to be delivered to listener, each wrapped in a
  // NewRecord request whose callbackId is the value given here.  The
  // registration is posted as a request via newRequest() rather than applied
  // immediately.
  void listenForRecords(RequestListener *listener, int callbackId)
  {
    NewListener *request = new NewListener;
    request->listener.listener = listener;
    request->listener.callbackId = callbackId;
    newRequest(request);
  }
};

// The one DelayedAlerts object, or NULL if delayedAlertsInit() has not
// created it (not called yet, or the feature is disabled in the config).
static DelayedAlerts *instance = NULL;

// Creates the DelayedAlerts object when the "delayed_alerts_thread" config
// item is "1".  Calling this again after the object exists does nothing, so
// a repeated call cannot register a second object or leak the first.
void delayedAlertsInit()
{
  if (!instance && (getConfigItem("delayed_alerts_thread") == "1"))
    instance = new DelayedAlerts();
}

void delayedAlertsListenForRecords(RequestListener *listener, int callbackId) { if (instance) instance->listenForRecords(listener, callbackId); else sendToLogFile(TclList()<