#include #include #include #include #include #include #include #include #include "MiscSupport.h" #include "ZLibMalloc.h" #include "SimpleLogFile.h" #include "ThreadMonitor.h" #include "GlobalConfigFile.h" #include "TalkWithServer64.h" ///////////////////////////////////////////////////////////////////// // ///////////////////////////////////////////////////////////////////// enum class DumpAll {Yes, No, Unknown}; static DumpAll dumpAll = DumpAll::Unknown; static void initDumpAll() { if (dumpAll == DumpAll::Unknown) // getConfigItem: dump_all=1 means send a copy of every message in or out // getConfigItem: of the system to the log file. This can be useful in // getConfigItem: debugging, but it makes really big log files very // getConfigItem: quickly. This setting is shared by several libraries. dumpAll = (getConfigItem("dump_all") == "1")?DumpAll::Yes:DumpAll::No; } ///////////////////////////////////////////////////////////////////// // TalkWithServer64 ///////////////////////////////////////////////////////////////////// TalkWithServer64::MessageId TalkWithServer64::_lastMessageId = 0; TalkWithServer64::MessageId TalkWithServer64::getNextMessageId() { // For simplicity make this atomic. A __thread variable would probably // have worked, too. Each client probably stays in one thread. But it // was just as easy to do it this way and have fewer assumptions. return __sync_add_and_fetch(&_lastMessageId, 1); } TalkWithServer64::TalkWithServer64(std::string const &name) : _name(name), _uniqueId(getNextMessageId()), _tcpIpConnection(name), _status(NotYetOpen), _allResponsesListener(NULL) { initDumpAll(); } TalkWithServer64::~TalkWithServer64() { assert(_atServer.empty() && _localQueue.empty()); } void TalkWithServer64::connect(std::string address, std::string port, bool asyncConnect) { assert(_status == NotYetOpen); _tcpIpConnection.connect(address, port, asyncConnect); _status = Open; _tcpIpConnection.toSend = "command=set_output&mode=zlib64\r\ncommand=set_input&mode=zlib\r\n"; checkForClosed(); } void TalkWithServer64::checkForClosed() { if (_tcpIpConnection.closed()) disconnect(); } void TalkWithServer64::disconnect() { if (_status != Closed) { _status = Closed; _tcpIpConnection.close(); for (auto it = _atServer.begin(); it != _atServer.end(); it++) { LocalWrapper localWrapper; localWrapper.messageId = it->first; localWrapper.success = false; _localQueue.push_back(localWrapper); } } } bool TalkWithServer64::disconnected() { return _status == Closed; } void TalkWithServer64::cancel(MessageId id) { _atServer.erase(id); } void TalkWithServer64::cancelAll() { _atServer.clear(); } void TalkWithServer64::setAllResponsesListener(IAllResponsesListener *newListener) { _allResponsesListener = newListener; if (newListener == NULL) _allResponses.clear(); } int TalkWithServer64::pendingResponseCount() { return _localQueue.size(); } void TalkWithServer64::doResponses() { // It is possible that during this one of these callbacks, new callbacks // will be added to the list. It's is unlikely that a valid response will // appear in the list, but it is certaily possible for someone to disconnect // the session in a callback, so the list will have new error responses. // It is also possible for a request to be canceled while in a callback. // // Strange. The comment above comes from TalkWithServer.C. But that file // used iterators to walk through the list. An iterator could be invalidated // if the list grows. I wonder why I made that comment but didn't do // anything about it. for (size_t i = 0; i < _localQueue.size(); i++) { LocalWrapper const &localWrapper = _localQueue[i]; //TclList msg; //msg<second; //msg<<"clienId"<clientId // <<"streaming"<streaming; if (localWrapper.success) { serverWrapper.listener->onMessage(localWrapper.body, serverWrapper.clientId, localWrapper.messageId); if (!serverWrapper.streaming) // Clean up. Without this we would have a memory leak. _atServer.erase(it); } else { serverWrapper.listener->onAbort(serverWrapper.clientId, localWrapper.messageId); _atServer.erase(it); } } //sendToLogFile(msg); } _localQueue.clear(); if (_allResponsesListener) { // It seems unlikely that this list would change during the callbacks. // But it's possible. I could list out the conditions involved -- things // to avoid in a callback -- but as long as we're discussing it, let's just // make this bulletproof. std::vector< AllResponsesWrapper > toReport; _allResponses.swap(toReport); for (auto it = toReport.begin(); it != toReport.end(); it++) _allResponsesListener->onResponse(it->body, it->messageId); } else // No listener _allResponses.clear(); } void TalkWithServer64::sendMessage(Message const &message) { if (!disconnected()) _outgoing.push_back(message); } void TalkWithServer64::sendMessageWithId(Message message, IMessageListener *listener, int64_t clientId, MessageId messageId, bool streaming) { assert(listener); assert(!_atServer.count(messageId)); ServerWrapper serverWrapper; serverWrapper.listener = listener; message["message_id"] = ntoa(messageId); serverWrapper.streaming = streaming; serverWrapper.clientId = clientId; _atServer[messageId] = serverWrapper; if (disconnected()) { // Fail immediately. LocalWrapper localWrapper; localWrapper.messageId = messageId; localWrapper.success = false; _localQueue.push_back(localWrapper); } else _outgoing.push_back(message); } TalkWithServer64::MessageId TalkWithServer64::sendMessage(Message message, IMessageListener *listener, int64_t clientId, bool streaming) { const MessageId messageId = getNextMessageId(); sendMessageWithId(message, listener, clientId, messageId, streaming); return messageId; } bool TalkWithServer64::wantsRead() { return _tcpIpConnection.wantsRead(); } bool TalkWithServer64::wantsWrite() { return // The TalkWithServer64 has something in its outgoing buffer and the // TcpIpConnection is ready to accept it. (_tcpIpConnection.readyToSend() &&!_outgoing.empty()) // Or the TcpIpConnection has something in its outgoing buffer. || _tcpIpConnection.wantsWrite(); } int TalkWithServer64::getHandle() { return _tcpIpConnection.getHandle(); } void TalkWithServer64::addStringWithSize(std::string toCompress) { addInt32(toCompress.size()); _zLibOutputStream.add(toCompress); } void TalkWithServer64::addInt32(int32_t toCompress) { _zLibOutputStream.add(&toCompress, 4); } static const std::string s_marshal = "marshal"; static const std::string s_tcp_ip = "TCP/IP"; static const std::string s_unmarshal = "unmarshal"; void TalkWithServer64::wakeUp() { TclList msg; if (dumpAll == DumpAll::Yes) msg<::const_iterator messageIt = _outgoing.begin(); messageIt != _outgoing.end(); messageIt++) { for (Message::const_iterator fieldIt = messageIt->begin(); fieldIt != messageIt->end(); fieldIt++) { addStringWithSize(fieldIt->first); addStringWithSize(fieldIt->second); } addInt32(0); } if (dumpAll == DumpAll::Yes) { for (std::vector< Message >::const_iterator messageIt = _outgoing.begin(); messageIt != _outgoing.end(); messageIt++) { TclList msg; msg<begin(); fieldIt != messageIt->end(); fieldIt++) msg<first<second; sendToLogFile(msg); } } _outgoing.clear(); _zLibOutputStream.flush(); if (dumpAll == DumpAll::Yes) msg<<"previous outgoing buffer size"<<_tcpIpConnection.toSend.length() <<"additional outgoing bytes"<<_zLibOutputStream.output.length(); _tcpIpConnection.toSend += _zLibOutputStream.output; _zLibOutputStream.output.clear(); } if (!_zLibOutputStream.valid()) { // This should never happen! Maybe out of memory? Or a code error on // our part. TclList msg; msg<