#include #include "../../shared/LogFile.h" #include "../../shared/SelectableRequestQueue.h" #include "../../shared/ThreadClass.h" #include "../../shared/IPollSet.h" #include "../../shared/ThreadMonitor.h" #include "../../shared/SmarterP.h" #include "../../shared/TalkWithServer64.h" #include "../../shared/Marshal.h" #include "ServerList.h" #include "RemoteHistoryResponse.h" #include "RemoteHistoryConnection.h" /* This class is responsible for talking with the historical alert servers. * * We plan to have a cluster of these. Like the database servers, there will * be several machines which are all similar, but each one is responsible for * accessing different data. When you hand a request to this object it will * select the appropriate server and forward the message to that server. * * This class is different from a lot of our client connections because most * of the logic for the data is in other classes. Usually (e.g. * ../ax_alert_server/TopListServerConnection.h) The class that inherits from * ServerConnection will know when and how to build messages. For example, * the TopListServerConnection has a list of outstanding requests. When the * connection is broken and restarted, that class automatically creates and * sends the appropriate messages. RemoteHistoryConnection is responsible * for lots of different types of messages, e.g. Alert History, OddsMaker, * Top List History. Other classes are responsible for creating these messages * and understanding the responses. In this way the RemoteHistoryConnection * class works more like the ConnectionMaster object in TI Pro; it only knows * about a few messages, like ping. */ class RemoteHistoryConnection : private ThreadClass { private: class PollReturn { public: virtual void onPollReturn() =0; }; enum { mtDoInThread }; class DoInThread : public Request { public: const std::function< void() > toDo; DoInThread(SocketInfo *socket, std::function< void() > const &toDo) : Request(socket), toDo(toDo) { callbackId = mtDoInThread; } }; SelectableRequestQueue _incoming; ServerList::Ref _serverList; IPollSet _pollSet; std::unordered_map< int, PollReturn * > _handleToAction; void tryNewServerList(ServerList::Ref const &serverList); class Connection : private TalkWithServer64::IMessageListener { public: typedef TalkWithServer64::MessageId MessageId; typedef TalkWithServer64::Message Message; SocketInfo *const _socket; const std::string _hostName; const std::string _port; std::unordered_map< MessageId, RemoteHistoryRequest::Ref > _outstanding; private: TalkWithServer64 _talkWithServer; // For IMessageListener virtual void onMessage(std::string bytes, int64_t clientId, MessageId messageId); // For IMessageListener virtual void onAbort(int64_t clientId, MessageId messageId); public: Connection(SocketInfo *socket, std::string const &hostName, std::string const &port); void send(RemoteHistoryRequest::Ref const &request, uint64_t fileId); bool cancel(RemoteHistoryRequest::Ref const &request); }; std::unordered_map< SocketInfo *, std::map< std::pair< std::string, std::string >, Connection * > > _connections; Connection *find(SocketInfo *socket, std::string const &hostName, std::string const &port); // When a Connection is ready to vanish it calls // RemoteHistoryConnection::erase(). That will immediately remove the // Connection from _connections. It will not immediately delete the // Connection. We are being called from that object, so we don't want to // delete it. Instead, store it so we can delete it at a more appropriate // time. std::vector< Connection * > _deleteSoon; void erase(Connection *toErase); std::unordered_map< SocketInfo *, std::vector< RemoteHistoryRequest::Ref > > _deferred; virtual void threadFunction(); void submitImpl(RemoteHistoryRequest::Ref const &request); void doInThread(SocketInfo *socket, std::function< void() > const &toDo); ~RemoteHistoryConnection() { assert(false); } public: RemoteHistoryConnection(); void submit(RemoteHistoryRequest::Ref const &request); void abortAndRetry(RemoteHistoryRequest::Ref const &request); }; static RemoteHistoryConnection *remoteHistoryConnection = NULL; ///////////////////////////////////////////////////////////////////// // RemoteHistoryRequest ///////////////////////////////////////////////////////////////////// void RemoteHistoryRequest::retryAfterAbort() { if (_connected) sendClose(EventType::LocalAbort); else pushEvent(EventType::LocalAbort); } void RemoteHistoryRequest::abortAndRetry(Ref const &request) { assert(this == request.get()); // Mark this request so we know that we are in the process of aborting. // All events should be ignored until a EventType::LocalAbort message // tells us that the abort was successful. This deals with messages that // were sent while we were trying to abort, possibly in a different thread. OnResponse previous = __sync_val_compare_and_swap(&_onResponse, /* if it was */ OnResponse::Normal, /* change it to */ OnResponse::DrainAndResend); // If you call abortAndRetry() you shouldn't call abortAndRetry() again until // you receive EventType::LocalAbort. assert(previous != OnResponse::DrainAndResend); if (previous == OnResponse::Normal) remoteHistoryConnection->abortAndRetry(request); } std::vector< RemoteHistoryRequest::Event > RemoteHistoryRequest::getEvents() { std::vector< Event > result; if (_onResponse == OnResponse::Abort) return result; _events.takeFIFO([&](Event const &&event) { bool keep = false; if (_onResponse == OnResponse::DrainAndResend) { if (event.reason == EventType::LocalAbort) { OnResponse previous = __sync_val_compare_and_swap (&_onResponse, /* if it was */ OnResponse::DrainAndResend, /* change it to */ OnResponse::Normal); assert(previous != OnResponse::Normal); keep = true; } } else { assert(event.reason != EventType::LocalAbort); keep = true; } if (keep) result.emplace_back(std::move(event)); }); return result; } void RemoteHistoryRequest::submit(Ref const &request) { assertTrue(__sync_bool_compare_and_swap(&request->_connected, false, true)); static RemoteHistoryConnection *connection = new RemoteHistoryConnection(); connection->submit(request); } void RemoteHistoryRequest::sendClose(EventType reason) { assert(reason != EventType::Data); assertTrue(__sync_bool_compare_and_swap(&_connected, true, false)); pushEvent(reason); } ///////////////////////////////////////////////////////////////////// // RemoteHistoryConnection::Connection ///////////////////////////////////////////////////////////////////// // TODO bytes might be big. When I wrote this interface strings used // copy on write so it wasn't a problem that we were making a copy of // the string object. I wonder how much trouble (or good) this would // cause for other projects if I changed this. Maybe pass it by // reference or by move or using a reference counted smart pointer. void RemoteHistoryConnection::Connection::onMessage(std::string bytes, int64_t clientId, MessageId messageId) { const auto requestIt = _outstanding.find(messageId); if (requestIt == _outstanding.end()) // It may be possible to get a message after we've lost interest. We // should be canceling every request. But I think a small number of // messges might still arrive sometimes. return; const RemoteHistoryRequest::Ref request = requestIt->second; bool error; bool cancelMessage = false; try { // RemoteHistoryResponse.h contains the inverse of this code. The server // will use RemoteHistoryResponse.h to encode the data that mPop is // decoding here. RemoteHistoryResponse::Type type = unmarshal< RemoteHistoryResponse::Type >(mPop(bytes)); switch (type) { case RemoteHistoryResponse::Type::Debug: { TclList msg; msg<socket); error = false; break; } case RemoteHistoryResponse::Type::Close: _talkWithServer.cancel(messageId); request->sendClose(RemoteHistoryRequest::EventType::RemoteGracefulClose); error = false; cancelMessage = true; break; case RemoteHistoryResponse::Type::Data: request->pushEvent(SmarterP< std::string >(NULL, std::move(bytes))); error = false; break; case RemoteHistoryResponse::Type::FileNotFound: { ServerList::Ref serverList = ServerList::unmarshal(bytes); if (serverList->isNewerThan(remoteHistoryConnection->_serverList)) { // We're already in the right thread. But let's try to install this // when we're not in the middle of something else. remoteHistoryConnection->doInThread(request->socket, [serverList]() { remoteHistoryConnection->tryNewServerList(serverList); }); error = true; } else { error = false; // We're currently talking to a server that's working from an // old ServerList. Ignore this connection. (TODO report this and // much more to ThreadMonitor.) We could report this error to the // caller right now, but it would be pointless. If he immediately // retried, the same thing would happen. (That would not be true if // there was redundancy in the ServerList, but that's explicitly not // planned.) Instead, leave the message in the active state. We // assume that server will soon be restarted with the newest // ServerList. At that time we'll automatically send // onUnexpectedClose() and the caller will retry. } break; } default: error = true; break; } } catch (MarshallingException const &ex) { error = true; } if (error) { request->sendClose(RemoteHistoryRequest::EventType::UnexpectedClose); cancelMessage = true; } if (cancelMessage) { _talkWithServer.cancel(messageId); _outstanding.erase(requestIt); } } void RemoteHistoryConnection::Connection::onAbort(int64_t clientId, MessageId messageId) { const auto requestIt = _outstanding.find(messageId); if (requestIt == _outstanding.end()) // It may be possible to get a message after we've lost interest. We // should be canceling every request. But I think a small number of // messges might still arrive sometimes. return; const RemoteHistoryRequest::Ref request = requestIt->second; _talkWithServer.cancel(messageId); _outstanding.erase(requestIt); } void RemoteHistoryConnection::Connection::send (RemoteHistoryRequest::Ref const &request, uint64_t fileId) { if (_talkWithServer.notYetOpen()) _talkWithServer.connect(_hostName, _port, true); Message message = { { "command", request->command }, { "body", request->body }, { "file_id", marshal(fileId) } }; _outstanding[_talkWithServer.sendMessage(message, this, 0, true)] = request; } bool RemoteHistoryConnection::Connection::cancel (RemoteHistoryRequest::Ref const &request) { for (auto it = _outstanding.begin(); it != _outstanding.end(); it++) if (it->second == request) { // Found. _talkWithServer.cancel(it->first); _outstanding.erase(it); return true; } return false; // Not found. } RemoteHistoryConnection::Connection::Connection(SocketInfo *socket, std::string const &hostName, std::string const &port) : _socket(socket), _hostName(hostName), _port(port), _talkWithServer("RemoteHistoryConnection ➤ " + hostName + ':' + port + " for " + ntoa(SocketInfo::getSerialNumber(socket))) { } ///////////////////////////////////////////////////////////////////// // RemoteHistoryConnection ///////////////////////////////////////////////////////////////////// void RemoteHistoryConnection::tryNewServerList(ServerList::Ref const &serverList) { if (serverList->isNewerThan(_serverList)) { _serverList = serverList; std::vector< RemoteHistoryRequest::Ref > toResend; for (auto const &v : _deferred) for (RemoteHistoryRequest::Ref const &request : v.second) toResend.push_back(request); _deferred.clear(); for (RemoteHistoryRequest::Ref const &request : toResend) submitImpl(request); } } void RemoteHistoryConnection::submitImpl(RemoteHistoryRequest::Ref const &request) { if (!_serverList) _deferred[request->socket].push_back(request); else { ServerList::Entry const *const entry = _serverList->find(request->time, request->forward); if (!entry) request->sendClose(RemoteHistoryRequest::EventType::LocalEndOfData); else { Connection *const connection = find(request->socket, entry->hostName, entry->port); connection->send(request, entry->fileId()); } } } void RemoteHistoryConnection::threadFunction() { //ThreadMonitor &tm = ThreadMonitor::find(); _pollSet.addForRead(_incoming.getWaitHandle()); while (true) { // _timerQueue.doAllCallbacks(); TODO _pollSet.addForRead(_incoming.getWaitHandle()); // _pollSet.setTimeout(_timerQueue.nextTimeout()); for (Connection *toDelete : _deleteSoon) delete toDelete; _deleteSoon.clear(); _pollSet.poll(); for (int handle : _pollSet.woken()) { // Look through the list of sockets that are ready. auto it = _handleToAction.find(handle); if (it != _handleToAction.end()) it->second->onPollReturn(); } // Look for commands. _incoming.resetWaitHandle(); while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtDoInThread: { DoInThread *request = dynamic_cast< DoInThread * >(current); request->toDo(); break; } case DeleteSocketThread::callbackId: { SocketInfo *socket = current->getSocketInfo(); auto it = _connections.find(socket); if (it != _connections.end()) { auto &forSocket = it->second; for (auto const &kvp : forSocket) delete kvp.second; _connections.erase(it); } break; } } delete current; } } } void RemoteHistoryConnection::doInThread(SocketInfo *socket, std::function< void() > const &toDo) { _incoming.newRequest(new DoInThread(socket, toDo)); } void RemoteHistoryConnection::submit(RemoteHistoryRequest::Ref const &request) { doInThread(request->socket, [=](){ submitImpl(request); }); } void RemoteHistoryConnection::abortAndRetry(RemoteHistoryRequest::Ref const & request) { doInThread(request->socket, [=](){ auto it = _connections.find(request->socket); if (it != _connections.end()) { for (auto const &kvp : it->second) { if (kvp.second->cancel(request)) break; } } request->retryAfterAbort(); }); } void RemoteHistoryConnection::erase(Connection *toErase) { auto &forSocket = _connections[toErase->_socket]; auto it = forSocket.find(std::make_pair(toErase->_hostName, toErase->_port)); Connection *const connection = it->second; assert(connection == toErase); _deleteSoon.push_back(connection); forSocket.erase(it); if (forSocket.empty()) _connections.erase(toErase->_socket); } RemoteHistoryConnection::Connection * RemoteHistoryConnection::find(SocketInfo *socket, std::string const &hostName, std::string const &port) { Connection *&connection = _connections[socket][std::make_pair(hostName, port)]; if (!connection) connection = new Connection(socket, hostName, port); return connection; } RemoteHistoryConnection::RemoteHistoryConnection() : ThreadClass("RemoteHistoryConnection"), _incoming("RemoteHistoryConnection") { assert(!remoteHistoryConnection); remoteHistoryConnection = this; }