#ifndef __TimedJobList_h_ #define __TimedJobList_h_ #include "../shared/MiscSupport.h" #include "../shared/SocketInfo.h" /* This was extracted from TopListWorkers.C to make it easier to follow. * This contains the list of jobs to do, and nothing else. Each job contains * a time when we want to execute it. The list is optimized two ways. You * can quickly find the next job in chronological order. This is how we decide * what to execute and how long to sleep. You can also find jobs by the local * socket, the remote socket, or the window id. This is necessary to remove * jobs for various reasons. * * This idea is based on the original code in ../ax_alert_server/TopList.C. * However I redid the implementation. There is one major new requirement in * this file that was not around in the original. This file has to keep track * of the socket used to connect the client to ax_alert_server, and second * socket used to connect ax_alert_server to this program. It's just more of * the same. If either type of socket is disconnected, we need to remove all * of the requests related to that socket. * * I made this a template in case we had another unit with similar needs. But * I don't think that will be the case. Alerts, in particular, have a * completely different structure. All of the strategies / windows for a * single user are handled at once for alerts. This class assigns a different * time for each strategy / window. Most data we provide is not periodic, so * it wouldn't have a structure anything like this. */ template < typename Job > class TimedJobList { private: typedef std::pair< TimeVal::Microseconds, Job > RunTimePair; typedef std::map< std::string, RunTimePair > ByWindowId; typedef std::map< std::string, ByWindowId > ByRemoteSocket; typedef std::map< SocketInfo *, ByRemoteSocket > BySocket; BySocket _bySocket; typedef std::set< RunTimePair > ByRunTime; ByRunTime _byRunTime; public: // The same window cannot exist in the list twice. This will automatically // delete the old one if it exists. void replace(Job job, int delayInSeconds) { erase(job->socket, job->remoteSocketId, job->windowId); TimeVal runTime(true); runTime.addSeconds(delayInSeconds); RunTimePair runTimePair(runTime.asMicroseconds(), job); _byRunTime.insert(runTimePair); _bySocket[job->socket][job->remoteSocketId][job->windowId] = runTimePair; } void erase(Job job) { // Notice the assumptions about the Job type. This is pretty much required // by our structure as a whole. Otherwise we'd have to store more // information in our data structures. erase(job->socket, job->remoteSocketId, job->windowId); } // It is safe to call this even if the job is not in the list. In that case, // nothing will happen. void erase(SocketInfo *socket, std::string const &remoteSocket, std::string const &windowId) { // Invariant: There are no empty maps. If you erase the last item in a // map, then erase the entire map. Be careful not to create any empty // maps. if (ByRemoteSocket *byRemoteSocket = getProperty(_bySocket, socket)) { if (ByWindowId *byWindowId = getProperty(*byRemoteSocket, remoteSocket)) { if (RunTimePair *pair = getProperty(*byWindowId, windowId)) { _byRunTime.erase(*pair); byWindowId->erase(windowId); if (byWindowId->empty()) { // That was the last window id associated with the remote socket. byRemoteSocket->erase(remoteSocket); if (byRemoteSocket->empty()) { // That was the last remote socket associated with the socket. _bySocket.erase(socket); } } } } } } // Possibly remove several jobs. The windowId becomes a wild card. void erase(SocketInfo *socket, std::string const &remoteSocket) { if (ByRemoteSocket *byRemoteSocket = getProperty(_bySocket, socket)) { ByWindowId byWindowId = getPropertyDefault(*byRemoteSocket, remoteSocket); for (typename ByWindowId::const_iterator it = byWindowId.begin(); it != byWindowId.end(); it++) erase(socket, remoteSocket, it->first); } } // Remove any and all jobs associated with this socket. void erase(SocketInfo *socket) { ByRemoteSocket byRemoteSocket = getPropertyDefault(_bySocket, socket); for (typename ByRemoteSocket::const_iterator it = byRemoteSocket.begin(); it != byRemoteSocket.end(); it++) erase(socket, it->first); } bool empty() const { // Invariant: There are no empty maps. If you erase the last item in a // map, then erase the entire map. return _bySocket.empty(); } bool aJobIsReady() const { if (empty()) return false; TimeVal::Microseconds now = TimeVal(true).asMicroseconds(); return _byRunTime.begin()->first <= now; } // This a copy of the next job that we should execute. This returns a // default object of type Job if no job is ready. If an object is found, // it is removed from the queue. I'm assuming a Job is a smart pointer, // so copying it is not expensive. Job nextJob() { if (!aJobIsReady()) return Job(); Job result = _byRunTime.begin()->second; erase(result); return result; } // This returns the number of ms until the next item is ready. If the result // is 0 or less, nextJob() will return something immediately. If this list // is empty, this will return MAXINT. Otherwise, this will return the number // of milliseconds until aJobIsReady() is true. // // This function rounds up. We store numbers that are precise to the // microsecond, because that's what linux gives us. But poll() is only // precise to the millisecond. If we rounded in the other direction, we'd // wake up several times in a row when the desired sleep time is just under // 1ms. The man page for poll() shows that it is precise to the nanosecond, // but in fast it rounds to the nearest millisecond. If you ask to sleep // for 999,999 nanoseconds, that's the same as asking to sleep for 0 // nanoseconds. int sleepTimeMs() const { if (empty()) return std::numeric_limits< int >::max(); TimeVal::Microseconds now = TimeVal(true).asMicroseconds(); return (_byRunTime.begin()->first - now + 999) / 1000; } std::string debugDump() const { int count = 0; const TimeVal::Microseconds now = TimeVal(true).asMicroseconds(); std::string result; result += "By Socket:\n"; for (typename BySocket::const_iterator socketIt = _bySocket.begin(); socketIt != _bySocket.end(); socketIt++) { result += " "; result += ntoa(SocketInfo::getSerialNumber(socketIt->first)); result += ":\n"; for (typename ByRemoteSocket::const_iterator remoteSocketIt = socketIt->second.begin(); remoteSocketIt != socketIt->second.end(); remoteSocketIt++) { result += " "; result += remoteSocketIt->first; result += ":\n"; for (typename ByWindowId::const_iterator windowIt = remoteSocketIt->second.begin(); windowIt != remoteSocketIt->second.end(); windowIt++) { result += " "; result += windowIt->first; result += ": "; result += ntoa((windowIt->second.first - now) / 1000000.0); result += "\n"; count++; } } } result += "By Time: "; bool first = true; for (typename ByRunTime::const_iterator it = _byRunTime.begin(); it != _byRunTime.end(); it++) { if (first) first = false; else result += ", "; result += ntoa((it->first - now) / 1000000.0); } result += "\n"; result += ntoa(count); if (count == (int)_byRunTime.size()) // Good! result += " == "; else // Bad! result += " != "; result += ntoa(_byRunTime.size()); result += "\n"; return result; } }; #endif