#include "../shared/ContainerThread.h"
#include "../shared/CommandDispatcher.h"
#include "../shared/ThreadMonitor.h"
#include "../shared/DatabaseSupport.h"
#include "../shared/SimpleLogFile.h"
#include "../shared/DatabaseForThread.h"

#include "UserInfo.h"
#include "Types.h"
#include "ClientUseCase.h"

#include <cstddef>
#include <cstdint>
#include <ctime>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

/* This file handles the client_use_case command from the client.  We were
 * getting a lot of these messages.  This used to be handled in SymbolLists.C.
 * That code worked but all of these messages would activate the tar pit code.
 * That would slow down other things, like trying to share a cloud link or
 * opening a config window.
 *
 * This new code is functionally equivalent, but hopefully more efficient.  And
 * it does not activate the tar pit.  (Presumably we don't need the tar pit
 * because this code is more efficient.)  We try to group multiple client
 * requests into one SQL transaction.
 *
 * Sometimes we have identical requests from the client.  Instead of telling
 * the database to increment a field N times, we give the database a single
 * request to add N to that field.
 *
 * Even when we have different requests, we can still combine them.  We use
 * BEGIN and COMMIT to create a transaction.
 * ../generate_alerts/misc_framework/AccumulateInsert.h uses the same trick
 * very successfully.  We've never used it in production before now, but tests
 * show that many inserts into a table in the same transaction were much faster
 * than the same inserts each in their own implicit transaction.
 *
 * We expect to receive an average of around 18 - 24 client updates per period.
 * That's based on the average number we get in a day.  But clearly some
 * periods will have a lot more activity than others and there will be huge
 * spikes in the rate.  This code will smooth things out by saving a lot of
 * time in the busy times, less in the quiet times, and possibly adding a tiny
 * cost at the quietest times.  (Those numbers are the total of all servers.
 * Really we should divide by the number of instances of ax_alert_server
 * running, currently 4.)
 *
 * NOTE(review):  The command this file actually registers for is
 * "record_use_case" (see the Worker constructor).  The data is written to the
 * client_use_case table. */

namespace ClientUseCase
{

  // Increment this once for each incoming request.
  // Assign time to this while internally saving the request, but not while
  // writing the request to the database.
  static const std::string s_mtClientUseCase_request =
    "mtClientUseCase_request";

  // Increment this each time we wrote to the database because of a timeout.
  static const std::string s_mtClientUseCase_timeout =
    "mtClientUseCase_timeout";

  // Increment this each time we wrote to the database because we had too much
  // in memory.
  static const std::string s_mtClientUseCase_max_count =
    "mtClientUseCase_max_count";

  // Assign time to these while creating sql statements, and while executing
  // them.  There is no need for a counter.  s_mtClientUseCase_timeout +
  // s_mtClientUseCase_max_count will tell us how many times we went to the
  // database.
  static const std::string s_mtClientUseCase_db_prep =
    "mtClientUseCase_db_prep";
  static const std::string s_mtClientUseCase_db_write =
    "mtClientUseCase_db_write";

  // The number of rows that we added or modified in the database.  Hopefully
  // this will be less than s_mtClientUseCase_request because some requests
  // got consolidated in memory, before we went to the database.
  //
  // NOTE(review):  Unlike the other keys, this value starts with "s_".  That
  // looks like a copy/paste slip, but the string is visible at runtime so it
  // is deliberately left unchanged here.  Confirm before renaming.
  static const std::string s_mtClientUseCase_row = "s_mtClientUseCase_row";

  // A description containing this marker is also recorded in the
  // external_linking_history table.  The text before the marker is stored as
  // the source and the text after it as the symbol.
  static const std::string s_ExternalLinking = ".ExternalLinking.";

  // Increment this each time we send data to the external_linking_history
  // table.
  // Assign time to this while waiting for the database.  Don't track the time
  // we spend looking at this data and preparing the SQL statements.
  // Presumably that is tiny.  That time will be assigned to
  // s_mtClientUseCase_request.
  static const std::string s_external_linking_history =
    "external_linking_history";

  // Collects use case reports in memory and writes them to the database in
  // batches.  A batch is written when the timer set by the first pending
  // request expires or when more than MAX_COUNT distinct rows are pending,
  // whichever comes first.
  class Worker : public ForeverThreadUser
  {
  private:
    enum { mtClientUseCase /* See RecordUseCase in the C# client. */ };

    // When to flush _toSave because of a timeout.  0 means no timer is set.
    std::time_t _nextWakeTime;

    // (user, description) --> number of times reported since the last flush.
    using Key = std::pair< UserId, std::string >;
    std::map< Key, std::int64_t > _toSave;

    // Write everything in _toSave to the client_use_case table as a single
    // BEGIN ... COMMIT transaction, then clear _toSave and cancel the timer.
    void sendToDatabaseNow()
    {
      ThreadMonitor::SetState tm(s_mtClientUseCase_db_prep);
      tm.increment(s_mtClientUseCase_row, _toSave.size());
      _nextWakeTime = 0;
      std::vector< std::string > sql;
      // One statement per row, plus BEGIN and COMMIT.
      sql.reserve(_toSave.size() + 2);
      sql.push_back("BEGIN");
      for (auto const &kvp : _toSave)
      {
        const UserId userId = kvp.first.first;
        std::string const &description = kvp.first.second;
        // Identical requests were merged in memory, so we add the whole
        // count in one statement rather than incrementing N times.
        const std::string count = ntoa(kvp.second);
        sql.push_back("INSERT INTO client_use_case "
                      "SET date=CURDATE(), user_id=" + ntoa(userId)
                      + ", description='" + mysqlEscapeString(description)
                      + "', count=" + count
                      + " ON DUPLICATE KEY UPDATE count=count+" + count);
      }
      sql.push_back("COMMIT");
      // NOTE(review):  This copy of the file was damaged right here.  A
      // commented out sendToLogFile(TclList()<<...) debugging line ran
      // directly into the call below and the receiver of tryAllUntilSuccess
      // was lost.  The receiver was reconstructed from the other database
      // call in this file (see handleRequestInThread).  Confirm against
      // version control.
      DatabaseForThread(DatabaseWithRetry::MASTER)->tryAllUntilSuccess
        (sql.begin(), sql.end(), s_mtClientUseCase_db_write);
      _toSave.clear();
    }

    // Overriding IContainerThreadUser
    // Ask to be woken when the flush timer expires, if one is set.
    virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) override
    {
      if (_nextWakeTime)
        callbacks.wakeAtTime(_nextWakeTime);
    }

    // Overriding IContainerThreadUser
    // Flush the pending rows if the timer is set and has expired.
    virtual void awake(std::set< int > const &woken) override
    {
      if (_nextWakeTime && (std::time(nullptr) >= _nextWakeTime))
      {
        ThreadMonitor::find().increment(s_mtClientUseCase_timeout);
        sendToDatabaseNow();
      }
    }

    // If someone reports a client use case, and nothing else is going on,
    // wait approximately this many seconds before saving it.  Wait so we can
    // group similar requests together and possibly save resources.  Of course,
    // someone else might have already set the timer, or we could fill memory
    // with too many requests, so it's possible that we will talk to the
    // database sooner.
    static const int SLEEP_TIME = 5;

    // The maximum number of items we want to keep in memory.  If we pass this
    // limit write to the database immediately, regardless of the timeout.
    // Note that this is the number of rows we are writing to the database.
    // This might be smaller than the number of requests we've received from
    // the client.
    static const int MAX_COUNT = 1000;

    // Overriding IContainerThreadUser
    // Record one use case report.  The report is merged into _toSave and
    // written later, in a batch.  If the description contains
    // s_ExternalLinking we also write a row to external_linking_history
    // immediately.
    virtual void handleRequestInThread(Request *original) override
    {
      switch (original->callbackId)
      {
      case mtClientUseCase:
        {
          ThreadMonitor::SetState tm(s_mtClientUseCase_request);
          tm.increment(s_mtClientUseCase_request);
          ExternalRequest *request =
            dynamic_cast< ExternalRequest * >(original);
          if (!request)
            // dynamic_cast gives us a null pointer if this is not an
            // ExternalRequest.  We need the socket info and the properties,
            // so there is nothing we can record.  Skip it rather than
            // dereferencing a null pointer.
            break;
          const UserId userId =
            userInfoGetInfo(request->getSocketInfo()).userId;
          const std::string description =
            request->getProperty("description");
          _toSave[Key(userId, description)]++;
          if (_toSave.size() > static_cast< std::size_t >(MAX_COUNT))
          {
            tm.increment(s_mtClientUseCase_max_count);
            sendToDatabaseNow();
          }
          else if (_nextWakeTime <= 0)
            // Only the first pending request starts the timer.  Later
            // requests ride along with it.
            _nextWakeTime = std::time(nullptr) + SLEEP_TIME;

          // We store more details for a symbol request.  Do we still need
          // this?  This was for one specific customer and I don't think they
          // ever bought this data.
          const std::size_t split = description.find(s_ExternalLinking);
          if (split != std::string::npos)
          {
            tm.increment(s_external_linking_history);
            const std::string source = description.substr(0, split);
            const std::string symbol =
              description.substr(split + s_ExternalLinking.length());
            // This one is not batched.  It goes to the database right away.
            DatabaseForThread(DatabaseWithRetry::MASTER)->tryQueryUntilSuccess
              ("INSERT INTO external_linking_history"
               "(timestamp, source, symbol, user_id) VALUES(now(), '"
               + mysqlEscapeString(source) + "', '"
               + mysqlEscapeString(symbol) + "', "
               + ntoa(userId) + ')',
               s_external_linking_history);
          }
          break;
        }
      }
    }

  public:
    // Register for the "record_use_case" command and start the thread user.
    // If no container is given we use IContainerThread::batch().
    Worker(IContainerThread *container = nullptr) :
      ForeverThreadUser(container?container:IContainerThread::batch()),
      _nextWakeTime(0)
    {
      CommandDispatcher::getInstance()
        ->listenForCommand("record_use_case", this, mtClientUseCase);
      start();
    }
  };

}

// Create the one and only Worker.  The function local static means that
// calling this more than once is harmless.
void InitClientUseCase()
{
  static ClientUseCase::Worker worker;
}