#include #include #include #include #include "SocketInfo.h" #include "Messages.h" #include "NewConnections.h" #include "LogFile.h" #include "ZLibMalloc.h" #include "ReplyToClient.h" #include "GlobalConfigFile.h" #include "ThreadMonitor.h" #include "ZLibStats.h" #include "CommandDispatcher.h" static const unsigned maxBufferSize = 500000; const std::string ExternalRequest::messageIdString = "message_id"; const std::string ExternalRequest::commandString = "command"; const std::string ExternalRequest::emptyString; TclList ExternalRequest::debugDump() const { TclList result = Request::debugDump(); result< maxBufferSize)) { TclList msg; msg<<"CommandDispatcher.C" <<"Invalid string length" <<_stringLength <getProperties()[_name] = value; _name.erase(); _requestState = rsGetNameLength; break; } case rsComplete: { return; } } } } ExternalRequest *ZLibBuffer::getMessage() { if (_requestState == rsComplete) { ExternalRequest *result = _request; _request = NULL; _requestState = rsNone; checkForProgress(); return result; } return NULL; } static const std::string s_addNewData = "addNewData()"; void ZLibBuffer::addNewData(std::string newData) { // This handles the decompression, then passes the result on to // checkForProgress(). ThreadMonitor::SetState setState(s_addNewData); ZLibStats::beforeDecompression(newData.size()); _zlibBuffer.next_in = (Bytef *)newData.data(); _zlibBuffer.avail_in = newData.size(); bool outputBufferFull = false; while ((_zlibBuffer.avail_in || outputBufferFull) && !_aborted) { const int bufferLength = 1024; char buffer[bufferLength]; _zlibBuffer.next_out = (Bytef *)buffer; _zlibBuffer.avail_out = bufferLength; _zlibBuffer.total_out = 0; int result = inflate(&_zlibBuffer, Z_SYNC_FLUSH); // Note: Z_BUF_ERROR is unavoidable in some cases. This means that you // provided zlib with no new input, and it had no output for you. This // happens when we are finished reading the input to zlib, the last // output from zlib perfectly filled the output buffer, and zlib has // no more data for us. if (!((result == Z_OK) || (outputBufferFull && (result == Z_BUF_ERROR)))) { TclList msg; msg<<"CommandDispatcher.C" <<"Error on inflate" <<(_zlibBuffer.msg?_zlibBuffer.msg:"NULL") <getProperties(), currentMessageString); return currentMessage; } void addNewData(std::string newData) { if (!_aborted) { _buffer += newData; if (_buffer.size() > maxBufferSize) { _aborted = true; _buffer.clear(); } } } bool getAborted() const { return _aborted; } std::string copyBuffer() const { return _buffer; } }; ///////////////////////////////////////////////////////////////////// // CommandDispatcher::TimerCallbackList // This is required so we can get a little bit of status back. // The rule is that we tweek the dead man timer each time a new // connection is made, and each time the connection sends us a valid // message. After a timeout we kill the connection. ///////////////////////////////////////////////////////////////////// void CommandDispatcher::TimerCallbackList::successfulDispatch(Request *request) { _timer->touchConnection(request->getSocketInfo()); } CommandDispatcher::TimerCallbackList::TimerCallbackList(DeadManTimer *timer) : _timer(timer) { } ///////////////////////////////////////////////////////////////////// // ListenForCommandRequest // This allows us to send a message to ourself to register a new // listener. This avoids locks and makes the common case of // dispatching a message simple and effecient. We will only register // a handful of commands, so we aren't as worried about the cost of // that. ///////////////////////////////////////////////////////////////////// class ListenForCommandRequest : public Request { private: std::string _commandName; RequestListener *_listener; int _callbackIdForNewCommand; bool _lock; bool _immuneToLock; public: ListenForCommandRequest(std::string commandName, RequestListener *listener, int callbackId, bool lock, bool immuneToLock) : Request(NULL), _commandName(commandName), _listener(listener), _callbackIdForNewCommand(callbackId), _lock(lock), _immuneToLock(immuneToLock) { } std::string getCommandName() { return _commandName; } RequestListener *getListener() { return _listener; } int getCallbackIdForNewCommand() { return _callbackIdForNewCommand; } bool getLock() { return _lock; } bool getImmuneToLock() { return _immuneToLock; } }; ///////////////////////////////////////////////////////////////////// // CallbackList ///////////////////////////////////////////////////////////////////// void CallbackList::successfulDispatch(Request *) { } void CallbackList::dumpMessage(ExternalRequest *request, std::string const &reason) { TclList forLog; forLog<getProperties().begin(); it != request->getProperties().end(); it++) { forLog<first <second; } LogFile::primary().sendString(forLog, request->getSocketInfo()); } static const std::string s_dispatchMessage = "dispatchMessage"; void CallbackList::dispatchMessage(ExternalRequest *request) { ThreadMonitor::SetState setState(s_dispatchMessage); SocketInfo *const socket = request->getSocketInfo(); if (_dumpAll) { dumpMessage(request, "dump_all"); } std::queue< ExternalRequest * > *queue = getProperty(_locked, socket); std::string command = request->getCommand(); CallbackInfo *callback = getProperty(_commandList, command); if (!callback) { if (_dumpUnknownCommands) dumpMessage(request, "UNKNOWN_COMMAND"); if (_requireLogin && !_initialized.count(socket)) DeleteSocketThread::deleteSocket(socket); delete request; } else if (queue && !callback->immuneToLock) { // This socket is locked. Queue up the request for later. queue->push(request); } else { ThreadMonitor::find().increment(command); if (callback->lock) { // Create an empty queue to lock this. There should be no // way to lock something that is already locked, but if it // is already locked, this is a no-op. _locked[socket]; // For simplicity we assume that the same commands which lock the // queue also initialize the socket. In both cases the normal use // case is that we log in before we do anything else. _initialized.insert(socket); } if (_requireLogin && !(callback->immuneToLock || _initialized.count(socket))) { DeleteSocketThread::deleteSocket(socket); delete request; } else { successfulDispatch(request); RequestListener *listener = callback->requestListener; request->callbackId = callback->callbackId; listener->newRequest(request); } } } Request *CallbackList::createNewCommandRequest(std::string commandName, RequestListener *listener, int callbackId, bool lock, bool immuneToLock) { return new ListenForCommandRequest(commandName, listener, callbackId, lock, immuneToLock); } void CallbackList::addNewCommand(Request *request) { ListenForCommandRequest *r = dynamic_cast(request); if (_commandList.count(r->getCommandName())) { TclList msg; msg <getCommandName(); LogFile::primary().sendString(msg); LogFile::primary().scheduleShutdown(); } CallbackInfo &info = _commandList[r->getCommandName()]; info.requestListener = r->getListener(); info.callbackId = r->getCallbackIdForNewCommand(); info.lock = r->getLock(); info.immuneToLock = r->getImmuneToLock(); } void CallbackList::unlock(SocketInfo *socket) { // We don't every expect an unlock without a lock. That case would // work, but we don't optimize for it. std::queue< ExternalRequest * > queue = _locked[socket]; _locked.erase(socket); if (_dumpAll && !queue.empty()) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"dump_all"<<"repeating"< *queue = getProperty(_locked, socket); if (queue) { while (!queue->empty()) { delete queue->front(); queue->pop(); } _locked.erase(socket); } _initialized.erase(socket); } CallbackList::CallbackList() { // By default logging in is optional. That's how the AX server worked. Now // we have the option of a more traditional server. The first command has // to be a login or you will get disconnected. _requireLogin = false; // getConfigItem: dump_all=1 means send a copy of every message in or out // getConfigItem: of the system to the log file. This can be useful in // getConfigItem: debugging, but it makes really big log files very // getConfigItem: quickly. This setting is shared by several libraries. _dumpAll = getConfigItem("dump_all") == "1"; // getConfigItem: dump_unknown_commands=1 sends a lot of details to the log // getConfigItem: file any time someone sends a command that we don't know // getConfigItem: how to handle. This was aimed strictly at debugging. _dumpUnknownCommands = getConfigItem("dump_unknown_commands") == "1"; } CallbackList::~CallbackList() { for (std::map< SocketInfo *, std::queue< ExternalRequest * > >::iterator it = _locked.begin(); it != _locked.end(); it++) { std::queue< ExternalRequest * > &queue = it->second; while (!queue.empty()) { delete queue.front(); queue.pop(); } } } ///////////////////////////////////////////////////////////////////// // CommandDispatcher ///////////////////////////////////////////////////////////////////// void CommandDispatcher::newRequest(Request *request) { // This function catches stuff we want to happen immediately. In particular, // the client might send a request to change input modes followed immediately // a request in the new input mode. So we must change modes immediately. switch (request->callbackId) { case mtSetInput: { ExternalRequest *r = dynamic_cast(request); SocketInfo *socket = r->getSocketInfo(); std::string mode = r->getProperty("mode", ""); if (mode == "zlib") { Buffer *&buffer = _allBuffers[socket]; Buffer *newBuffer = new ZLibBuffer(socket); if (buffer) { newBuffer->addNewData(buffer->copyBuffer()); delete buffer; } buffer = newBuffer; } else { TclList msg; msg<<"CommandDispatcher.C" <<"Unknown input mode" <(request); SocketInfo *socket = r->getSocketInfo(); std::string original = socket->remoteAddr(); std::string replacement = r->getProperty("remote"); std::string auth = r->getProperty("auth"); TclList msg; msg<<"CommandDispatcher.C" <<"mtProxyInfo" <<"original" <setRemoteAddr(replacement); LogFile::primary().sendString(msg, socket); } } } delete request; } void CommandDispatcher::threadFunction() { while (true) { _incomingRequests.waitForRequest(); while (Request *current = _incomingRequests.getRequest()) { switch (current->callbackId) { case mtNewCommand: { _callbackList.addNewCommand(current); break; } case mtUnlock: { _callbackList.unlock(current->getSocketInfo()); break; } case mtNewInput: { ThreadMonitor::SetState setState("mtNewInput"); NewInputRequest *r = dynamic_cast(current); SocketInfo *socket = r->getSocketInfo(); if (r->atEOF()) { if (_showEOF) { LogFile::primary().quoteAndSend("CommandDispatcher.C", "EOF on read", socket); } DeleteSocketThread::deleteSocket(socket); } else if (r->newSocket()) { _deadManTimer.touchConnection(socket); } else { Buffer *&buffer = _allBuffers[socket]; if (!buffer) { buffer = new TextBuffer(socket); } buffer->addNewData(r->getNewInput()); if (buffer->getAborted()) { LogFile::primary(). quoteAndSend("CommandDispatcher.C", "Buffer aborted adding new data", socket); DeleteSocketThread::deleteSocket(socket); } else { while (ExternalRequest *outgoing = buffer->getMessage()) { _callbackList.dispatchMessage(outgoing); } if (buffer->getAborted()) { LogFile::primary(). quoteAndSend("CommandDispatcher.C", "Buffer aborted while retrieving message", socket); DeleteSocketThread::deleteSocket(socket); } } } break; } case mtQuit: { delete current; return; } case DeleteSocketThread::callbackId: { SocketInfo *socket = current->getSocketInfo(); Buffer *buffer = _allBuffers[socket]; if (buffer) { delete buffer; } _allBuffers.erase(socket); _callbackList.remove(socket); break; } } delete current; } } } void CommandDispatcher::listenForCommand(std::string commandName, RequestListener *listener, int callbackId, bool lock, bool immuneToLock) { Request *request = new ListenForCommandRequest(commandName, listener, callbackId, lock, immuneToLock); request->callbackId = mtNewCommand; _incomingRequests.newRequest(request); } void CommandDispatcher::unlock(SocketInfo *socket) { Request *request = new Request(socket); request->callbackId = mtUnlock; _incomingRequests.newRequest(request); } CommandDispatcher::CommandDispatcher(std::string const &name) : ThreadClass(name), _incomingRequests(getName()), _callbackList(&_deadManTimer), _showEOF(true) { listenForCommand("set_input", this, mtSetInput, false, true); listenForCommand("proxy_info", this, mtProxyInfo, false, true); connectToReplyToClient(this); startThread(); } CommandDispatcher::~CommandDispatcher() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incomingRequests.newRequest(r); } // Don't copy this! It works, but it's unnecessarily complicated. // Just make a static variable in getInstance and initialize it in // place. G++ will make sure the initializer is called exactly // once, the first time we get to that line, automatically thread // safe. MultiCast::getInstance() is an example of the better // way to do it. I didn't use that trick on old code because I // didn't know about it! static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; CommandDispatcher *CommandDispatcher::_instance = NULL; CommandDispatcher *CommandDispatcher::getInstance() { if (!_instance) { pthread_mutex_lock(&mutex); if (!_instance) { _instance = new CommandDispatcher; } pthread_mutex_unlock(&mutex); } return _instance; }