#ifndef __StreamingAlertsThread_h_ #define __StreamingAlertsThread_h_ #include #include "../shared/MiscSupport.h" #include "../shared/SocketInfo.h" #include "../shared/Messages.h" #include "../shared/ThreadClass.h" #include "../shared/DatabaseWithRetry.h" #include "../shared/XmlSupport.h" #include "DatabaseThreadShared.h" #include "AlertConfig.h" #include "Types.h" #include "AlertSqlProducer.h" /* This unit is responsible for satisfying specific real-time streaming alert * requests. The caller is responsible for organization, such as keeping track * of the specific requests for each client, and when a request is due to be * executed. This allows one centeralized command center to dispatch work to * multiple threads and even multiple database servers at the same time. */ /* StreamingAlertsRequest finds all of the streaming, realtime alerts for a * user for the current moment. This was made into a Request object, so that * it can be passed to different threads as required. * * Although this was designed so it could easly be passed between threads, * it knows nothing about threads or queues, and it could be handled all in * one thread. * * Each request can only be used once. Notice the required order in the * comments below. * * Some of these functions could be done elsewhere. The primary purpose of * this class is to encapsulate the call to the database, so that can be * done in a different thread. However, by letting this class do the * formatting into XML, and most of the work before that formatting, we * keep the interface cleaner than it was before. The caller still * maintains the list of what requests to do for what user and when. This * class takes care of formatting. Basically this allows us to break up * one large function in the old code into several smaller ones. * * Note: This is complicated because the design evolved so much over time. * Originally there was only one thread for all of this work, which got broken * into multiple threads. It was rearranged more when the main thread was * still too busy. As a result, this is not a great example of how to design a * request. ../fast_alert_search/ has some good examples of newer, better ways * to do this type of work. Look at NewWorkerCluster.h in particular. */ class StreamingAlertsRequest : public Request { private: struct SingleRequest { AlertSqlProducer sqlProducer; AlertConfig::CustomSql sqlStatement; bool keep; bool newSql; int limit; }; typedef std::map< std::string, SingleRequest > RequestMap; RequestMap _originalQueries; const AlertId _startFrom; AlertId _maxAlertId; bool _maxedOut; XmlNode _results; TimeVal::Microseconds _queryTime; public: // First, create an object to record the original request. StreamingAlertsRequest(SocketInfo *socket, AlertId startFrom); // Add each of the queries to send to the database. We make copies of // all of these inputs, rather than pointers, so that a different thread can // satisfy this request. void addOriginalQuery(std::string windowId, AlertSqlProducer const &sqlProducer, AlertConfig::CustomSql const &sql, int limit); // Then you call findAlerts() to look up the results in the database. void findAlerts(DatabaseWithRetry &masterDatabase, DatabaseWithRetry &database, LastIdCacheInfo &lastIdCache); // Copy the sql info back. The producer is a one-shot. You can't keep // calling it, for multiple reasons. The producer might have created // the sql statement in findAlerts(). Next time we want to reuse that // sql, since we can't create it again. void getQuery(std::string const &windowId, AlertSqlProducer &sqlProducer, AlertConfig::CustomSql &sql, bool &sqlIsNew); // Use addFinalQuery() to set the current queries. The request from // the client might have been updated in a different thread, so we need to // look at the most recent request from the client. If this window was // not one of the original queries, we add the window with no data. If // this window was one of the original queries, but the query changed, // we remove any data from that window. This returns true if we have // any data for that window. bool addFinalQuery(std::string windowId, bool wasOriginalQuery); // Then you optionally call addData() for one ore more windows. This will // add data, but only if the window exists and does not have any data. // (The window will typically exist because we just called addFinalQuery().) void addData(std::string windowId, XmlNode const &singleAlert); // After adding the final queries, call this to remove any windows that // were deleted. Only the windows mentioned in a call to addFinalQuery() // remain. void trimResponse(); // Optionally remove some information from the alerts for DEMO users. // Counter is remembered between calls to make sure that we are distributing // the real data evenly. void hideSomeData(int &counter); // Finally we retrieve the formated response in XML for the client. XmlNode const &getResponse() const; XmlNode &getResponse(); // The remaining items return status about the work we did in findAlerts(). TimeVal::Microseconds getQueryTime() const; int getWindowCount() const; bool getMaxedOut() const; AlertId getLastId() const; }; /* This thread is dedicated to satisfying StreamingAlertsRequest's. Multiple * objects of this type can be created, and you can safely dispose of them any * time. * * This is a standard ThreadClass and it uses a standard input queue because * these libraries were available. Presumably the caller will send at most * one request at a time to each object of this class. However, these objects * don't rely on that restriction. */ class StreamingAlertsThread : private ThreadClass, NoCopy, NoAssign { private: enum { mtRequestData, mtDebugReport, mtQuit }; RequestListener * const _returnPath; const int _returnId; DatabaseWithRetry _masterDatabase; DatabaseWithRetry _database; LastIdCacheInfo _lastIdCache; RequestQueue _incoming; protected: void threadFunction(); public: StreamingAlertsThread(RequestListener *returnPath, int returnId, std::string databaseName); ~StreamingAlertsThread(); void requestData(StreamingAlertsRequest *request); void debugReport(); std::string getName() const { return ThreadClass::getName(); } }; #endif