#include "../shared/ThreadMonitor.h" #include "../shared/ReplyToClient.h" #include "../shared/LogFile.h" #include "TopListServerConnection.h" static const std::string s_COMMAND = "command"; static const std::string s_SOCKET_ID = "socket_id"; static const std::string s_WINDOW_ID = "window_id"; static const std::string s_COLLABORATE = "collaborate"; static const std::string s_SAVE_TO_MRU = "save_to_mru"; static const std::string s_USER_ID = "user_id"; static const std::string s_SKIP_METADATA = "skip_metadata"; static const std::string s_1 = "1"; static const std::string s_ALL = "all"; std::string TopListServerConnection::getInfoForThreadMonitor() { return TclList("total user count", _knownUsers.size(), "active socket count", _bySocket.size()); } void TopListServerConnection::initializeInThread() { // These threads typically show up as "top_list_delayed" or "top_list_live" // in the log file. This will add some Extra data to the ThreadMonitor // log entries. ThreadMonitor::find().add(this); } void TopListServerConnection::requestData(TalkWithServer *talkWithServer, SocketInfo *socket, RequestByWindowId::const_iterator request) { const UserId userId = userInfoGetInfo(socket).userId; TalkWithServer::Message message; message[s_COMMAND] = _startCommand; message[s_SOCKET_ID] = ntoa(SocketInfo::getSerialNumber(socket)); message[s_WINDOW_ID] = request->first; message[s_COLLABORATE] = request->second.collaborate; message[s_SAVE_TO_MRU] = request->second.saveToMru?"1":"0"; message[s_USER_ID] = ntoa(userId); if (request->second.metadataSent) message[s_SKIP_METADATA] = s_1; talkWithServer->sendMessage(message); if (userId) { const bool firstTimeAdded = _knownUsers.insert(userId).second; if (firstTimeAdded) LogFile::primary().sendString(TclList()<requests.erase(windowId)) { if (TalkWithServer *talkWithServer = getConnection(false)) { TalkWithServer::Message message; message[s_COMMAND] = _stopCommand; message[s_SOCKET_ID] = ntoa(SocketInfo::getSerialNumber(socket)); message[s_WINDOW_ID] = windowId; talkWithServer->sendMessage(message); } } } } void TopListServerConnection::stopData(SocketInfo *socket) { if (_bySocket.erase(SocketInfo::getSerialNumber(socket))) if (TalkWithServer *talkWithServer = getConnection(false)) { TalkWithServer::Message message; message[s_COMMAND] = _stopCommand; message[s_SOCKET_ID] = ntoa(SocketInfo::getSerialNumber(socket)); message[s_ALL] = s_1; talkWithServer->sendMessage(message); } } static const std::string s_repeatAllDataRequests = "repeatAllDataRequests"; void TopListServerConnection::repeatAllDataRequests(TalkWithServer *talkWithServer) { ThreadMonitor::find().increment(s_repeatAllDataRequests); ThreadMonitor::SetState state(s_repeatAllDataRequests); for (BySocket::const_iterator socketIt = _bySocket.begin(); socketIt != _bySocket.end(); socketIt++) for (RequestByWindowId::const_iterator windowIt = socketIt->second.requests.begin(); windowIt != socketIt->second.requests.end(); windowIt++) requestData(talkWithServer, socketIt->second.socket, windowIt); } void TopListServerConnection::socketClosed(SocketInfo *socket) { stopData(socket); } void TopListServerConnection::onMessage(std::string bytes, int clientId, TalkWithServer::CancelId cancelId) { switch (clientId) { case CI_FORWARD: { const std::string key = getLine(bytes); SocketInfo::SerialNumber serialNumber = strtolDefault(key, 0); if (!serialNumber) // Something's wrong with the sender or the message got garbled. ThreadMonitor::find().increment("INVALID_SOCKET"); else { ClientInfo *clientInfo = getProperty(_bySocket, serialNumber); if (!clientInfo) // Presumably that socket was closed. We try to cancel all work // for a closed socket. But some messages might cross each other // in transit. So a few of these are reasonable, but we don't want // to see a lot of them. ThreadMonitor::find().increment("socket_not_found"); else if (!(clientInfo->socket && clientInfo->messageId.present())) // Maybe the client requested data before setting up a listener // channel. That would be a client bug. Not likely, but // that should not cause the server to do anything bad. ThreadMonitor::find().increment("listener_not_initialized"); else { // TODO we aren't updating the metadataSent flag. It seems like // the server would need to send us more information to do that! ThreadMonitor::find().increment("forwarded"); addToOutputQueue(clientInfo->socket, bytes, clientInfo->messageId); } } break; } default: ForeverServerConnection::onMessage(bytes, clientId, cancelId); } } void TopListServerConnection::onNewConnection(TalkWithServer *talkWithServer) { TalkWithServer::Message message; message[s_COMMAND] = getName() + "_listen"; talkWithServer->sendMessage(message, this, CI_FORWARD, true); repeatAllDataRequests(talkWithServer); } bool TopListServerConnection::shouldTryToConnect() { //TclList msg; //msg<_bySocket[SocketInfo::getSerialNumber(getSocketInfo())]; clientInfo.socket = getSocketInfo(); clientInfo.messageId = messageId; //TclList msg; //msg<This = this; work->messageId = messageId; getContainer()->addToQueue(work); } void TopListServerConnection::newTopList(SocketInfo *socket, std::string const &collaborate, std::string const &windowId, bool saveToMru) { class W : public IContainerThread::Work { public: TopListServerConnection *This; std::string collaborate; std::string windowId; bool saveToMru; virtual void inThread() { //TclList msg; //msg<requestData(getSocketInfo(), collaborate, windowId, saveToMru); } W(SocketInfo *socket) : Work(socket) { } }; W *work = new W(socket); work->This = this; work->collaborate = collaborate; work->windowId = windowId; work->saveToMru = saveToMru; getContainer()->addToQueue(work); } void TopListServerConnection::cancelTopList(SocketInfo *socket, std::string const &windowId) { class W : public IContainerThread::Work { public: TopListServerConnection *This; std::string windowId; virtual void inThread() { //TclList msg; //msg<stopData(getSocketInfo(), windowId); } W(SocketInfo *socket) : Work(socket) { } }; W *work = new W(socket); work->This = this; work->windowId = windowId; getContainer()->addToQueue(work); }