#include "RecordDispatcher.h" #include "../shared/ContainerThread.h" #include "FieldLists.h" #include "../generate_alerts/misc_framework/AccumulateInsert.h" #include "../shared/DatabaseForThread.h" #include "../shared/GlobalConfigFile.h" #include "CopyTopListToDatabase.h" /* This takes top list records from our normal stream and feeds them into the * database. * * With only a small amount of work you could modify CopyAlertsToDatabase to * make it cover top lists, too. It could use the exact same algorithm. This * version is slightly smarter. It might help if the database gets backed up * too much. If the database is slow, we won't store duplicate entries for * the same symbol. That saves memory in this program, but it also means * less work for the database when it is most backed up. * * Currently that trick only works when the database is slow. If the database * is dead then our thread never has time to work. It is tempting to change * the algorithm so the thread adding the records looks for duplicates, rather * than the thread that writes to the database. For now all the interesting * work is done in one thread, as we normally like to do it. But I've tried * to put the relevant code into specific functions so that we can possibly * make that change sometime in the future. */ class CopyTopListToDatabase : public ForeverThreadUser { private: enum { mtNewRecord, mtProviderStatus }; const std::string _realDatabase; DatabaseWithRetry _database; DatabaseWithRetry *const _monitorAliveDatabase; DatabaseWithRetry *initMonitorAliveDatabase() { // getConfigItem: copy_to_database_monitor_alive_use_master=1 means to // getConfigItem: use the standard DatabaseWithRetry::MASTER to report // getConfigItem: to monitor_alive. The default is to use // getConfigItem: database_noreplicate, which we are using in the rest // getConfigItem: of this class. const bool useMaster = getConfigItem("copy_to_database_monitor_alive_use_master") == "1"; TclList msg; msg<getThreadName()); } else { // The traditional route. Use the same database connection to do our // work and to report our status. That automatically checks for one // more type of problem. msg<<"&_database"; result = &_database; } sendToLogFile(msg); assert(result && result->probablyWorks()); return result; } std::map< std::string, Record::Ref > _records; std::queue< std::string > _symbols; IRecordDispatcher *dispatcher() { return IRecordDispatcher::getTopList(); } std::vector< DatabaseFieldInfo > const &_fields; AccumulateInsert _insertAccumulator; void sendRecordsNow() { _database.tryQueryUntilSuccess(_insertAccumulator.get()); } virtual void handleRequestInThread(Request *original) { switch (original->callbackId) { case mtNewRecord: { ThreadMonitor::SetState save("mtNewRecord"); NewRecord *newRecord = dynamic_cast< NewRecord * >(original); Record::Ref const &record = newRecord->record; const ValueBox field = record->lookUpValue((FieldId)MainFields::symbol); bool found; std::string symbol; field.getString(found, symbol); if (found) { const bool newSymbol = _records.insert(make_pair(symbol, record)).second; if (newSymbol) _symbols.push(symbol); else // The database is behind. We already got a new value for this // symbol before we could write out the old value. ThreadMonitor::find().increment("overwrite"); } break; } case mtProviderStatus: { ThreadMonitor::SetState save("mtProviderStatus"); ProviderStatus *providerStatus = dynamic_cast< ProviderStatus * >(original); std::string sql = "UPDATE " + _realDatabase + ".monitor_alive SET last_update = NOW() WHERE name = '" + mysqlEscapeString(providerStatus->name) + " -> top_list'"; _monitorAliveDatabase->tryQueryUntilSuccess(sql); break; } } } virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) { while (!(_insertAccumulator.full() || _symbols.empty())) { std::string const &symbol = _symbols.front(); _insertAccumulator.add(DatabaseFieldInfo::makeSql(_records[symbol], _fields)); _records.erase(symbol); _symbols.pop(); } if (!_insertAccumulator.empty()) { ThreadMonitor::find().increment(_symbols.empty() ?"send_partial" // Good. Keeping up. :"send_full"); // Bad. Getting behind. sendRecordsNow(); } if (!_symbols.empty()) // More work to do. Go ahead and read from the event queue. (If we // didn't do that here, we'd never overwrite an old message. We'd have // to send every message to the database even if we got behind.) But // don't go to sleep. As soon as we're done with the message queue, // try and send more items to the database. callbacks.wakeAfterMs(0); } std::string commonSql() { // We're trying a trick here. We're going to "use no_replicate" and then // put the alerts into mydb.alerts. That way the records aren't sent to // the binary log. We haven't used the binary log for the top list for // some time. That might help with performance. That might also make it // easier to do certain maintenance activites. Currently the slaves ignore // the binary log for the top list table. After making this change, the // slaves can listen again, so, when appropriate, we can send maintenance // commands to the master and have them replicated in the normal way. // Use replace instead of insert. Ideally it won't make a difference. But // if there is a problem, we need a way to get out of it. If one extra // alert gets added, that would cause insert to fail and retry forever. // That's different from the old system where the database was responsible // for the id numbers. return "REPLACE INTO " + _realDatabase + ".top_list(" + DatabaseFieldInfo::makeSqlFieldList(_fields) + ')'; } CopyTopListToDatabase() : ForeverThreadUser(IContainerThread::create("CopyTopListToDatabase", false, 500)), _realDatabase(getConfigItem("real_database", "mydb")), _database("@noreplicate", getContainer()->getThreadName()), _monitorAliveDatabase(initMonitorAliveDatabase()), _fields(DatabaseFieldInfo::getTopListFields()), _insertAccumulator(commonSql()) { dispatcher()->listenForRecords(this, mtNewRecord); dispatcher()->listenForStatus(this, mtProviderStatus); start(); } public: static void init() { if (getConfigItem("copy_top_list_to_database") == "1") new CopyTopListToDatabase(); } }; void initCopyTopListToDatabase() { CopyTopListToDatabase::init(); }