#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <queue>
#include <set>
#include <string>

#include "ThreadSafeRefCount.h"
#include "LogFile.h"
#include "ThreadMonitor.h"
#include "Messages.h"

/////////////////////////////////////////////////////////////////////
// Request
/////////////////////////////////////////////////////////////////////

bool Request::notDeliverable() const
{
  return _socketInfo && _socketInfo->isGoingDown();
}

Request::~Request()
{
  // This allows inherited classes to have destructors.
  //const_cast< SocketInfo * & >(_socketInfo) = (SocketInfo *)-1;
}

TclList Request::debugDump() const
{
  std::string socketInfo;
  if (_socketInfo)
    socketInfo = _socketInfo->debugDump();
  else
    socketInfo = "NULL";
  return TclList(TclList("deliverable", !notDeliverable()),
                 TclList("_socketInfo", socketInfo),
                 TclList("callbackId", callbackId));
}

/////////////////////////////////////////////////////////////////////
// allRequestQueues
//
// This is, effectively, a list of all threads.  This is who we
// notify when a socket is closed.
//
// A note about mutexes.  Currently this file defines
// allRequestQueuesMutex and one mutex per RequestQueue.  Sometimes
// we will acquire the mutex for a queue while we are already holding
// allRequestQueuesMutex.  We never try to acquire any additional
// mutexes while we are holding the mutex for a queue, and we never
// make arbitrary callbacks while holding a mutex.
//
// Older versions of this code had only one mutex, and it was called
// globalMutex.  That code had more data protected by the mutex.  It
// was hard to organize that code without risking deadlock; the only
// answer at the time was to use one mutex for everything instead of
// different mutexes for different queues.
/////////////////////////////////////////////////////////////////////

static pthread_mutex_t allRequestQueuesMutex = PTHREAD_MUTEX_INITIALIZER;

// This is protected by allRequestQueuesMutex.
typedef std::set< RequestQueue * > AllRequestQueues;
static AllRequestQueues allRequestQueues __attribute__ ((init_priority (101)));

/////////////////////////////////////////////////////////////////////
// DeleteSocketRequest
//
// One of these is delivered to each thread to ask that thread to
// clean up.  The thread deletes this object when it is done.  When
// the last one of these objects is deleted, ~Last() will be called
// to do the final cleanup on the SocketInfo object.
/////////////////////////////////////////////////////////////////////

class DeleteSocketRequest : public Request
{
public:
  class Last
  {
  private:
    SocketInfo *const _socket;
  public:
    Last(SocketInfo *socket) : _socket(socket) { }
    ~Last();
    SocketInfo *getSocketInfo() const { return _socket; }
    typedef TSRefCount< Last > Ref;
  };

private:
  Last::Ref _last;
  virtual bool notDeliverable() const;

public:
  DeleteSocketRequest(Last::Ref const &last) :
    Request(last->getSocketInfo()),
    _last(last)
  {
    callbackId = DeleteSocketThread::callbackId;
  }
};

bool DeleteSocketRequest::notDeliverable() const
{
  // Once we start the process of shutting down a socket, the queue will
  // typically refuse to deliver any messages associated with that socket.
  // The queue will delete the messages, instead.  DeleteSocketRequest
  // objects have a special status.  These are only created after the
  // shutdown process has started and they should always be delivered.
  return false;
}
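// The fan-out works roughly like this.  This is a hedged sketch, not part
// of the real control flow: it assumes TSRefCount< Last > behaves like a
// shared-ownership smart pointer (it is declared in ThreadSafeRefCount.h,
// not here), and the names s, a, and b are made up for illustration.
//
//   Request *a;
//   Request *b;
//   {
//     DeleteSocketRequest::Last::Ref last = new DeleteSocketRequest::Last(s);
//     a = new DeleteSocketRequest(last);  // shares ownership of the Last
//     b = new DeleteSocketRequest(last);  // shares ownership, too
//   }             // the local Ref is released here, but ~Last() does not run
//   delete a;     // one owner remains
//   delete b;     // the last owner is gone: ~Last() runs exactly once and
//                 // performs the final cleanup on the SocketInfo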
DeleteSocketRequest::Last::~Last()
{
  // The last thread has said that it's done with the socket.  Now do the
  // global cleanup.
  const SocketInfo::SerialNumber serialNumber =
    SocketInfo::getSerialNumber(_socket);
  delete _socket;
  LogFile::primary().socketFinished(serialNumber);
}

/////////////////////////////////////////////////////////////////////
// DeleteSocketThread
//
// At one time this really was a thread.  There wasn't much of a point
// to that thread, so now I do the work immediately using whichever
// thread requested the action.  The work is basically the same as
// before, but slightly simpler.
//
// We deliver a message to all queues, and so to all threads.  The
// thread will do whatever cleanup it sees fit.  The queue will
// refuse to accept any new messages from the deleted socket.  If
// you attempt to send a message, and the socket is in the process
// of being deleted, the queue will delete the message immediately.
//
// Note the timing.  After the thread receives the special delete
// message from its queue it will never receive any more messages
// associated with the socket that's going down.
//
// What about messages that were already queued up?  Imagine four
// messages, all associated with the same socket, all sent to the same
// queue.
// o First a thread sends message A.  Everything is normal and A will
//   sit in the queue.
// o Someone starts the delete process.  The socket is tagged as
//   going down.  If someone asked, A would say that it is
//   notDeliverable().
// o Someone in another thread tries to send message B to this queue.
//   The queue will check B.notDeliverable() in
//   RequestQueue::newRequest(), and the queue will immediately
//   delete B rather than queuing it.
// o As the delete process continues, it will send a delete message,
//   message C, to the queue.  This has a special status.  Even
//   though the socket is going down, this message will not say that
//   it is notDeliverable().  This message will be delivered like
//   normal.
// o Another thread sends message D.  That thread doesn't yet know
//   that the socket is going down.  The queue will detect and delete
//   message D just like it did for B.
//
// D has to be deleted and C has to be delivered.  The program would
// not be correct if this was not the case.
//
// B could be delivered, although it's just as well that it isn't.
// Note that B comes in a very small window and we don't expect this
// to happen a lot.
//
// What about A?  Someone could check and see that the socket's going
// down.  It might avoid some work if we deleted the message rather
// than delivering it.  However, it's still correct if we deliver it.
// The message will be delivered before the cleanup message and
// before the socket object is deleted.
//
// We allow A to be delivered.  We could have checked for this in
// RequestQueue::getRequest().  An older version of this code would
// have deleted A rather than delivering it.  We decided that the
// extra effort of always checking for this is more than the effort
// that we'd save.  Remember, if a request is big and hairy, we'd
// probably move it from the RequestQueue into some internal priority
// queue, anyway.  We wouldn't look at the request again until after
// we'd drained the RequestQueue.  Also, think about the timing
// involved.  If we'd asked for a request from the queue just an
// instant sooner we would have received A before the delete request
// started.  So we'd always have to deal with some requests that were
// obsolete.
/////////////////////////////////////////////////////////////////////
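// A hedged sketch of the consuming side of this protocol.  The dispatcher
// below and its helpers (MyThread, _perSocketState, doNormalWork) are made
// up for illustration; only Request, callbackId, getSocketInfo() and
// DeleteSocketThread::callbackId come from this code.
//
//   void MyThread::dispatch(Request *request)
//   {
//     if (request->callbackId == DeleteSocketThread::callbackId)
//       // This is message C from the description above.  It is the last
//       // message this thread will ever see for this socket, so it is
//       // safe to discard all per-socket state now.
//       _perSocketState.erase(request->getSocketInfo());
//     else
//       doNormalWork(request);
//     delete request;  // the receiving thread owns a delivered request
//   }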
void DeleteSocketThread::deleteSocket(SocketInfo *toDelete)
{
  assert(toDelete->objectIsValid());
  if (toDelete->setGoingDown())
    // This is a duplicate.  We should silently ignore this request.
    return;

  // Call shutdown to notify the client immediately.  The socket will be
  // closed automatically as soon as the last thread has acknowledged the
  // delete request.  Sometimes that takes longer than expected.  The
  // connection is effectively closed, as the read and write threads in this
  // server will not work with this socket any more.  It can be confusing to
  // the client when the client thinks the connection is still open but the
  // server acts like the connection is closed.
  toDelete->shutDown();

  // Order is important.  First mark the SocketInfo object, so the
  // RequestQueue objects will stop delivering messages associated with the
  // socket.  Then send a message to each queue asking the thread to delete
  // its own data associated with the socket.  If we did it in the other
  // order, a message could get from the queue into the thread immediately
  // after the thread had done its cleanup.  That would be a memory leak at
  // best.  That might lead to someone trying to access a socket object
  // after it was deleted.
  DeleteSocketRequest::Last::Ref last = new DeleteSocketRequest::Last(toDelete);
  pthread_mutex_lock(&allRequestQueuesMutex);
  {
    for (AllRequestQueues::iterator it = allRequestQueues.begin();
         it != allRequestQueues.end();
         it++)
      (*it)->newRequest(new DeleteSocketRequest(last));
  }
  pthread_mutex_unlock(&allRequestQueuesMutex);
}

/////////////////////////////////////////////////////////////////////
// RequestQueue
/////////////////////////////////////////////////////////////////////

void RequestQueue::newRequest(Request *request)
{
  SocketInfo *const socketInfo = request->getSocketInfo();
  assert((!socketInfo)||(socketInfo->objectIsValid()));
  std::queue< Request * >::size_type pendingRequests = 0;
  pthread_mutex_lock(&_mutex);

  // Normally we don't use a mutex to access the notDeliverable() status.
  // However, in this case it's important to read that value in the same
  // "transaction" as we are using to insert things into the queue.
  //
  // Imagine we checked notDeliverable() immediately before acquiring the
  // mutex, instead of inside the mutex.  Consider the following case.
  //  1) Thread A starts processing request #1.
  //  2) Thread A sends a new request, request #2, to thread B.  Both
  //     requests are associated with the same socket.  (And the socket is
  //     not NULL.)
  //  3) Thread B processes request #2.  In the process it deletes request
  //     #2 and starts the process of destroying the socket.
  //  4) Thread A is still processing request #1.
  //  5) Thread A creates request #3 and starts the process of sending the
  //     request to thread C.  Request #3 is associated with the same socket.
  //  6) Thread A is in this function (newRequest()) and it gets as far as
  //     reading notDeliverable().  It might get a result of false,
  //     depending on the exact timing.
  //  7) Thread B sets notDeliverable() to true.
  //  8) Thread B adds the delete message to every queue.
  //  9) Thread A acquires the mutex and adds request #3 to thread C's queue.
  // 10) At this point we've already broken the rules.  Thread C's queue
  //     contains a socket delete message FOLLOWED BY another request
  //     associated with the same socket.
  // 11) All threads process the delete request.
  // 12) One of the threads deletes the socket object.
  // 13) Thread C reads request #3 from the queue.
  // 14) Thread C tries to call request->notDeliverable(), but that will
  //     try to access the socket object, and the socket object has been
  //     deleted.  We might get an assertion failure, but accessing a
  //     deleted object could lead to almost anything.
  //
  // Note:  The assumption about calling notDeliverable() outside of the
  // mutex is NOT hypothetical.  That's how this file used to look.  It was
  // a bug.  We saw the assertion fail in step #14, and a lot of other crazy
  // stuff.
  //
  // Note:  You can't swap steps 7 and 8.  (Not without making other
  // changes.)  See the big block of comments above
  // DeleteSocketThread::deleteSocket().
  //
  // Note:  This exact set of steps is exactly what was happening when we
  // were handling "Read.tcl" in ../js_proxy_server/ProxyMainLoop.C.  In
  // particular, steps 1-3 aren't strictly necessary.  It's always possible
  // that another thread will ask to delete a socket while you're using that
  // socket.  But this one file was particularly good at getting the timing
  // just right to expose this bug.
  if (request->notDeliverable())
  {
    // Start by releasing the mutex.  Delete might or might not try to
    // perform some arbitrary action.  (The Request class very explicitly
    // has a virtual destructor.)  We don't want to do that in a mutex.
    pthread_mutex_unlock(&_mutex);
    delete request;
  }
  else
  {
    _queue.push(request);
    pendingRequests = _queue.size();
    int64_t oldestTime = 0;
    if ((pendingRequests > _reportSizeCutoff) && !(pendingRequests % _period))
    {
      // Report this.  We will need more items before the next report.
      _reportSizeCutoff = pendingRequests;
      oldestTime = _queue.front().submitTime;
    }
    else
    {
      // Ignore this.
      pendingRequests = 0;
    }
    pthread_cond_signal(&_conditionVar);
    pthread_mutex_unlock(&_mutex);
    if (pendingRequests)
    {
      TclList msg;
      msg< _recentWorstTime)
        _recentWorstTime = timePassed;
      result = _queue.front().request;
      _queue.pop();
      checkForEmpty();
    }
  pthread_mutex_unlock(&_mutex);
  reportIfEmpty();
  if (needToInitialize)
    // Do this on the first read so we will report in the correct thread.
    ThreadMonitor::find().add(this);
  return result;
}
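// A hedged sketch of how a worker thread would typically drain this queue.
// It assumes getRequest() returns NULL when nothing is queued; the loop and
// processOneRequest() are made up for illustration.
//
//   while (true)
//   {
//     while (Request *request = queue.getRequest())
//       processOneRequest(request);  // the consumer deletes the request
//     // waitForRequest() blocks until the queue is signaled, but it may
//     // also return early, so the caller simply polls getRequest() again.
//     queue.waitForRequest();
//   }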
void RequestQueue::waitForRequest()
{
  ThreadMonitor::SetState state("RequestQueue::waitForRequest");
  pthread_mutex_lock(&_mutex);
  if (_queue.empty())
  {
    pthread_cond_wait(&_conditionVar, &_mutex);
  }
  pthread_mutex_unlock(&_mutex);
}

bool RequestQueue::empty()
{
  pthread_mutex_lock(&_mutex);
  bool result = _queue.empty();
  pthread_mutex_unlock(&_mutex);
  return result;
}

RequestQueue::RequestQueue(std::string name, int period) :
  _name(name),
  _reportSizeCutoff(0),
  _period(period),
  _needToReportEmpty(false),
  _threadMonitorInitialized(false),
  _recentReadCount(0),
  _recentReadTime(0),
  _recentWorstTime(0)
{
  pthread_mutex_init(&_mutex, NULL);
  pthread_cond_init(&_conditionVar, NULL);
  pthread_mutex_lock(&allRequestQueuesMutex);
  allRequestQueues.insert(this);
  pthread_mutex_unlock(&allRequestQueuesMutex);
}

RequestQueue::~RequestQueue()
{
  pthread_mutex_lock(&allRequestQueuesMutex);
  allRequestQueues.erase(this);
  pthread_mutex_unlock(&allRequestQueuesMutex);
  while (!_queue.empty())
  {
    delete _queue.front().request;
    _queue.pop();
  }
  pthread_cond_destroy(&_conditionVar);
  pthread_mutex_destroy(&_mutex);
}

/////////////////////////////////////////////////////////////////////
// FairRequestQueue::UserQueue
/////////////////////////////////////////////////////////////////////

bool FairRequestQueue::UserQueue::empty() const
{
  return _byRequest.empty();
}

Request *FairRequestQueue::UserQueue::pop()
{
  if (empty())
  {
    return NULL;
  }
  Item *item = _fifo.back();
  _fifo.pop_back();
  Request *result = item->request;
  _byRequest.erase(result);
  delete item;
  return result;
}

Request *FairRequestQueue::UserQueue::pop(Predicate const &p)
{
  for (ItemList::reverse_iterator it = _fifo.rbegin();
       it != _fifo.rend();
       it++)
  {
    Item *item = *it;
    Request *result = item->request;
    if (p.acceptable(item->request))
    {
      _fifo.erase((++it).base());
      _byRequest.erase(result);
      delete item;
      return result;
    }
  }
  return NULL;
}

void FairRequestQueue::UserQueue::push(Request *newRequest)
{
  Item *item = new Item;
  item->request = newRequest;
  _fifo.push_front(item);
  item->positionInQueue = _fifo.begin();
  _byRequest[newRequest] = item;
}

bool FairRequestQueue::UserQueue::remove(Request *toDelete)
{
  ItemMap::iterator found = _byRequest.find(toDelete);
  if (found != _byRequest.end())
  {
    delete found->second->request;
    _fifo.erase(found->second->positionInQueue);
    delete found->second;
    _byRequest.erase(found);
    return true;
  }
  return false;
}

FairRequestQueue::UserQueue::~UserQueue()
{
  for (ItemIterator it = _fifo.begin(); it != _fifo.end(); it++)
  {
    delete (*it)->request;
    delete *it;
  }
}

/////////////////////////////////////////////////////////////////////
// FairRequestQueue
/////////////////////////////////////////////////////////////////////

bool FairRequestQueue::empty() const
{
  return _fifo.empty();
}

Request *FairRequestQueue::pop()
{
  if (empty())
  {
    return NULL;
  }
  _count--;
  UserQueue *userQueue = _fifo.back();
  _fifo.pop_back();
  Request *result = userQueue->pop();
  if (userQueue->empty())
  {
    _bySocket.erase(userQueue->getKey());
    delete userQueue;
  }
  else
  {
    _fifo.push_front(userQueue);
    userQueue->positionInQueue = _fifo.begin();
  }
  return result;
}

// Like the previous version of pop(), this will give preference to whichever
// user is next in line.  If a user has multiple jobs in the queue, the oldest
// one gets preference.  If p() always returns true, this will act just like
// pop() with no arguments.  This will have O(n) execution time if p() always
// returns false.
Request *FairRequestQueue::pop(Predicate const &p)
{
  for (UserList::reverse_iterator it = _fifo.rbegin();
       it != _fifo.rend();
       it++)
  {
    UserQueue *userQueue = *it;
    if (Request *result = userQueue->pop(p))
    {
      _count--;
      // We want to say "_fifo.erase(it);".  That's not possible because
      // it is a reverse_iterator, not an iterator.  The following line
      // means the same thing.  For more info see
      // http://www.ddj.com/cpp/184401406
      _fifo.erase((++it).base());
      if (userQueue->empty())
      {
        _bySocket.erase(userQueue->getKey());
        delete userQueue;
      }
      else
      {
        _fifo.push_front(userQueue);
        userQueue->positionInQueue = _fifo.begin();
      }
      return result;
    }
  }
  return NULL;
}

void FairRequestQueue::push(Request *newRequest)
{
  SocketInfo *const socketInfo = newRequest->getSocketInfo();
  assert((!socketInfo)||(socketInfo->objectIsValid()));
  _count++;
  UserQueue *&userQueue = _bySocket[socketInfo];
  if (!userQueue)
  {
    userQueue = new UserQueue(socketInfo);
    _fifo.push_front(userQueue);
    userQueue->positionInQueue = _fifo.begin();
  }
  userQueue->push(newRequest);
}

void FairRequestQueue::remove(Request *toDelete)
{
  remove(toDelete->getSocketInfo(), toDelete);
}

void FairRequestQueue::remove(SocketInfo *socket, Request *toDelete)
{
  UserMap::iterator found = _bySocket.find(socket);
  if (found != _bySocket.end())
  {
    if (found->second->remove(toDelete))
    {
      _count--;
      if (found->second->empty())
      {
        _fifo.erase(found->second->positionInQueue);
        delete found->second;
        _bySocket.erase(found);
      }
    }
  }
}

void FairRequestQueue::remove(SocketInfo *socket)
{
  UserMap::iterator found = _bySocket.find(socket);
  if (found != _bySocket.end())
  {
    _count -= found->second->getCount();
    _fifo.erase(found->second->positionInQueue);
    delete found->second;
    _bySocket.erase(found);
  }
}

FairRequestQueue::~FairRequestQueue()
{
  for (UserIterator it = _fifo.begin(); it != _fifo.end(); it++)
  {
    delete *it;
  }
}
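// A hedged sketch of the fairness push() and pop() provide.  The sockets
// and requests below are placeholders; the interleaving is what matters.
// Requests x1 and x2 belong to socket X, y1 belongs to socket Y.
//
//   FairRequestQueue fair;
//   fair.push(x1);
//   fair.push(x2);
//   fair.push(y1);
//   fair.pop();  // returns x1: X is next in line, and within a user the
//                // oldest request goes first
//   fair.pop();  // returns y1: after being served, X rotates to the back
//   fair.pop();  // returns x2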