#include "../shared/LogFile.h"
#include "../shared/ThreadMonitor.h"
#include "../shared/XmlReader.h"
#include "AlertConnection.h"

// NOTE(review): This copy of the file had all of its line breaks collapsed,
// which caused every "//" comment to swallow the code that followed it.  The
// line structure has been restored below.  In addition, several statements
// appear to have lost the text between a '<' and a later '>' character.  Those
// statements are reproduced exactly as found and are marked individually with
// NOTE(review).  They will not compile until the missing text is restored
// from version control.

/////////////////////////////////////////////////////////////////////
// AlertConnection::SingleRequest
/////////////////////////////////////////////////////////////////////

// Records the owner, the requesting socket, the request id, the MRU flag and
// the callback, and seeds the metadata with the initial collaborate string.
// The constructor itself does not talk to the server.
AlertConnection::SingleRequest::SingleRequest(AlertConnection *manager,
                                              SocketInfo *socketInfo,
                                              int64_t id,
                                              bool saveToMru,
                                              std::string const &collaborate,
                                              Callback callback) :
  _manager(manager),
  _socketInfo(socketInfo),
  _id(id),
  _saveToMru(saveToMru),
  _callback(callback)
{
  _metaData.collaborate = collaborate;
}

// NOTE(review): damaged.  The text between "TclList()<" and "sendMessage"
// is missing (the rest of the log statement and the code that builds
// `message`).  Left exactly as found.
void AlertConnection::SingleRequest::sendConfig()
{
  LogFile::primary().sendString(TclList()<sendMessage(message, _manager, _id, false);
}

// Tells the server to stop sending alerts for this request.
AlertConnection::SingleRequest::~SingleRequest()
{
  Message message;
  message["command"] = "ms_alert_stop";
  // Convert the id to text explicitly, the same way checkListenStatus() sends
  // "next_id".  Assigning an int64_t directly to a std::string selects
  // operator=(char) and would send a single truncated byte.
  // NOTE(review): assumes Message maps to std::string -- confirm.
  message["strategy_id"] = ntoa(_id);
  _manager->sendMessage(message);
}

// Handles the server's metadata response for this request, then notifies the
// listener with an empty list of alerts.
//
// NOTE(review): damaged.  The statement starting "msg<" has lost everything
// up to "(&workingCopy[0]);" (including, presumably, the declaration of
// workingCopy, the opening of the try block and the declaration of doc).  The
// "PARSE ERROR" statement is damaged the same way.  Both are left exactly as
// found.
void AlertConnection::SingleRequest::onMessage(std::string const &bytes)
{
  // Metadata response
  TclList msg;
  msg<(&workingCopy[0]);
    msg<<"parsed successfully";
    RapidXmlElement top = RapidXmlElement(doc)["STATUS"];
    _metaData.collaborate =
      top.attribute("SHORT_FORM", _metaData.collaborate);
    _metaData.windowName =
      top.attribute("WINDOW_NAME", _metaData.windowName);
    // Read the columns.
    // Interesting.  The collaborate and windowName properties stay the same
    // if there's an error.  I made the columns work the same way.
    if (RapidXmlElement columns = top["COLUMNS"])
    {
      _metaData.columns.clear();
      for (RapidXmlElement column : columns)
      {
        PropertyList columnDescription = column.allAttributes();
        // The element name is stored under the empty key.
        columnDescription[""] = column.name();
        _metaData.columns.emplace_back(columnDescription);
      }
    }
    msg<<"_metaData.collaborate"<<_metaData.collaborate
       <<"_metaData.windowName"<<_metaData.windowName
       <<"_metaData.columns"<<_metaData.columns;
  }
  catch (rapidxml::parse_error &ex)
  {
    // ex.where() is a char *, not a std::string.  We have to define
    // workingcopy outside of the try block or that pointer would be useless
    // here.
    msg<<"PARSE ERROR"<();
  }
  LogFile::primary().sendString(msg, _socketInfo);
  // Notify the listener that we have new metadata, with no alerts.
  sendResponse(Alerts());
}

// Returns a short description of the request for logging, or "NULL" when
// there is no request.
//
// NOTE(review): damaged.  Each "<_" below has lost the text that preceded the
// member name.  Left exactly as found.
std::string AlertConnection::SingleRequest::debugDump(SingleRequest* request)
{
  if (!request)
    return "NULL";
  TclList result;
  result<<"_socketInfo"<_socketInfo)
        <<"_id"<_id
        <<"_metaData.windowName"<_metaData.windowName;
  return result;
}

/////////////////////////////////////////////////////////////////////
// AlertConnection
/////////////////////////////////////////////////////////////////////

// Each alert window will have its own id starting from here.
uint64_t AlertConnection::_lastId = LAST_RESERVED_ID;

// Makes sure the listener is running when there is work to do.  With no
// active listener and at least one request, this connects, sends the one time
// "ms_alert_listen" setup, and then sends the configuration of every request.
//
// NOTE(review): damaged.  The text between "TclList()<" and "sendConfig();"
// is missing (the rest of the log statement and the opening of the first
// branch of the if/else chain).  Left exactly as found.
void AlertConnection::checkListenStatus(SingleRequest *newRequest)
{
  LogFile::primary().sendString(TclList()<sendConfig();
  }
  else if (_allRequests.empty())
  {
    // No work.  No reason to connect.
    assert(!newRequest);  // This should already be stored in _allRequests.
    return;
  }
  else
  {
    // Connect.
    _listenerActive = true;
    checkConnection();
    // Send the one time setup.
    Message message;
    message["command"] = "ms_alert_listen";
    if (_nextAlertId)
      message["next_id"] = ntoa(_nextAlertId);
    sendMessage(message, this, ALERT_DATA, true);
    // Send each active request.
    for (auto &KVP: _allRequests)
      KVP.second->sendConfig();
  }
}

// NOTE(review): damaged.  Everything between "msg<" and "(&workingCopy[0]);"
// is missing.  Judging by the "onAlertData parse error" counter and the call
// to onAlertData() in onMessage(), the missing text presumably contained the
// rest of onLoginResponse() and the start of onAlertData(), so the remainder
// of this body most likely belongs to onAlertData().  Left exactly as found.
void AlertConnection::onLoginResponse(std::string const &bytes)
{
  TclList msg;
  msg<(&workingCopy[0]);
    RapidXmlElement top = RapidXmlElement(doc)["DATA"];
    for (RapidXmlElement strategy: top)
    {
      if (SingleRequest *request =
          getPropertyDefault(_allRequests, strategy.attribute("ID", -1)))
      {
        Alerts alerts;
        for (RapidXmlElement alert: strategy)
          alerts.emplace_back(alert.allAttributes());
        tm.increment("strategy parsed");
        tm.increment("alert parsed", alerts.size());
        request->sendResponse(alerts);
      }
      else
        // Not necessarily a problem.  Possibly we canceled a strategy, but
        // the server was sending a result for that strategy, and the two
        // messages passed each other.
        tm.increment("strategy not found");
    }
  }
  catch (rapidxml::parse_error &ex)
  {
    tm.increment("onAlertData parse error");
  }
  // Sample of what we see:
  // NOTE(review): the sample that followed this comment is missing.
}

// Called when a request we sent is aborted.  Ids <= 0 belong to the base
// class.  Losing ALERT_DATA means the listener is gone, so we mark it
// inactive and reconnect if any requests remain.
void AlertConnection::onAbort(int64_t clientId, MessageId messageId)
{
  if (clientId <= 0)
    // Not ours.
    ServerConnection64::onAbort(clientId, messageId);
  else if (clientId == ALERT_DATA)
  {
    // We have been disconnected.
    _listenerActive = false;
    // Reconnect if there's a reason to do so.
    checkListenStatus(nullptr);
  }
  // else ignore it.  We don't worry about failures for the individual alerts.
  // If the main connection fails, we redo everything.  If anything else fails,
  // we assume we'll cover it when we reconnect.
}

// Routes an incoming message by client id:  ids <= 0 go to the base class,
// ALERT_DATA and LOGIN_RESPONSE go to their dedicated handlers, and any other
// id is looked up in _allRequests.  Messages for unknown ids are dropped.
void AlertConnection::onMessage(std::string bytes,
                                int64_t clientId,
                                MessageId messageId)
{
  if (clientId <= 0)
    ServerConnection64::onMessage(bytes, clientId, messageId);
  else if (clientId == ALERT_DATA)
    onAlertData(bytes);
  else if (clientId == LOGIN_RESPONSE)
    onLoginResponse(bytes);
  else if (SingleRequest *singleRequest =
           getPropertyDefault(_allRequests, clientId))
    singleRequest->onMessage(bytes);
}

// Deletes every request that was registered for this socket.  Deleting a
// SingleRequest sends "ms_alert_stop" for it (see ~SingleRequest()).
void AlertConnection::socketClosed(SocketInfo *socket)
{
  assert(socket);  // We only store items if the socket is not null.
  auto bySocketIt = _bySocket.find(socket);
  if (bySocketIt == _bySocket.end())
    // Optimization.  We've never seen this socket before.
    return;
  std::set< int64_t > &requestIds = bySocketIt->second;
  for (int64_t requestId : requestIds)
  {
    // Find this SingleRequest object in _allRequests.
    auto allRequestsIt = _allRequests.find(requestId);
    // Assert that our two data structures are in sync
    assert(allRequestsIt != _allRequests.end());
    // Delete this SingleRequest object.
    delete allRequestsIt->second;
    // Remove this SingleRequest object from index by id.
    _allRequests.erase(allRequestsIt);
  }
  // Remove these objects from _bySocket all at once.
  _bySocket.erase(bySocketIt);
}

// Registers a new request under an id that has already been allocated, indexes
// it by socket (when there is one), and makes sure the listener is running.
void AlertConnection::listen(int64_t id,
                             std::string const &collaborate,
                             bool saveToMru,
                             SocketInfo *socket,
                             Callback callback)
{
  // We generate the ID numbers ourselves.  They should be unique.
  assert(!_allRequests.count(id));
  SingleRequest *request =
    new SingleRequest(this, socket, id, saveToMru, collaborate, callback);
  _allRequests[id] = request;
  if (socket)
    _bySocket[socket].insert(id);
  checkListenStatus(request);
}

// Allocates a fresh id with an atomic increment of _lastId, hands the actual
// registration to invokeIfRequired(), and returns the id to the caller.
int64_t AlertConnection::listen(std::string collaborate,
                                bool saveToMru,
                                SocketInfo *socket,
                                Callback callback)
{
  int64_t id = __sync_add_and_fetch(&_lastId, 1);
  invokeIfRequired([=]() {
      listen(id, collaborate, saveToMru, socket, callback);
    });
  return id;
}

// Cancels a request without any follow up action.
void AlertConnection::cancel(int64_t id)
{
  cancel(id, [](SocketInfo *) { });
}

// Cancels a request and then calls action with the socket the request
// belonged to (possibly null).  If the id is unknown, nothing happens and
// action is not called.
void AlertConnection::cancel(int64_t id,
                             std::function< void(SocketInfo *) > action)
{
  invokeIfRequired([=]() {
      auto allRequestsIt = _allRequests.find(id);
      if (allRequestsIt == _allRequests.end())
        // Not found.  This is not a bug.  You are explicitly allowed to call
        // cancel more than once.
        return;
      SingleRequest *request = allRequestsIt->second;
      SocketInfo *const socketInfo = request->getSocketInfo();
      if (socketInfo)
        _bySocket[socketInfo].erase(id);
      _allRequests.erase(allRequestsIt);
      delete request;
      action(socketInfo);
      // It's tempting to remove _bySocket[socketInfo] if that set is empty.
      // There's no real need.  This is a small, fixed per socket cost.  It
      // will be removed when the user disconnects.  We already have a dead
      // man timer getting rid of users who are idle.
    });
}

// Sends the login message appropriate for the current login type as soon as a
// new connection to the server exists.  Only proxy mode sends anything.
void AlertConnection::onNewConnection(TalkWithServer64 *connection)
{
  switch (_loginType)
  {
  case ltProxy:
    {
      Message message;
      message["command"] = "login";
      message["username"] = _username;
      message["password"] = _password;
      sendMessage(message, this, LOGIN_RESPONSE, true);
      break;
    }
  case ltDirect:
    // TODO
    break;
  default:
    // Do nothing.  It's tempting to throw an exception here.  If the
    // value is ltNone we probably shouldn't get here.  Any other value
    // is not a legal value for LoginType.
    break;
  }
}

// Switches to proxy login with the given credentials and resets the
// connection.
void AlertConnection::loginProxy(std::string username, std::string password)
{
  // Warning.  Proxy mode doesn't work as well as it should.  At this time
  // ServerConnection64 doesn't know the special status messages sent by the
  // proxy.  This should work okay most of the time.  If the alert server is
  // restarted but the proxy is not, we probably won't get the message.
  //
  // We like proxy mode.  For one thing it makes it easy for someone to test
  // when they are running outside of the firewall.  Also, the proxy is
  // responsible for redundancy.  If you point directly to an alert server, and
  // that server goes down, this software isn't smart enough to go to a
  // different server.
  invokeIfRequired([=]() {
      _loginType = ltProxy;
      _username = username;
      _password = password;
      reset();
    });
}

// Switches to direct login for the given user id and resets the connection.
void AlertConnection::loginDirect(int userId)
{
  invokeIfRequired([=]() {
      _loginType = ltDirect;
      _userId = userId;
      reset();
    });
}

// Clears the login type and resets the connection.
void AlertConnection::logOut()
{
  invokeIfRequired([=]() {
      _loginType = ltNone;
      reset();
    });
}

// Returns the thread used when the constructor is not given one.  It is
// created on first use and shared by every later call.
IContainerThread *AlertConnection::defaultThread()
{
  static IContainerThread *thread =
    IContainerThread::create("AlertConnection.C");
  return thread;
}

// Creates the connection on the given thread (or on defaultThread() when
// thread is null), reads the server address from the configuration, and
// starts the connection.
AlertConnection::AlertConnection(IContainerThread *thread) :
  ServerConnection64("alert_server", thread?thread:defaultThread()),
  _listenerActive(false),
  _loginType(ltNone),
  _nextAlertId(0)
{
  // getConfigItem: alert_server=www.trade-ideas.com:8844
  // getConfigItem: alert_server is the host and port name of the server we
  // getConfigItem: need to connect to. There is no default value.
  // getConfigItem: Don't forget that AlertConnection offers two different
  // getConfigItem: ways to connect. So alert_server might point to a
  // getConfigItem: micro_proxy or it might point directly to a
  // getConfigItem: fast_alert_search server.
  getAndParseAddress("alert_server");
  start();
}

// NOTE(review): the definition below runs past the end of the visible text.
// It is kept byte for byte, on a single line, so that whatever follows it
// still attaches exactly as before.
bool AlertConnection::shouldTryToConnect() { // This gets called a lot. /* LogFile::primary().sendString(TclList()<