#include #include #include #include #include #include "PipeConditionVar.h" #include "CommandDispatcher.h" #include "SocketInfo.h" #include "SelectableRequestQueue.h" #include "LogFile.h" #include "ZLibMalloc.h" #include "PollSet.h" #include "ThreadMonitor.h" #include "ZLibStats.h" #include "GlobalConfigFile.h" #include "ReplyToClient.h" /* This is often set to something faster for messages between our own machines. * Check the logs for information on how well compression works and what it costs. */ static int zlibCompressionLevel = Z_DEFAULT_COMPRESSION; /* All requests coming to this thread will have one of these types associated * with it. In fact, that's common throughout our C++ code. Search for "mt", * case sensitive, to see what inputs come from other threads, and where we * start the process of handling each request. */ enum MessageTypes { mtSetOutputType, /* A request directly from the client. */ mtSendOutputToClient, /* A request from addToOutputQueue(). */ mtCloseOnEmpty, /* A request from closeWhenOutputQueueIsEmpty(). */ mtQuit /* A request to shut down this thread gracefully. * Originally I created one of these for most threads, * however, this server can't really do a graceful * so most new threads assert(false) in their * destructors.*/ }; // Base class for implementing addToOutputQueue(). class NewMessage : public Request { private: const ExternalRequest::MessageId _id; protected: NewMessage(SocketInfo *socketInfo, ExternalRequest::MessageId id) : Request(socketInfo), _id(id) { callbackId = mtSendOutputToClient; } public: virtual const std::string &getBody() const =0; int64_t getId() { return _id.getValue(); } }; // For addToOutputQueue(). This makes a copy of the data or moves // the data. The data is typically in a local variable in the caller, // and that variable will go away before this thread is ready to // process the request. class NewMessageCopy : public NewMessage { private: const std::string _body; public: NewMessageCopy(SocketInfo *socketInfo, std::string const &body, ExternalRequest::MessageId id) : NewMessage(socketInfo, id), _body(body) { } NewMessageCopy(SocketInfo *socketInfo, std::string &&body, ExternalRequest::MessageId id) : NewMessage(socketInfo, id), _body(std::move(body)) { } virtual const std::string &getBody() const override { return _body; } }; // For addToOutputQueue(). This uses a smart pointer to a // read only std::string to avoid copying the data. Remember // that older versions of C++ used copy on write for strings, // so this wasn't necessary in the old code. class NewMessageShare : public NewMessage { private: const SmarterCP< std::string > _body; public: NewMessageShare(SocketInfo *socketInfo, SmarterCP< std::string > const &body, ExternalRequest::MessageId id) : NewMessage(socketInfo, id), _body(body) { } virtual const std::string &getBody() const override { return *_body; } }; // A NewMessageQueue is always owned by a MessageQueue. typedef std::queue< NewMessage * > NewMessageQueue; // Each socket gets a Serializer to marshal its output. // This is an abstract base class. class Serializer : public FixedMalloc { public: /* Read from messages. Append all output to outgoing. Remove each * message from the queue after it has been processed. Stop after * the outgoing passes bufferSize or outgoing is more than bufferSize * bytes. bufferSize is probably not required. We're just moving * the memory from one buffer to another, so we're not saving anything. */ virtual void convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize) = 0; virtual bool empty() { // Return false if we are storing any data that could be flushed. return true; } /** * Valid should always be true. A serializer should never fail. But * zlib has ways to report an error. (It could only catch internal * errors, like forgetting to initialize something.) The code that * calls zlib and the code that knows how to report an error are far * from each other, so we use valid() to pass the error on. Marking * this as invalid will cause the server to close the corresponding * socket. That's how we handle a lot of unexpected errors, as long * as they seem to be related to one specific connection. */ virtual bool valid() { return true; } virtual ~Serializer() { } }; // Mostly for debugging, but also used by some simple scripts // and other tools. class TextSerializer : public Serializer { public: void convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize); }; void TextSerializer::convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize) { while ((outgoing.size() < bufferSize) && !messages.empty()) { NewMessage *current = messages.front(); outgoing += "== MESSAGE "; outgoing += ntoa(current->getId()); outgoing += " ========== "; outgoing += itoa(current->getBody().size()); outgoing += " ==\r\n"; outgoing += current->getBody(); outgoing += "\r\n"; delete current; messages.pop(); } } class ZLibSerializer : public Serializer { public: enum Mode { MNormal, /* This is the traditional way, used by TI Pro and others. */ MFraming, /* This was something special we did for E*TRADE. We had to * add an extra field for error checking. That was looking * for one specific bug which was fixed long ago. I don't * know if E*TRADE still uses that version of the code. */ M64Bit /* This is similar to MNormal but the message id field is 64 * bits instead of 32. */ }; private: class ZLibError {}; bool _valid; const Mode _mode; int _serial; // Instead of directly using the low level zlib stuff, consider // using a ZLibOutputStream. That's a wrapper around zlib that // makes things easier. z_stream_s _zlibBuffer; void compress(std::string &destination, const void *source, int count, bool flush); public: ZLibSerializer(Mode mode); void convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize); bool valid() { return _valid; } ~ZLibSerializer(); }; ZLibSerializer::ZLibSerializer(Mode mode) : _mode(mode), _serial(0) { _zlibBuffer.zalloc = zlibMalloc; //Z_NULL; _zlibBuffer.zfree = zlibFree; //Z_NULL; _zlibBuffer.opaque = Z_NULL; int result = deflateInit(&_zlibBuffer, zlibCompressionLevel); _valid = result == Z_OK; } ZLibSerializer::~ZLibSerializer() { deflateEnd(&_zlibBuffer); } void ZLibSerializer::compress(std::string &destination, const void *source, int count, bool flush) { ZLibStats::beforeCompression(count); const int outputBufferSize = 10000; char output[outputBufferSize]; int flushType = flush?Z_SYNC_FLUSH:Z_NO_FLUSH; int64_t realOutputCount = 0; bool outputBufferFull = false; // Why isn't this a constant pointer? This cast must be right. _zlibBuffer.next_in = (Bytef *)source; _zlibBuffer.avail_in = count; _zlibBuffer.total_in = 0; while (_zlibBuffer.avail_in || outputBufferFull) { _zlibBuffer.next_out = (Bytef *)output; _zlibBuffer.avail_out = outputBufferSize; _zlibBuffer.total_out = 0; int result = deflate(&_zlibBuffer, flushType); if (result != Z_OK) { throw ZLibError(); } outputBufferFull = !_zlibBuffer.avail_out; if (_zlibBuffer.total_out) { destination.append(output, _zlibBuffer.total_out); realOutputCount += _zlibBuffer.total_out; _zlibBuffer.total_out = 0; } } ZLibStats::afterCompression(realOutputCount); } void ZLibSerializer::convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize) { try { /* class RecycleBin { private: NewMessage *_current; public: RecycleBin() : _current(NULL) { } ~RecycleBin() { delete _current; } void recycle(NewMessage *current) { delete _current; _current = current; } } bin; */ class NewMessageHolder { private: NewMessage *_ptr; public: NewMessageHolder(NewMessage *ptr = NULL) : _ptr(ptr) { } ~NewMessageHolder() { delete _ptr; } NewMessage *operator ->() { return _ptr; } }; while ((!messages.empty()) && (outgoing.size() < bufferSize)) { NewMessageHolder current = messages.front(); messages.pop(); std::string const &body = current->getBody(); if (_mode == MFraming) { struct { int messageId; int messageLength; int serial; } header; header.messageId = current->getId(); header.messageLength = body.size(); header.serial = _serial; unsigned char end = ~(unsigned char)_serial; _serial++; compress(outgoing, &header, sizeof(header), false); if (header.messageLength) { compress(outgoing, body.data(), header.messageLength, false); } compress(outgoing, &end, 1, messages.empty()); } else if (_mode == MNormal) { struct { int messageId; int messageLength; } header; header.messageId = current->getId(); header.messageLength = body.size(); if (body.size()) { compress(outgoing, &header, sizeof(header), false); compress(outgoing, body.data(), body.size(), messages.empty()); } else { // zlib doesn't let us flush the buffer unless you send new bytes // at the same time. In the Delphi version of the compression // code hides this fact. This code is less general. Because // this is the only place where we call compress(), we handle // that problem here. // // Of course, that's not true any more. I see three similar versions // of this decision right in this function. And I eventually ported // TalkWithServer to C++. That code includes ZLibOutputStream, which // hides these details. Perhaps this code should use // ZLibOutputStream. compress(outgoing, &header, sizeof(header), messages.empty()); } } else // (_mode == M64Bit) { struct __attribute__ ((__packed__)) { int64_t messageId; // This is the only change from MNormal. int messageLength; } header; assert(sizeof(header)==12); header.messageId = current->getId(); header.messageLength = body.size(); if (body.size()) { compress(outgoing, &header, sizeof(header), false); compress(outgoing, body.data(), body.size(), messages.empty()); } else { // zlib doesn't let us flush the buffer unless you send new bytes // at the same time. In the Delphi version of the compression // code hides this fact. This code is less general. Because // this is the only place where we call compress(), we handle // that problem here. compress(outgoing, &header, sizeof(header), messages.empty()); } } } } catch (ZLibError) { LogFile::primary().quoteAndSend("Unexpected Error in zlib output.", _zlibBuffer.msg?_zlibBuffer.msg:"NULL"); _valid = false; } } // Used when I want want full control of the output, rather than // using one of our protocols. Only used in very specific cases. class LiteralSerializer : public Serializer { public: void convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize); }; void LiteralSerializer::convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize) { while ((outgoing.size() < bufferSize) && !messages.empty()) { NewMessage *current = messages.front(); outgoing += current->getBody(); delete current; messages.pop(); } } class SimpleSerializer : public Serializer { private: struct Header { // This is used by the old JavaScript proxy. See Simple64Serializer for // the newer code. int32_t size; int32_t messageId; }; public: SimpleSerializer(); void convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize); }; SimpleSerializer::SimpleSerializer() { ThreadMonitor::find().increment("WARNING SimpleSerializer is deprecated."); } void SimpleSerializer::convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize) { while ((outgoing.size() < bufferSize) && !messages.empty()) { NewMessage *current = messages.front(); Header header; header.size = current->getBody().size(); header.messageId = current->getId(); outgoing.append((const char *)&header, sizeof header); outgoing += current->getBody(); delete current; messages.pop(); } } class Simple64Serializer : public Serializer { // This is used by the JavaScript proxy. private: struct __attribute__((packed)) Header { int32_t size; int64_t messageId; }; static_assert(sizeof(Header) == 12, "Structure not packed as expected."); public: void convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize); }; void Simple64Serializer::convert(NewMessageQueue &messages, std::string &outgoing, unsigned bufferSize) { while ((outgoing.size() < bufferSize) && !messages.empty()) { NewMessage *current = messages.front(); Header header; header.size = current->getBody().size(); header.messageId = current->getId(); outgoing.append((const char *)&header, sizeof header); outgoing += current->getBody(); delete current; messages.pop(); } } static ReplySerializer globalDefaultSerializer; // Queues up outgoing messages. One object per socket. // This includes whole messages, work in progress, and // some configuration items. class MessageQueue { private: NewMessageQueue _messages; // _outgoing is where we store the output of the serializer before we // send it to the client. std::string _outgoing; Serializer *_serializer; SocketInfo *_socketInfo; // _closeOnEmpty is false by default but it can be set to true with // closeWhenOutputQueueIsEmpty(). bool _closeOnEmpty; // Newer code uses `= delete`. But that didn't exist in older versions // of C++. So I used the "Purposely undefined" comments instead. MessageQueue &operator =(MessageQueue &); // Purposely undefined. MessageQueue(MessageQueue &); // Purposely undefined. public: MessageQueue(SocketInfo *socketInfo, Serializer *serializer = NULL) : _serializer(serializer), _socketInfo(socketInfo), _closeOnEmpty(false) { if (!_serializer) { switch (globalDefaultSerializer) { case rtcTextSerializer: _serializer = new TextSerializer; break; case rtcZLibSerializer: _serializer = new ZLibSerializer(ZLibSerializer::MNormal); break; case rtcZLib1Serializer: _serializer = new ZLibSerializer(ZLibSerializer::MFraming); break; case rtcZLib64Serializer: _serializer = new ZLibSerializer(ZLibSerializer::M64Bit); break; case rtcLiteralSerializer: _serializer = new LiteralSerializer; break; case rtcSimpleSerializer: _serializer = new SimpleSerializer; break; case rtcSimple64Serializer: _serializer = new Simple64Serializer; break; } } if (!_serializer->valid()) { LogFile::primary().quoteAndSend("ReplyToClient.C", "Error creating serializer.", _socketInfo); DeleteSocketThread::deleteSocket(_socketInfo); } } ~MessageQueue() { if (_serializer) { delete _serializer; } while (!_messages.empty()) { delete _messages.front(); _messages.pop(); } } // Try to write to the client. Some or all of the data might stay // in our local buffer. bufferSize is how many bytes of data we // like to send to the client at once; that's just an optimization. bool writeOnce(unsigned bufferSize) { // Check for new messages, even if we have some data, to // maximize the chance of sending a large block of data. ThreadMonitor::find().setState("_serializer->convert()"); _serializer->convert(_messages, _outgoing, bufferSize); if (!_serializer->valid()) { LogFile::primary().quoteAndSend("ReplyToClient.C", "Error after _serializer->convert()", _socketInfo); DeleteSocketThread::deleteSocket(_socketInfo); _outgoing.clear(); return false; } ThreadMonitor::find().setState("_socketInfo->write()"); int bytesWritten = _socketInfo->write(_outgoing); ThreadMonitor::find().setState("writeOnce() review output"); if (bytesWritten < 0) { std::string errorMessage = errorString(); LogFile::primary().quoteAndSend("ReplyToClient.C", errorMessage, _socketInfo); // This is where we keep getting problems. We get here because // the socket has been closed. Then deleteSocket() has an assertion // failure. DeleteSocketThread::deleteSocket(_socketInfo); _outgoing.clear(); } else if (bytesWritten > 0) { _outgoing.erase(0, bytesWritten); } // Return true if we still have data. if (empty()) { if (_closeOnEmpty) { DeleteSocketThread::deleteSocket(_socketInfo); } return false; } else { return true; } } bool empty() const { return _outgoing.empty() && _messages.empty() && _serializer->empty(); } std::string &getOutgoing() { return _outgoing; } void addMessage(NewMessage *message) { if (_closeOnEmpty) { delete message; } else { _messages.push(message); } } void closeOnEmpty() { _closeOnEmpty = true; if (empty()) { DeleteSocketThread::deleteSocket(_socketInfo); } } }; typedef std::set< SocketInfo * > SocketSet; class ReplyToClient : private ThreadClass { private: // Each socket has it's own message queue. Some parts are standard, // and some are configurable. std::map< class SocketInfo *, class MessageQueue * > _messageQueues; // These are connections where we've got data ready to send, and we // are waiting for the socket to be ready. SocketSet _readyToSend; SelectableRequestQueue _incomingRequests; bool _dumpAll; void readFromQueue(); protected: void threadFunction(); public: void connect(CommandDispatcher *commandDispatcher) { // The client (or proxy) will send us a set_output command. // this command typically comes before the login request, // so we give it special permissions. Most commands are // only allowed after a login successful. commandDispatcher->listenForCommand("set_output", &_incomingRequests, mtSetOutputType, false, true); } ReplyToClient(ReplySerializer defaultSerializer) : ThreadClass("ReplyToClient"), _incomingRequests("ReplyToClient") { globalDefaultSerializer = defaultSerializer; startThread(); } void addToOutputQueue(NewMessage *request) { _incomingRequests.newRequest(request); } void closeWhenOutputQueueIsEmpty(SocketInfo *socket) { Request *r = new Request(socket); r->callbackId = mtCloseOnEmpty; _incomingRequests.newRequest(r); } }; // Incoming requests for this thread. void ReplyToClient::readFromQueue() { ThreadMonitor::find().setState("ReplyToClient::readFromQueue()"); _incomingRequests.resetWaitHandle(); while (Request *current = _incomingRequests.getRequest()) { { // This test seems very silly. But we are still getting mysterious // errors. I see "" with message id = 0 in the queue and we // try to send it and it fails. This is the best bet of all the // unreasonable choices. Like we are getting a message for a socket // after we have gotten the delete mesage for that socket. SocketInfo *const socketInfo = current->getSocketInfo(); assert((!socketInfo)||(socketInfo->objectIsValid())); } switch (current->callbackId) { case mtSetOutputType: { ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); MessageQueue *&messageQueue = _messageQueues[socket]; if (messageQueue) { // Output type has already been set. DeleteSocketThread::deleteSocket(socket); } else { std::string mode = (request->getProperty("mode")); if (mode == "zlib") { ZLibSerializer::Mode mode; if (request->getProperty("framing") == "1") mode = ZLibSerializer::MFraming; else mode = ZLibSerializer::MNormal; messageQueue = new MessageQueue(socket, new ZLibSerializer(mode)); } else if (mode == "zlib64") { messageQueue = new MessageQueue(socket, new ZLibSerializer(ZLibSerializer::M64Bit)); } else if (mode == "text") { messageQueue = new MessageQueue(socket, new TextSerializer); } else if (mode == "simple") { messageQueue = new MessageQueue(socket, new SimpleSerializer); } else if (mode == "simple64") { messageQueue = new MessageQueue(socket, new Simple64Serializer); } else { // Unknown output mode DeleteSocketThread::deleteSocket(socket); } } delete current; break; } case mtSendOutputToClient: { NewMessage *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); MessageQueue *messageQueue = _messageQueues[socket]; if (!messageQueue) { messageQueue = new MessageQueue(socket); _messageQueues[socket] = messageQueue; // The following line seems ridiculous. But we keep getting an // assertion failure later, after we try to write to a socket, // saying that the socket object doesn't exist. assert(socket->objectIsValid()); } messageQueue->addMessage(request); _readyToSend.insert(socket); if (_dumpAll) { TclList msg; msg<<"ReplyToClient.C" <<"dump_all" <getBody() <getId(); LogFile::primary().sendString(msg, socket); } break; } case mtCloseOnEmpty: { SocketInfo *socket = current->getSocketInfo(); MessageQueue *messageQueue = getPropertyDefault(_messageQueues, socket); if (messageQueue) { messageQueue->closeOnEmpty(); } else { DeleteSocketThread::deleteSocket(socket); } delete current; break; } case mtQuit: { for (std::map< class SocketInfo *, class MessageQueue * >::iterator it = _messageQueues.begin(); it != _messageQueues.end(); it++) { delete it->second; } _messageQueues.clear(); delete current; return; } case DeleteSocketThread::callbackId: { SocketInfo *socket = current->getSocketInfo(); MessageQueue *&messageQueue = _messageQueues[socket]; if (messageQueue) { delete messageQueue; } _messageQueues.erase(socket); _readyToSend.erase(socket); delete current; break; } default: delete current; } } } static void initDefaultCompressionLevel() { TclList msg; msg<objectIsValid()); pollSet.addForWrite(socket->getSocketHandle()); } pollSet.poll(); SocketSet stillReadyToSend; for (SocketSet::iterator it = _readyToSend.begin(); it != _readyToSend.end(); it++) { SocketInfo *const socket = *it; assert(socket->objectIsValid()); if (pollSet.woken().count(socket->getSocketHandle())) { // Room in the O/S buffer to write something. ThreadMonitor::find().setState("writeOnce(4096)"); bool moreToSend = _messageQueues[socket]->writeOnce(4096); ThreadMonitor::find().setState("Prepair pollSet"); if (moreToSend) { assert(socket->objectIsValid()); stillReadyToSend.insert(socket); } } else { // Buffer is still full. Keep waiting for space. assert(socket->objectIsValid()); stillReadyToSend.insert(socket); } } _readyToSend = stillReadyToSend; readFromQueue(); } } static ReplyToClient *instance; void addToOutputQueue(SocketInfo *socket, std::string const &messageBody, ExternalRequest::MessageId messageId) { // If the instance doesn't exist yet, we're in trouble. Best to bomb out. instance ->addToOutputQueue(new NewMessageCopy(socket, messageBody, messageId)); } void addToOutputQueue(SocketInfo *socket, std::string &&messageBody, ExternalRequest::MessageId messageId) { instance->addToOutputQueue(new NewMessageCopy(socket, std::move(messageBody), messageId)); } void addToOutputQueue(SocketInfo *socket, SmarterCP< std::string > const &messageBody, ExternalRequest::MessageId messageId) { instance ->addToOutputQueue(new NewMessageShare(socket, messageBody, messageId)); } void closeWhenOutputQueueIsEmpty(SocketInfo *socket) { instance->closeWhenOutputQueueIsEmpty(socket); } void initReplyToClient(ReplySerializer defaultSerializer) { if (!instance) { instance = new ReplyToClient(defaultSerializer); } } void connectToReplyToClient(CommandDispatcher *commandDispatcher) { initReplyToClient(); instance->connect(commandDispatcher); }