#include <algorithm>

#include "DatabaseThreadShared.h"
#include "ShortHistory.h"

/////////////////////////////////////////////////////////////////////
// ShortHistoryRequest
/////////////////////////////////////////////////////////////////////

// Builds a request on behalf of `socket`.  `lastId` is the alert id the
// backwards search in ShortHistoryHandler::threadFunction() starts from, and
// `sql` supplies the query text and the code that copies a matching row into
// the request's result.
ShortHistoryRequest::ShortHistoryRequest(SocketInfo *socket,
                                         AlertId lastId,
                                         AlertConfig::CustomSql const &sql) :
  Request(socket),
  _lastId(lastId),
  _sql(sql)
{
}

/////////////////////////////////////////////////////////////////////
// RemoveRequestRequest
/////////////////////////////////////////////////////////////////////

// Internal message used by ShortHistoryHandler::abort(socket, request) to ask
// the worker thread to drop one specific pending request.
class RemoveRequestRequest : public Request
{
public:
  RemoveRequestRequest(SocketInfo *socket) :
    Request(socket),
    originalRequest(NULL)
  {
  }

  // The pending request that should be removed.  Only used as an argument to
  // _inProgress.remove() in this file.
  ShortHistoryRequest *originalRequest;
};

/////////////////////////////////////////////////////////////////////
// ShortHistoryHandler
/////////////////////////////////////////////////////////////////////

// Stores the queue that finished requests are posted to and the callback id
// they are tagged with, then starts the worker thread.
ShortHistoryHandler::ShortHistoryHandler(RequestQueue *fulfilledRequests,
                                         int callbackId) :
  ThreadClass("ShortHistoryHandler"),
  _callbackId(callbackId),
  _database(true, "ShortHistoryHandler"),
  _fulfilledRequests(fulfilledRequests),
  _incomingRequests("ShortHistoryHandler")
{
  startThread();
}

// Posts an mtQuit message to the worker thread and waits for it to finish.
ShortHistoryHandler::~ShortHistoryHandler()
{
  Request *r = new Request(NULL);
  r->callbackId = mtQuit;
  _incomingRequests.newRequest(r);
  waitForThread();
}

// Queues a new short-history request for the worker thread.
void ShortHistoryHandler::request(ShortHistoryRequest *request)
{
  request->callbackId = mtNewRequest;
  _incomingRequests.newRequest(request);
}

// Asks the worker thread to drop one specific pending request belonging to
// `socket`.
void ShortHistoryHandler::abort(SocketInfo *socket,
                                ShortHistoryRequest *request)
{
  RemoveRequestRequest *r = new RemoveRequestRequest(socket);
  r->callbackId = mtAbortRequest;
  r->originalRequest = request;
  _incomingRequests.newRequest(r);
}

// Asks the worker thread to drop every pending request belonging to `socket`.
void ShortHistoryHandler::abort(SocketInfo *socket)
{
  Request *r = new Request(socket);
  r->callbackId = mtDeleteSocket;
  _incomingRequests.newRequest(r);
}

// Returns the id of the first alert (ordered by timestamp) whose timestamp is
// later than the date of the newest non-fake alert, or 0 if that could not be
// determined.
//
// A non-zero answer is cached in function-local statics and handed out for
// the next 25 calls before the database is consulted again.  A zero answer is
// never cached, so the next call retries immediately.
//
// NOTE(review): the statics are not synchronized.  Within this file the
// function is only called from threadFunction(); confirm there are no other
// callers on different threads.
AlertId ShortHistoryHandler::getFirstPossibleId()
{
  static int reuseCount = 0;
  static AlertId cachedId = 0;
  if (reuseCount && cachedId)
    {
      reuseCount--;
    }
  else
    {
      /* insufficient permissions to do a drop temporary table.
      std::vector< std::string >sql;
      sql.push_back("CREATE TEMPORARY TABLE last_trading_day SELECT LEFT(MAX(timestamp), 10) AS last FROM alerts;");
      sql.push_back("SELECT id FROM alerts, last_trading_day WHERE timestamp > last_trading_day.last ORDER BY timestamp LIMIT 1");
      sql.push_back("DROP TEMPORARY TABLE last_trading_day");
      DatabaseWithRetry::ResultList result =
        _database.tryAllUntilSuccess(sql.begin(), sql.end());
      cachedId = result[1]->getIntegerField(0, 0);
      */
      cachedId = 0;
      // Step 1:  Find the calendar date of the most recent real alert.
      MysqlResultRef result1 =
        _database.tryQueryUntilSuccess("SELECT DATE(timestamp) AS last "
                                       "FROM alerts where FAKE='N' "
                                       "ORDER BY ID DESC LIMIT 1");
      if (!result1->fieldIsEmpty(0))
        {
          // Step 2:  Find the first alert after the start of that date.
          MysqlResultRef result2 =
            _database.tryQueryUntilSuccess
            ("SELECT id FROM alerts WHERE timestamp > '"
             + mysqlEscapeString(result1->getStringField(0))
             + "' ORDER BY timestamp LIMIT 1");
          // Presumably the second argument is the value used when the field
          // is missing -- TODO confirm against MysqlResultRef.
          cachedId = result2->getIntegerField(0, 0);
        }
      reuseCount = cachedId?25:0;
    }
  return cachedId;
}

// Worker thread body.  Each pass first drains every message waiting in
// _incomingRequests (new requests, aborts, quit), then either blocks until
// more messages arrive or services exactly one pending request.  Servicing
// only one request per pass means aborts are looked at between requests.
void ShortHistoryHandler::threadFunction()
{
  while (true)
    {
      while (Request *current = _incomingRequests.getRequest())
        {
          switch (current->callbackId)
            {
            case mtNewRequest:
              {
                _inProgress.push(current);
                break;
              }
            case mtQuit:
              {
                delete current;
                return;
              }
            case mtAbortRequest:
              {
                RemoveRequestRequest *request =
                  dynamic_cast< RemoveRequestRequest * >(current);
                // abort() is the only place in this file that tags a message
                // with mtAbortRequest, and it always uses a
                // RemoveRequestRequest.  Guard anyway so an unexpected type
                // is discarded rather than dereferenced.
                if (request)
                  {
                    _inProgress.remove(request->getSocketInfo(),
                                       request->originalRequest);
                  }
                delete current;
                break;
              }
            case DeleteSocketThread::callbackId:
            case mtDeleteSocket:
              {
                _inProgress.remove(current->getSocketInfo());
                delete current;
                break;
              }
            default:
              {
                delete current;
                break;
              }
            }
        }
      if (_inProgress.empty())
        {
          _incomingRequests.waitForRequest();
        }
      else
        {
          ShortHistoryRequest *current =
            dynamic_cast< ShortHistoryRequest * >(_inProgress.pop());
          if (current)
            {
              // Search backwards from the requested id, at most 3000 ids per
              // query, and never look before getFirstPossibleId().
              AlertId stopAtId = getFirstPossibleId() - 1;
              AlertId lastId = current->getLastId();
              while (stopAtId && (lastId > stopAtId))
                {
                  AlertId startAfter = std::max(lastId - 3000, stopAtId);
                  const std::string sql =
                    current->getSql().getCommon(startAfter, lastId, 1);
                  MysqlResultRef result = _database.tryQueryUntilSuccess(sql);
                  if (result->rowIsValid())
                    {
                      // Found a match in this window; stop searching.
                      current->getSql().copyAlert(current->result, result,
                                                  ahtShortHistory);
                      break;
                    }
                  lastId = startAfter;
                }
              // The request is returned whether or not a match was found.
              current->callbackId = _callbackId;
              _fulfilledRequests->newRequest(current);
            }
        }
    }
}