// TestFromDatabase -- replays rows of the `alerts` table into a FeedMaster
// according to a simulated clock, plus the main() of the executable that
// hosts it.
//
// NOTE(review): this copy of the file looks damaged.  All line breaks have
// been collapsed, so every `//` comment below now swallows the code that
// used to follow it on later lines, and text between a '<' and the next '>'
// appears to have been removed (the first #include has no header name;
// `msg<getSocketInfo()`, `msg<rowIsValid()`, `msg<= MAX_RECORDS)` and
// `"Invalid listen port \""<getInput()` are fragments of longer statements).
// It will not compile as-is.  Restore it from version control before making
// code changes; the code text below is deliberately left untouched.
//
// What the visible text shows:
//
// class TestFromDatabase (IContainerThreadUser, privately RequestListener)
//   * getSimulatedTime(): elapsed real seconds since _timeOffsetFrom
//     (TimeVal(true).asMicroseconds()/1000000.0 minus _timeOffsetFrom),
//     multiplied by _rate, added to (_timeOffsetFrom - _timeOffset).
//   * isPaused(): true when _rate <= 0.0.  The constructor sets _rate to 0.0,
//     so the object is paused until it is configured.
//   * queryInProcess(): true when _querySkipCount > 0.
//   * makeSql(): builds
//       SELECT <alert fields> FROM alerts WHERE timestamp BETWEEN "<start>"
//       AND "<end>" ORDER BY timestamp, id LIMIT 1000 OFFSET <skip>
//     from _queryStartTime, _queryEndTime, MAX_RECORDS and _querySkipCount.
//   * configureSimulator(): handler for the "configure_simulator" command
//     (registered in the constructor with id mtConfigureSimulator).  Sets
//     _timeOffsetFrom to time(NULL), reads "time_offset" (default 0) and
//     "rate" (default 1.0) from the request, sets both query times to
//     _timeOffsetFrom - _timeOffset and clears _querySkipCount.
//     NOTE(review): the result of the dynamic_cast to ExternalRequest* is
//     dereferenced without a null check in the visible text.
//   * newRequest(): forwards the request with returnToMe(_keepMeAlive, ...);
//     handleRequestInThread() then dispatches on callbackId.
//   * beforeSleep(): requests no wake-up while paused, wakeAfterMs(0) while
//     a query is in process, otherwise wakeAfterMs(100).
//   * awake(): returns immediately while paused.  When no query is in
//     process it compares getSimulatedTime() with _queryEndTime ("already up
//     to date" when not later).  Each result row is turned into a Record via
//     DatabaseFieldInfo::makeRecord and sent with
//     _feedMasterAlerts.sendRecord(record->getEncoded()).  Afterwards
//     _querySkipCount is advanced by MAX_RECORDS ("keep trying") or reset to
//     0 ("start fresh next time").  The statements in between (running the
//     query, advancing the time window, counting rows) are among the lost
//     text.
//   * ~TestFromDatabase(): assert(false).  Presumably the object is meant to
//     live for the whole process: it stores a Ref to itself in _keepMeAlive
//     and main() creates it with a bare `new` -- confirm against
//     IContainerThreadUser before relying on this.
//
// main()
//   * Registers the defaults listen_port=7895,
//     log_file_suffix=TestFromDatabase and
//     alerts_master_info=127.0.0.1:7897, then applies command-line overrides
//     (returns 1 if addConfigItemsFromCommandLine fails).
//   * Treats a listen_port that parses to -1 as invalid.
//   * Creates FeedMaster feedMaster(true), a TestFromDatabase on it, and
//     calls initTestFromBinaryLog(feedMaster) before opening the listening
//     socket, so the modules are configured before connections arrive.
//   * Returns 3 if NewConnections reports failure; otherwise loops on
//     sleep(60) forever.
#include #include "../shared/NewConnections.h" #include "../shared/Ping.h" #include "../shared/ReplyToClient.h" #include "../shared/GlobalConfigFile.h" #include "../shared/DatabaseWithRetry.h" #include "../shared/CommandDispatcher.h" #include "../shared/SimpleLogFile.h" #include "../shared/MiscSQL.h" #include "FieldLists.h" #include "../shared/ContainerThread.h" #include "FeedMaster.h" #include "TestFromBinaryLog.h" class TestFromDatabase : public IContainerThreadUser, private RequestListener { private: const IContainerThreadUser::Ref _keepMeAlive; DatabaseWithRetry _database; FeedMaster &_feedMasterAlerts; time_t _timeOffsetFrom; int _timeOffset; double _rate; time_t getSimulatedTime() { const double elapsedRealTime = (TimeVal(true).asMicroseconds()/1000000.0) - _timeOffsetFrom; const double elapsedSimulatedTime = elapsedRealTime * _rate; const time_t simulatedStartTime = _timeOffsetFrom - _timeOffset; const time_t simulatedTime = (time_t)(simulatedStartTime + elapsedSimulatedTime); return simulatedTime; } bool isPaused() { return _rate <= 0.0; } time_t _queryStartTime, _queryEndTime; int _querySkipCount; bool queryInProcess() { return _querySkipCount > 0; } static const int MAX_RECORDS = 1000; std::string makeSql() { static const std::string sql1 = "SELECT " + DatabaseFieldInfo::makeSqlFieldList (DatabaseFieldInfo::getAlertFields()) + " FROM alerts WHERE timestamp BETWEEN \""; // start time / earlier time static const std::string sql2 = "\" AND \""; // end time / later time static const std::string sql3 = "\" ORDER BY timestamp, id LIMIT " + ntoa(MAX_RECORDS) + " OFFSET "; std::string result = sql1; result += timeTToMysql(_queryStartTime); result += sql2; result += timeTToMysql(_queryEndTime); result += sql3; result += ntoa(_querySkipCount); return result; } void configureSimulator(Request *request) { ExternalRequest *externalRequest = dynamic_cast< ExternalRequest * >(request); _timeOffsetFrom = time(NULL); _timeOffset = 
strtolDefault(externalRequest->getProperty("time_offset"), 0); _rate = strtodDefault(externalRequest->getProperty("rate"), 1.0); _queryStartTime = _queryEndTime = _timeOffsetFrom - _timeOffset; _querySkipCount = 0; TclList msg; msg<getSocketInfo(), msg, externalRequest->getResponseMessageId()); } enum { mtConfigureSimulator }; virtual void handleRequestInThread(Request *original) { switch (original->callbackId) { case mtConfigureSimulator: configureSimulator(original); break; } }; virtual void newRequest(Request *request) { returnToMe(_keepMeAlive, request); } ~TestFromDatabase() { assert(false); } protected: virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) { if (isPaused()) return; else if (queryInProcess()) callbacks.wakeAfterMs(0); else callbacks.wakeAfterMs(100); } virtual void awake(std::set< int > const &woken) { if (isPaused()) return; if (!queryInProcess()) { // Start a new query. time_t newEndTime = getSimulatedTime(); if (newEndTime <= _queryEndTime) { // We are already up to date. /* TclList msg; msg<rowIsValid(); result->nextRow()) { found++; Record::Ref record = DatabaseFieldInfo::makeRecord(result, DatabaseFieldInfo::getAlertFields()); _feedMasterAlerts.sendRecord(record->getEncoded()); } TclList msg; msg<= MAX_RECORDS) // Keep trying. _querySkipCount += MAX_RECORDS; else // Start fresh next time. 
_querySkipCount = 0; } public: TestFromDatabase(FeedMaster &feedMaster) : IContainerThreadUser(IContainerThread::primary()), _keepMeAlive(this), _database(true, "read only database"), _feedMasterAlerts(feedMaster), _rate(0.0) { CommandDispatcher *cd = CommandDispatcher::getInstance(); cd->listenForCommand("configure_simulator", this, mtConfigureSimulator); getContainer()->addUser(_keepMeAlive); } }; int main(int args, char *argv[]) { setlocale(LC_ALL, ""); addConfigItems("listen_port=7895"); addConfigItems("log_file_suffix=TestFromDatabase"); addConfigItems("alerts_master_info=127.0.0.1:7897"); if (!addConfigItemsFromCommandLine(argv + 1)) { return 1; } configItemsComplete(); int listenPort = strtolDefault(getConfigItem("listen_port"), -1); if (listenPort == -1) { std::cerr<<"Invalid listen port \""<getInput(), commandDispatcher->getInputCallbackId()); FeedMaster feedMaster(true); new TestFromDatabase(feedMaster); initTestFromBinaryLog(feedMaster); // Start listening to new sockets now, after the other modules have // had time to configure themselves. NewConnections newConnections(&listener, listenPort); if (!newConnections.getSuccess()) { std::cerr<<"Unable to listen for new connections.\n"; return 3; } while (true) sleep(60); }