#include "ThreadMonitor.h"
#include "GlobalConfigFile.h"
#include "SimpleLogFile.h"

#include "ServerConnection64.h"

// NOTE(review): this copy of the file is damaged.  In four places the text
// between a '<' and a later '>' appears to have been removed (checkPing(),
// onMessage(), checkConnection() and getAndParseAddress()).  Each damaged
// spot is marked with a NOTE(review) comment below and the surviving text is
// kept exactly as found.  The file will not compile as is; restore the
// missing text from version control.

// TODO Listen for the special message from the ../micro_proxy telling us that
// something disconnected!!

// Periodic connection / ping check.  Called from awake().
//
// Always starts with checkConnection(false), and does nothing more if
// shouldTryToConnect() returns false.
//
// NOTE(review): the middle of this function is missing (see below), so the
// conditions that choose between reconnecting, timing out a ping and sending
// a new ping cannot be read from this copy.
void ServerConnection64::checkPing()
{
  checkConnection(false);
  if (!shouldTryToConnect())
    return;
  const time_t now = time(NULL);
  if (!_connection)
    { // The connection is down.
      if (now > _connectionBlackout)
        { // Try to recreate the connection.
          //TclList msg;
          // NOTE(review): the next line is damaged.  Text appears to be
          // missing between "msg<" and "disconnect();".  The lost text
          // presumably closed this branch and opened the branch that handles
          // an existing connection.  As the surviving tokens nest, the else
          // branch below would use _connection while it is NULL, which
          // cannot be the original intent.
          //msg<disconnect();
          ThreadMonitor::find().increment("ping not found");
        }
      else
        { // Time to send another ping
          Message message;
          message["command"] = "ping";
          message["response"] = "1";
          // We are the listener for the reply; it comes back to onMessage()
          // tagged with CI_PING.
          _connection->sendMessage(message, this, CI_PING, false);
          _nextPingTime = time(NULL) + 5;
          _pingSent = true;
          _pleaseSendSoon = true;
        }
    }
}

// Callback for responses to messages this object sent as the listener.
// clientId is the tag supplied when the message was sent; CI_PING marks the
// reply to the ping sent from checkPing().
//
// NOTE(review): only the start of this function survived.  Everything after
// the damaged comment line uses a variable named `callbacks`, which is not a
// parameter of onMessage(), so it is presumably the tail of a different
// member function whose header was lost.  Confirm against the header file.
void ServerConnection64::onMessage(std::string bytes,
                                   int64_t clientId,
                                   MessageId messageId)
{
  ThreadMonitor &tm = ThreadMonitor::find();
  switch (clientId)
    {
    case CI_PING:
      { // Ping received. Send another one in 5 seconds.
        // NOTE(review): the next line is damaged.  Text appears to be missing
        // between "ciPing"< and wakeUp();
        //std::cout<<"ciPing"<wakeUp();
        _pleaseSendSoon = false;
      }
    // Ask to be woken when the socket is ready for whichever direction the
    // connection currently needs.
    if (_connection->wantsRead())
      callbacks.waitForRead(_connection->getHandle());
    if (_connection->wantsWrite())
      callbacks.waitForWrite(_connection->getHandle());
    }
  // Check approximately once per second for the pings. These don't have to be
  // very precise.
  callbacks.wakeAfterMs(1000);
}

// Wake-up hook.  Runs the ping / connection check, then gives the connection
// (if there is one) a chance to work via wakeUp(), then delivers any pending
// responses.
//
// _connection is tested after checkPing() because checkPing() calls
// checkConnection(), which can set _connection to NULL.  `woken` is not used
// here.
void ServerConnection64::awake(std::set< int > const &woken)
{
  checkPing();
  if (_connection)
    _connection->wakeUp();
  if (_connection && _connection->pendingResponseCount())
    _connection->doResponses();
}

// Hook called from checkConnection() once a new connection has been opened
// successfully, before any queued messages are sent.  The default does
// nothing.
void ServerConnection64::onNewConnection(TalkWithServer64 *connection)
{
  // Our own onConnect logic is built into getConnection(). If we put it
  // here, the subclass might forget to call this. A subclass should be
  // able to add stuff, but there's no reason for it to get rid of our
  // stuff.
}

// Says whether checkPing() should do anything beyond its initial
// checkConnection(false) call.
bool ServerConnection64::shouldTryToConnect()
{ // The default is a simple case where we always want to connect. This
  // is common.
  return true;
}

// Retires a disconnected connection and, in the surviving second half,
// finishes setting up a newly opened one: calls onNewConnection(), sends the
// messages that were queued while there was no usable connection, and resets
// the ping state.
//
// createIfRequired: in the surviving text this is only mentioned in a
// commented-out trace line.  Presumably it controls whether a new connection
// is opened when none exists -- confirm against the original.
//
// NOTE(review): the middle of this function is missing (see below).
void ServerConnection64::checkConnection(bool createIfRequired)
{
  if (_connection && _connection->disconnected())
    { // doResponses() will send errors to anyone still waiting for a message.
      // Often those callbacks will send new messages. We have to make
      // _connection NULL before calling those callbacks or we could get into
      // an infinite loop.
      const auto oldConnection = _connection;
      _connection = NULL;
      oldConnection->doResponses();
      oldConnection->cancelAll();
      delete oldConnection;
      // NOTE(review): the next line is damaged.  Text appears to be missing
      // between the trailing '<' of the trace statement and "connect(".  The
      // lost text included at least the closing brace of this if block (the
      // braces in the surviving text do not balance) and whatever created the
      // object that connect() is called on.
      //std::cout<<"ServerConnection64::checkConnection("<<(createIfRequired?"createIfRequired":"don't create")<<") is deleting the old connection."<connect(_serverName, _serverPort, false);
  if (_connection->disconnected())
    // Unable to open the connection.
    // Keep what we have to avoid a NULL pointer.
    return;
  // This is often a login. This should be done once for every connection.
  // And in the case of a login, it should be done before any other messages.
  onNewConnection(_connection);
  if (!_messagesToSend.empty())
    { // Send messages that we queued up as part of the SendManager. Start by
      // moving all of the queued messages into a different variable.
      MessagesToSend previouslyHeld;
      previouslyHeld.swap(_messagesToSend);
      for (auto it = previouslyHeld.begin(); it != previouslyHeld.end(); it++)
        {
          MessageToSend const &toSend = it->second;
          if (toSend.listener)
            // Use the id that was handed to the caller when the message was
            // queued, so a later cancelMessage() with that id still matches.
            _connection->sendMessageWithId(toSend.message, toSend.listener, toSend.clientId, toSend.messageId, toSend.streaming);
          else
            // Queued by the one-argument sendMessage(); nobody is waiting
            // for a reply.
            _connection->sendMessage(toSend.message);
        }
      _pleaseSendSoon = true;
    }
  // Request a new ping ASAP.
  _nextPingTime = 0;
  _pingSent = false;
}

// name is copied into _name; thread is passed to the ForeverThreadUser base.
// The object starts with no connection.
//
// _nextPingTime and _pingSent are not initialized here.  In the surviving
// code they are first assigned at the end of checkConnection().
ServerConnection64::ServerConnection64(std::string const &name,
                                       IContainerThread *thread) :
  ForeverThreadUser(thread),
  _connection(NULL),
  _connectionBlackout(0),
  _pleaseSendSoon(false),
  _name(name)
{
}

// Asks the current connection, if any, to disconnect.  The connection object
// itself is not deleted here; checkConnection() does that once
// disconnected() reports true.
void ServerConnection64::reset()
{
  if (_connection)
    _connection->disconnect();
}

// Reads the config item configName (falling back to defaultValue) and splits
// it on ":".  The value counts as valid only if that yields exactly two
// pieces.
//
// NOTE(review): the rest of this function is missing.  Presumably the two
// pieces are a server name and port, since checkConnection() uses
// _serverName and _serverPort -- confirm against the original.
bool ServerConnection64::getAndParseAddress(std::string const &configName,
                                            std::string const &defaultValue)
{
  const std::string input = getConfigItem(configName, defaultValue);
  const std::vector< std::string > pieces = explode(":", input);
  const bool success = pieces.size() == 2;
  TclList msg;
  // NOTE(review): the next line is damaged.  Text appears to be missing
  // between "msg<" and "disconnected())".  The lost text covered the end of
  // getAndParseAddress() and the start of another function.  Going by the
  // comments and by the calls in sendMessage(), what follows is presumably
  // the tail of shouldDeferMessages(): true means "queue the message", false
  // means "send it now".
  msg<disconnected())
        // Don't try to send messages to the _connection. It will immediately
        // (or almost immediately) call the callback to say there was a
        // failure. Which in many cases will automatically try to send another
        // message to make up for the failed one. Queue up the messages until
        // the timer reconnects us.
        return true;
      else
        // All is good. No need to defer. Just send the message as requested.
        return false;
    }
  // Wait for the timer to create the connection.
  return true;
}

// Sends a message for which no reply is expected.
//
// If shouldDeferMessages() says so, the message is stored in _messagesToSend
// under a newly allocated message id and sent later by checkConnection().
// Otherwise it goes to the connection immediately.
//
// NOTE(review): the deferred path assigns a Message directly to a
// _messagesToSend entry, so it presumably relies on a conversion from
// Message that leaves the listener NULL -- confirm in the header.
void ServerConnection64::sendMessage(Message const &message)
{
  if (shouldDeferMessages())
    _messagesToSend[TalkWithServer64::getNextMessageId()] = message;
  else
    {
      _connection->sendMessage(message);
      _pleaseSendSoon = true;
    }
}

// Sends a message whose response(s) go to listener, tagged with clientId.
// Returns the message id, which can be passed to cancelMessage().
//
// If the message has to be deferred, the id is allocated here and stored with
// the queued message; checkConnection() later sends it with that same id.
// Note that listener is only asserted to be non-NULL on the deferred path.
ServerConnection64::MessageId ServerConnection64::sendMessage(Message const &message,
                                                              IMessageListener *listener,
                                                              int64_t clientId,
                                                              bool streaming)
{
  if (shouldDeferMessages())
    {
      assert(listener);
      MessageToSend messageToSend;
      messageToSend.message = message;
      messageToSend.listener = listener;
      messageToSend.clientId = clientId;
      messageToSend.streaming = streaming;
      messageToSend.messageId = TalkWithServer64::getNextMessageId();
      _messagesToSend[messageToSend.messageId] = messageToSend;
      return messageToSend.messageId;
    }
  else
    {
      _pleaseSendSoon = true;
      return _connection->sendMessage(message, listener, clientId, streaming);
    }
}

// Cancels a message by id.  Both places are tried because the caller cannot
// know whether the message was already handed to the connection or is still
// waiting in _messagesToSend.
void ServerConnection64::cancelMessage(MessageId messageId)
{
  if (_connection)
    _connection->cancel(messageId);
  _messagesToSend.erase(messageId);
}