#include "IPollSet.h" #include "ThreadMonitor.h" #include "InputFramework.h" static const std::string S_look_for_data="look_for_data"; static const std::string S_handle_commands="handle_commands"; static const std::string S_sleep="sleep"; InputListener::InputListener(RequestListener *requestListener, int requestId) : ThreadClass("InputListener"), _incomingRequests("InputListener"), _requestListener(requestListener), _outgoingRequestId(requestId) { startThread(); } InputListener::~InputListener() { Request *request = new Request(NULL); request->callbackId = mtQuit; _incomingRequests.newRequest(request); waitForThread(); } void InputListener::listenToNewSocket(SocketInfo *socketInfo) { Request *request = new Request(socketInfo); request->callbackId = mtNewSocket; _incomingRequests.newRequest(request); } void InputListener::threadFunction() { std::map< int, SocketInfo* > activeSockets; // socket handle -> object ThreadMonitor &m = ThreadMonitor::find(); IPollSet pollSet; pollSet.addForRead(_incomingRequests.getWaitHandle()); while (true) { //m.setState("pollSet.poll()"); pollSet.poll(); // Look at any sockets with incoming data. // We must look at the socket data immediately after the select, before // handling the commands. Otherwise we might delete a socket that set // off the select(). This becomes a serious problem when we delete // a socket, then create a new one with the same file handle. m.setState(S_look_for_data); for (IPollSet::HandleSet::iterator it = pollSet.woken().begin(); it != pollSet.woken().end(); it++) { // Look through the list of readable sockets. if (SocketInfo *socket = getPropertyDefault(activeSockets, *it)) { // Look ignore the _incomingRequests queue and only look at // sockets with real data. _requestListener-> newRequest(new NewInputRequest(socket, socket->readOnce(), _outgoingRequestId)); if (socket->atEOF()) { activeSockets.erase(*it); pollSet.removeForRead(*it); } } } // Look for commands. m.setState(S_handle_commands); _incomingRequests.resetWaitHandle(); while (Request *current = _incomingRequests.getRequest()) { switch (current->callbackId) { case mtQuit: { delete current; return; } case mtNewSocket: { SocketInfo *socket = current->getSocketInfo(); activeSockets[socket->getSocketHandle()] = socket; pollSet.addForRead(socket->getSocketHandle()); _requestListener-> newRequest(new NewInputRequest(socket, _outgoingRequestId)); break; } case DeleteSocketThread::callbackId: { SocketInfo *socket = current->getSocketInfo(); activeSockets.erase(socket->getSocketHandle()); pollSet.removeForRead(socket->getSocketHandle()); } } delete current; } m.increment(S_sleep); } }