#include "RecordDispatcher.h" #include "../shared/ContainerThread.h" #include "FieldLists.h" #include "../generate_alerts/misc_framework/AccumulateInsert.h" #include "../shared/DatabaseForThread.h" #include "../shared/GlobalConfigFile.h" #include "CopyAlertsToDatabase.h" class CopyAlertsToDatabase : public ForeverThreadUser { private: enum { mtNewAlert, mtProviderStatus }; const std::string _realDatabase; DatabaseWithRetry _database; DatabaseWithRetry *const _monitorAliveDatabase; DatabaseWithRetry *initMonitorAliveDatabase() { // getConfigItem: copy_to_database_monitor_alive_use_master=1 means to // getConfigItem: use the standard DatabaseWithRetry::MASTER to report // getConfigItem: to monitor_alive. The default is to use // getConfigItem: database_noreplicate, which we are using in the rest // getConfigItem: of this class. const bool useMaster = getConfigItem("copy_to_database_monitor_alive_use_master") == "1"; TclList msg; msg<getThreadName()); } else { // The traditional route. Use the same database connection to do our // work and to report our status. That automatically checks for one // more type of problem. msg<<"&_database"; result = &_database; } sendToLogFile(msg); assert(result && result->probablyWorks()); return result; } IRecordDispatcher *dispatcher() { return IRecordDispatcher::getAlerts(); } std::vector< DatabaseFieldInfo > const &_fields; AccumulateInsert _insertAccumulator; void sendAlertsNow(bool full) { ThreadMonitor::find().increment(full?"send_full":"send_partial"); _database.tryQueryUntilSuccess(_insertAccumulator.get()); } virtual void handleRequestInThread(Request *original) { switch (original->callbackId) { case mtNewAlert: { ThreadMonitor::SetState save("mtNewAlert"); NewRecord *newRecord = dynamic_cast< NewRecord * >(original); _insertAccumulator.add(DatabaseFieldInfo::makeSql(newRecord->record, _fields)); if (_insertAccumulator.full()) sendAlertsNow(true); break; } case mtProviderStatus: { ThreadMonitor::SetState save("mtProviderStatus"); ProviderStatus *providerStatus = dynamic_cast< ProviderStatus * >(original); std::string sql = "UPDATE " + _realDatabase + ".monitor_alive SET last_update = NOW() WHERE name = '" + mysqlEscapeString(providerStatus->name) + " -> alerts'"; _monitorAliveDatabase->tryQueryUntilSuccess(sql); break; } } } virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) { if (!_insertAccumulator.empty()) sendAlertsNow(false); } std::string commonSql() { // We're trying a trick here. We're going to "use no_replicate" and then // put the alerts into mydb.alerts. That way the alerts aren't sent to // the binary log. We haven't used the binary log for alerts for some // time. That might help with performance. That might also make it easier // to do certain maintenance activites. Currently the slaves ignore // the binary log for the alerts table. After making this change, the // slaves can listen again, so, when appropriate, we can send maintenance // commands to the master and have them replicated in the normal way. // Use replace instead of insert. Ideally it won't make a difference. But // if there is a problem, we need a way to get out of it. If one extra // alert gets added, that would cause insert to fail and retry forever. // That's different from the old system where the database was responsible // for the id numbers. return "REPLACE INTO " + _realDatabase + ".alerts(" + DatabaseFieldInfo::makeSqlFieldList(_fields) + ')'; } CopyAlertsToDatabase() : ForeverThreadUser(IContainerThread::create("CopyAlertsToDatabase", false, 500)), _realDatabase(getConfigItem("real_database", "mydb")), _database("@noreplicate", getContainer()->getThreadName()), _monitorAliveDatabase(initMonitorAliveDatabase()), _fields(DatabaseFieldInfo::getAlertFields()), _insertAccumulator(commonSql()) { dispatcher()->listenForRecords(this, mtNewAlert); dispatcher()->listenForStatus(this, mtProviderStatus); start(); } public: static void init() { if (getConfigItem("copy_alerts_to_database") == "1") new CopyAlertsToDatabase(); } }; void initCopyAlertsToDatabase() { CopyAlertsToDatabase::init(); }