#include "../../shared/ContainerThread.h"
#include "../../shared/NewWorkerCluster.h"
#include "../../shared/CommandDispatcher.h"
#include "../../shared/ReplyToClient.h"
#include "../../shared/GlobalConfigFile.h"
#include "../../shared/DatabaseForThread.h"
#include "../../oddsmaker/UserInfo.h"
#include "../Strategy.h"
#include "RemoteHistoryResponse.h"
#include "../../shared/Marshal.h"
#include "MarshalValuesByName.h"
#include "HistoryDirectory.h"

// Registers the "history_*" commands with the CommandDispatcher and handles
// the resulting requests in handleRequestInThread().  Alerts-history scans
// are split into short slices that are handed to a NewWorkerCluster.
class HistoryCommands : public ForeverThreadUser
{
private:
  // Callback ids given to CommandDispatcher::listenForCommand(); they come
  // back to us as request->callbackId.
  enum { mtServerList, mtAlerts, mtOddsMaker, mtTopList };

  NewWorkerCluster _workerCluster;

  // Parses an alerts-history request and starts scanning `reader` for
  // matching records.
  //
  // socket     - the client that sent the request.
  // body       - marshalled request: start time, restart key, requested row
  //              count, end time and the strategy ("collaborate") string, in
  //              that order.
  // responseId - message id used for every reply to this request.
  // reader     - the history file to scan.
  //
  // A body that cannot be unmarshalled results in a graceful close being
  // sent to the client.
  void startAlertsHistory(SocketInfo *socket,
                          std::string const &body,
                          ExternalRequest::MessageId responseId,
                          HistoryFileReader &reader)
  {
    // The state of one alerts-history request.  The object bounces between
    // inMainThread() and inWorkerThread() until the scan is finished.
    class Work
    {
    public:
      typedef SmarterP< Work > Ref;

    private:
      NewWorkerCluster &_cluster;
      SocketInfo *const _socket;
      const ExternalRequest::MessageId _responseId;
      HistoryFileIterator _source;
      // How many more matching rows the client still wants.
      int _countRemaining;
      // Stored from the request.  Not consulted by the scan yet.
      time_t _endTime;
      // The strategy text.  Cleared once _strategy has been loaded from it.
      std::string _collaborate;
      Parse::AlertStrategy _strategy;
      // Set by inWorkerThread() when there is nothing left to scan.
      bool _done;

    public:
      // `id` is accepted but not yet forwarded to the iterator (see the TODO
      // in the initializer list).
      Work(NewWorkerCluster &cluster,
           SocketInfo *socket,
           ExternalRequest::MessageId responseId,
           HistoryFileReader &reader,
           time_t startTime,
           int64_t id,
           int countRemaining,
           time_t endTime,
           std::string &&collaborate) :
        _cluster(cluster),
        _socket(socket),
        _responseId(responseId),
        _source(reader, false, startTime /* , id TODO */),
        _countRemaining(countRemaining),
        _endTime(endTime),
        _collaborate(std::move(collaborate)),
        _done(false)
      {
      }

      // If the scan is finished, send a graceful close to the client.
      // Otherwise queue another slice: inWorkerThread() as the job and this
      // function again as the follow-up callback.
      // NOTE(review): presumably addJob2() runs the first callable in a
      // worker thread and the second one back in the calling thread --
      // confirm against NewWorkerCluster.
      static void inMainThread(Work::Ref const &work)
      {
        if (work->_done)
          addToOutputQueue(work->_socket,
                           RemoteHistoryResponse::gracefulClose(),
                           work->_responseId);
        else
          work->_cluster.addJob2([=]() { work->inWorkerThread(); },
                                 [=]() { inMainThread(work); },
                                 work->_socket);
      }

      // Scans records until the client's row count is satisfied, the source
      // is exhausted, or the time slice (100000 units of getMicroTime(),
      // presumably microseconds) has expired.
      void inWorkerThread()
      {
        int64_t yieldTime = getMicroTime() + 100000;
        if (!_strategy.isRunnable())
        {
          // First slice for this request:  load the strategy here rather
          // than in the caller's thread.
          _strategy.load(_collaborate,
                         userInfoGetInfo(_socket).userId,
                         *DatabaseForThread(DatabaseWithRetry::SLAVE));
          _collaborate.clear();
        }
        std::vector< Parse::ValuesByName > rowsFound;
        Record::Ref lastFound;
        while (true)
        {
          if (_countRemaining <= 0)
          {
            // We've done all the client has asked for.  There is nothing else
            // to tell the client.  He already knows that he got all the
            // records he's asked for.  And he already knows the time /
            // restart key of the last item.
            _done = true;
            break;
          }
          Record::Ref record = _source.getNext();
          if (!record)
          {
            // End of this file.  Stop here; a null record must never reach
            // the strategy, and without this the loop would not terminate.
            // TODO Tell the client that the file is exhausted.
            _done = true;
            break;
          }
          Execution::RecordInfo recordInfo(record);
          _strategy.init(recordInfo);
          if (_strategy.evaluateWhere(recordInfo))
          {
            _countRemaining--;
            rowsFound.emplace_back(_strategy.evaluateColumns(recordInfo));
            lastFound = record;
          }
          if (yieldTime < getMicroTime())
            // Out of time.  Yield; inMainThread() will queue the next slice.
            break;
        }
        std::string response;
        marshal(response, rowsFound);
        // TODO Send the response (and the restart key of lastFound) to the
        // client.  Nothing is sent yet.
      }
    };

    try
    {
      size_t offset = 0;
      time_t startTime;
      unmarshal(body, offset, startTime);
      std::string restartKey;
      unmarshal(body, offset, restartKey);
      int requestCount;
      unmarshal(body, offset, requestCount);
      time_t endTime;
      unmarshal(body, offset, endTime);
      std::string collaborate;
      unmarshal(body, offset, collaborate);
      unmarshalFinishedOrThrow(body, offset);
      uint64_t alertId;
      try
      {
        unmarshal(restartKey, alertId);
      }
      catch (MarshallingException &)
      {
        // No usable restart key.  Use the largest signed 64 bit value.
        alertId = 0x7fffffffffffffff;
      }
      // Actually create the work item.  Previously an empty Work::Ref was
      // handed to inMainThread(), which dereferences it immediately.
      Work::Ref work(new Work(_workerCluster, socket, responseId, reader,
                              startTime, alertId, requestCount, endTime,
                              std::move(collaborate)));
      Work::inMainThread(work);
    }
    catch (MarshallingException &)
    {
      // The request was malformed.  Close the conversation so the client is
      // not left waiting for a reply that will never come.
      // TODO Maybe first set the restart time to 0 so there's no retry.
      addToOutputQueue(socket, RemoteHistoryResponse::gracefulClose(),
                       responseId);
    }
  }

  // Entry point for every command registered in the constructor.  Looks up
  // the history file named by the "file_id" property and dispatches on the
  // callback id.  If the file cannot be found the client receives
  // fileNotFound() followed by gracefulClose().
  virtual void handleRequestInThread(Request *original)
  {
    ExternalRequest *request = dynamic_cast< ExternalRequest * >(original);
    if (!request)
      // Not an external request.  There is no socket to reply to.
      return;
    SocketInfo *socket = request->getSocketInfo();
    ExternalRequest::MessageId responseId = request->getResponseMessageId();
    // Must start out null:  if the file id can't be parsed we never assign
    // to this and the check below has to report "file not found".
    HistoryFileReader *reader = nullptr;
    try
    {
      ServerList::Entry::ID fileId;
      unmarshal(request->getProperty("file_id"), fileId);
      reader = HistoryDirectory::instance().findReader(fileId);
    }
    catch (MarshallingException &)
    {
      // Bad or missing file id.  Treated the same as an unknown file.
    }
    if (!reader)
    {
      addToOutputQueue(socket, RemoteHistoryResponse::fileNotFound(),
                       responseId);
      addToOutputQueue(socket, RemoteHistoryResponse::gracefulClose(),
                       responseId);
      return;
    }
    switch (request->callbackId)
    {
    case mtServerList:
      // This shouldn't happen.  Log it?  TODO
      break;
    case mtAlerts:
      startAlertsHistory(socket, request->getProperty("body"), responseId,
                         *reader);
      break;
    case mtOddsMaker:
      // Not implemented.
      break;
    case mtTopList:
      // Not implemented.
      break;
    default:
      assert(false);
      break;
    }
  }

public:
  // Creates the worker cluster (thread count from the "main_thread_count"
  // config item, default 4), registers the four history commands and then
  // calls start().
  HistoryCommands() :
    _workerCluster(IContainerThread::create("worker cluster"),
                   "worker cluster")
  {
    const int threadCount =
      strtolDefault(getConfigItem("main_thread_count"), 4);
    _workerCluster.createWorkers(threadCount);
    CommandDispatcher *const cd = CommandDispatcher::getInstance();
    cd->listenForCommand("history_server_list", this, mtServerList);
    cd->listenForCommand("history_alerts", this, mtAlerts);
    cd->listenForCommand("history_oddsmaker", this, mtOddsMaker);
    cd->listenForCommand("history_top_list", this, mtTopList);
    start();
  }
};