//#include "../shared/SimpleLogFile.h" #include "TarPit.h" ///////////////////////////////////////////////////////////////////// // TarPit::PenaltyAssessor ///////////////////////////////////////////////////////////////////// void TarPit::PenaltyAssessor::resetIfNeeded() { TimeVal now(true); if (now < _nextResetTime) return; // Reset. _nextResetTime = now; _nextResetTime.addMinutes(3); _nextAllowedTime = now; _count = 0; } TimeVal TarPit::PenaltyAssessor::getNextAllowedTime() { resetIfNeeded(); return std::max(TimeVal(true), _nextAllowedTime); } void TarPit::PenaltyAssessor::increment() { resetIfNeeded(); _count++; // Worst case average of 1 every 3 seconds. _nextAllowedTime.addMicroseconds(_count * _count * 2439L); //TclList msg; //msg<request; _byRequest.erase(result); delete item; return result; } Request *TarPit::UserQueue::peek() const { if (empty()) return NULL; return _fifo.back()->request; } void TarPit::UserQueue::push(Request *newRequest) { Item *item = new Item; item->request = newRequest; _fifo.push_front(item); item->positionInQueue = _fifo.begin(); _byRequest[newRequest] = item; } bool TarPit::UserQueue::remove(Request *toDelete) { ItemMap::iterator found = _byRequest.find(toDelete); if (found != _byRequest.end()) { delete found->second->request; _fifo.erase(found->second->positionInQueue); delete found->second; _byRequest.erase(found); return true; } return false; } TarPit::UserQueue::~UserQueue() { for (ItemIterator it = _fifo.begin(); it != _fifo.end(); it++) { delete (*it)->request; delete *it; } } ///////////////////////////////////////////////////////////////////// // TarPit ///////////////////////////////////////////////////////////////////// bool TarPit::needsPenalty(Request *request) const { if (_needsPenalty) return _needsPenalty->needsPenalty(request); else // A reasonable default! If you wanted the default to be false, you'd // use a FairRequestQueue, instead. return true; } TimeVal TarPit::increment(Request *request) { PenaltyAssessor &penaltyAssessor = _penaltyAssessors[request->getSocketInfo()]; if (needsPenalty(request)) penaltyAssessor.increment(); return penaltyAssessor.getNextAllowedTime(); } bool TarPit::ready() { if (_timedList.empty()) // No items. return false; // The items are sorted by time. If the first item is not ready, // none of the items are ready. //TclList msg; //msg<first.waitTime()).asMicroseconds() // <<_timedList.begin()->first.ctimeString() // <<(_timedList.begin()->first <= TimeVal(true)?"true":"false"); //sendToLogFile(msg); return _timedList.begin()->first <= TimeVal(true); } timeval *TarPit::untilNextRequest() { if (_timedList.empty()) // Wait forever. There is no next request. return NULL; // Wait for the first request to be ready. _untilNextRequest = _timedList.begin()->first.waitTime(); //TclList msg; //msg<first.ctimeString(); //sendToLogFile(msg); return &_untilNextRequest; } Request *TarPit::pop() { if (!ready()) return NULL; _count--; UserQueue &userQueue = _bySocket[_timedList.begin()->second]; _timedList.erase(_timedList.begin()); Request *result = userQueue.pop(); assert(result); // We checked for !ready before this. if (userQueue.empty()) // No more requests for this user. Remove the resources. _bySocket.erase(userQueue.getSocket()); else // More requests. Return the user to the master queue. addToTimedList(userQueue); return result; } void TarPit::addToTimedList(UserQueue &userQueue) { Request *request = userQueue.peek(); assert(request); const TimeVal nextTime = increment(request); userQueue.setNextTime(nextTime); //TclList msg; //msg<getSocketInfo()]; userQueue.push(newRequest); if (!userQueue.getSocket()) { //msg<<"new userQueue"; userQueue.setSocket(newRequest->getSocketInfo()); addToTimedList(userQueue); } //sendToLogFile(msg); } void TarPit::remove(SocketInfo *socket) { if (UserQueue *userQueue = getProperty(_bySocket, socket)) { _count -= userQueue->getCount(); _timedList.erase(userQueue->getTimedListKey()); _bySocket.erase(socket); } } void TarPit::remove(Request *toDelete) { remove(toDelete->getSocketInfo(), toDelete); } void TarPit::remove(SocketInfo *socket, Request *toDelete) { if (UserQueue *userQueue = getProperty(_bySocket, socket)) if (userQueue->remove(toDelete)) { _timedList.erase(userQueue->getTimedListKey()); _bySocket.erase(socket); } }