#ifndef __Messages_h_
#define __Messages_h_

#include <queue>
#include <list>
#include <map>
#include <string>
#include <pthread.h>

#include "MiscSupport.h"
#include "SocketInfo.h"
#include "ThreadClass.h"
#include "FixedMalloc.h"
#include "ThreadMonitor.h"

/* A Request is a fundamental part of our architecture. We have a lot of
 * threads with very little exported. Every thread has an event queue, much
 * like the GUI thread in a lot of languages. The event queue is how
 * different threads interact. Request is the base class for all objects
 * sent via the request queue.
 *
 * Requests are typically simple objects with only public, non-const data
 * fields. Sometimes we have some helper functions to go with that data. The
 * threads are responsible for all the processing. In many cases people use
 * Request directly, with no other data. The callbackId field is sufficient.
 *
 * Requests are often reused. A request can pass from one thread to another,
 * then to another. This allows us to build up state inside a request, rather
 * than constantly copying data from one request to the next. This is
 * especially helpful with large STL data structures. You don't have the
 * expense of a copy or the programming headaches of dynamic memory. (See
 * ThreadSafeRefCount.h for an alternate and sometimes complementary way of
 * safely and efficiently sharing data.)
 *
 * Requests are always allocated on the heap. That's required because
 * requests are polymorphic. At any one time one piece of code is holding
 * onto the request and is responsible for deleting the request if required.
 * The sender never keeps a list of outstanding requests. That could cause
 * problems because another thread or object could delete the request at any
 * time. Every class (e.g. RequestQueue) which is designed to hold requests
 * must be responsible for memory management. The good thing is that we
 * almost never have to think about memory management in other places because
 * we just create STL objects in the request, and those get memory management
 * for free.
 *
 * Requests are sometimes used as the base for more complicated
 * communications mechanisms. ../generate_alerts/misc_framework/DataNodes.h
 * is a large framework which includes a BroadcastMessage, based on a
 * Request. That solves several problems, including sending a message to an
 * object which might or might not be deleted before the message is
 * delivered. ContainerThread.h includes various wrappers around Request,
 * and ContainerThread.h is the base for even more tools.
 *
 * Requests are not just used in event queues. Many threads have the same
 * shape. They have one event queue for messages from other threads. And
 * they have some type of priority queue for internal use. Some requests,
 * like a complicated history request, will get moved from the event queue
 * to the priority queue. After we've completely drained the event queue then
 * we start processing those big items in order of priority. Other requests
 * are fast and done immediately after receiving them from the event queue.
 * In particular, if the user sends a request to cancel a previous request,
 * we do that immediately. If we only had the event queue, the user could
 * never cancel a request because we'd never see the cancel until the first
 * request was done. See FairRequestQueue for a common example of a priority
 * queue for requests.
 *
 * See ExternalRequest for the original inspiration for this design. I
 * created the Request class by pulling out the reusable parts of
 * ExternalRequest. The name "Request" comes from this history. "Message"
 * would have been a better name. Some of these are requests from the user,
 * some are replies to the user, and some are completely internal messages
 * within the server. */
class Request : public FixedMalloc {
private:
    SocketInfo * const _socketInfo;

protected:
    friend class RequestQueue;
    virtual bool notDeliverable() const;

public:
    // The socket is a fundamental part of every request.
    //
    // If a client asks for data, that request might pass through several
    // threads. One Request object might be reused, or we might make several
    // different objects. Eventually we need to send a reply, and we need to
    // make sure we send the response back to the right client. We use the
    // client's socket to track the request through the entire process.
    //
    // Memory management is tricky with so many threads. When a socket
    // closes, all data associated with the socket needs to be cleaned up. A
    // lot of that data is stored in Request objects which are stored in
    // various data structures. Most of these data structures have an
    // efficient way to delete all requests for a specific socket.
    //
    // FairRequestQueue and other data structures use the socket to set the
    // priority of a request. FairRequestQueue uses the basic assumption:
    // one user <--> one socket.
    SocketInfo *getSocketInfo() const { return _socketInfo; }

    // Standard virtual destructor. This class is inherited a lot, and a lot
    // of different code might delete objects of this type.
    virtual ~Request();

    // The socket can be NULL. We don't provide a default; we don't want
    // someone to forget the socket because it's so important.
    Request(SocketInfo *socketInfo) : _socketInfo(socketInfo) { }

    // The receiver can use this payload any way that it wants to. Typically
    // a thread will use an enum to assign these numbers. Typically each time
    // a thread reads a request from the event queue, it will use a switch
    // statement based on the callbackId to decide how to process the
    // request.
    //
    // This is explicitly not const. When you reuse a message you typically
    // change the callbackId. Use the callbackId to say the current state /
    // what you want to happen to this request next.
    //
    // Note that negative numbers are reserved for the framework. See
    // DeleteSocketThread::callbackId for an example.
    int callbackId;

    virtual TclList debugDump() const;
};
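/* Example (illustrative sketch only, not part of this header): a thread will
 * typically define its callbackId values in an enum and, when it needs extra
 * data, derive its own request type. The enum and HistoryRequest below are
 * hypothetical.
 *
 *     enum { mtHistoryRequest = 1, mtCancelRequest };
 *
 *     class HistoryRequest : public Request {
 *     public:
 *         std::string symbol;  // State built up as the request moves along.
 *         HistoryRequest(SocketInfo *socket) : Request(socket)
 *         { callbackId = mtHistoryRequest; }
 *     };
 */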
/* This is a generic interface for listening to events. Typically if an
 * object wants to receive events from other parts of the code, it will use
 * an EventQueue. However, sometimes the object wants to use the event
 * immediately, in the current thread.
 *
 * The CommandDispatcher is a good example of a class which sends events to a
 * RequestListener. Again, most people listening to the CommandDispatcher
 * send all commands to their event queue. But for various reasons some code
 * does not want to do that. PingManager, for example, is so simple that it
 * doesn't need its own thread. The CommandDispatcher doesn't know or care
 * who is using an event queue.
 *
 * Another interesting example is ../fast_alert_search/RecordDispatcher.h.
 * This is an interface for sending large quantities of data. There are
 * multiple types of producers and consumers of the data. They all use the
 * RequestListener interface to send and receive these messages. Some
 * consumers use a standard RequestQueue to receive the data. Many consumers,
 * however, are derived from ForeverThreadUser, and ForeverThreadUser
 * implements RequestListener. */
class RequestListener {
public:
    virtual void newRequest(Request *request) = 0;
    virtual ~RequestListener() { }
};
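/* Example (illustrative sketch only): an object that wants to handle events
 * immediately, in the sender's thread, can implement RequestListener
 * directly instead of owning an event queue. PingManager, mentioned above,
 * works this way; the class and helper below are hypothetical.
 *
 *     class ImmediateHandler : public RequestListener {
 *     public:
 *         virtual void newRequest(Request *request) {
 *             // Use the request right here, then free it. Nothing else is
 *             // holding onto it.
 *             doSomethingWith(request);  // Hypothetical helper.
 *             delete request;
 *         }
 *     };
 */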
/* RequestQueue implements a standard event queue. Most threads will create
 * one of these, typically named _incoming.
 *
 * For the most part this is a simple thread-safe FIFO. However, this also
 * listens for the system messages that tell us to delete old requests.
 * Creating a RequestQueue will automatically subscribe the listener to
 * delete messages. But the RequestQueue will also handle some deletions
 * itself. This combination is required to make sure we consistently and
 * completely delete all old data when a socket disconnects.
 *
 * A RequestQueue will automatically report to the log when it gets backed
 * up. See the constructor for ways to configure this. This is a way to
 * determine the health of a thread.
 *
 * Any number of producers can add to the queue. Only one thread should
 * consume from each queue. While the reading interface is thread safe,
 * sharing a queue would break the rules for cleaning up after a socket
 * closes.
 *
 * SelectableRequestQueue is an alternative. That works well if a thread
 * wants to listen directly to a socket in addition to Request objects.
 * SelectableRequestQueue also works well with PipeConditionVar, our version
 * of a Unix condition variable or a Windows event. SelectableRequestQueue is
 * a subclass of RequestQueue, so this entire comment block applies to both
 * classes. */
class RequestQueue : public RequestListener, NoAssign, NoCopy,
                     private ThreadMonitor::Extra {
private:
    pthread_mutex_t _mutex;
    pthread_cond_t _conditionVar;
    struct Element {
        int64_t submitTime;
        Request *request;
        Element(Request *request) :
            submitTime(getMicroTime()), request(request) { }
    };
    std::queue< Element > _queue;
    const std::string _name;
    std::queue< Request * >::size_type _reportSizeCutoff;
    const int _period;
    bool _needToReportEmpty;
    bool _threadMonitorInitialized;
    int _recentReadCount;
    int64_t _recentReadTime;
    int64_t _recentWorstTime;
    void checkForEmpty();
    void reportIfEmpty();

protected:
    // From ThreadMonitor::Extra
    virtual std::string getInfoForThreadMonitor();

public:
    std::string const &getName() const { return _name; }

    // Call this to add to the queue.
    //
    // We may be holding the allRequestQueuesMutex when this is called.
    // If you override this function, be careful about acquiring any
    // additional mutexes.
    virtual void newRequest(Request *request);

    // Grab the next request. Returns NULL if the queue is empty.
    Request *getRequest();

    // Efficiently waits for a request. This will automatically tell
    // ThreadMonitor what we are doing.
    void waitForRequest();

    bool empty();

    // The name is used mostly for the log. (Although we do export it for
    // anyone who asks.) The name is typically the same as the name of the
    // thread.
    //
    // Period says how often to tell the log that we are backed up. The
    // default is 100, meaning we report to the log when there are 100 items
    // in the queue, and again at 200, 300, etc. The default was designed for
    // ax_alert_server where we were trying to be very responsive to user
    // requests. If a user request was slow we'd often move it to a different
    // data structure.
    //
    // Other applications have different characteristics. Some applications
    // are listening to market data or our internal alert events. These are
    // often much simpler events than a user request, but there are a lot
    // more of them. And these often come in large bursts. Set the period to
    // something that doesn't happen a lot for your particular thread.
    RequestQueue(std::string name, int period = 100);

    // The destructor works. In practice, however, not many people delete a
    // queue. That's tough in a multi-threaded program. It's hard to organize
    // things so all the other threads know that you are going down.
    //
    // Note: The destructor is responsible for unsubscribing from the socket
    // deleted messages. Note that these are not just carrying data. We don't
    // clean up the socket until every one of the delete messages has been
    // deleted. So if you ignore this queue, you're not just wasting a little
    // memory on the messages in the queue. You're also wasting sockets. You
    // will run out of sockets long before you run out of memory.
    //
    // In short, if you stop listening to a queue you should delete the queue
    // or abort the program.
    virtual ~RequestQueue();
};
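/* Example (illustrative sketch only): the typical consumer loop for a thread
 * that owns a RequestQueue named _incoming. The mtHistoryRequest id is
 * hypothetical; DeleteSocketThread::callbackId is declared below.
 *
 *     while (true) {
 *         _incoming.waitForRequest();
 *         while (Request *request = _incoming.getRequest()) {
 *             switch (request->callbackId) {
 *             case DeleteSocketThread::callbackId:
 *                 // Drop any state we keep for request->getSocketInfo().
 *                 break;
 *             case mtHistoryRequest:
 *                 // Slow work often moves to a priority queue instead of
 *                 // being handled here. See FairRequestQueue below.
 *                 break;
 *             }
 *             delete request;
 *         }
 *     }
 */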
class DeleteSocketThread {
public:
    // When we are done with a socket, we must notify all threads. When you
    // want to close a socket, call deleteSocket(). This will send a
    // broadcast to all RequestQueues with a message whose id is
    // DeleteSocketThread::callbackId. Each thread should remove any
    // references to this socket, then delete the message. After the last
    // message has been deleted, the socket will be closed and the SocketInfo
    // object will be deleted. While the shutdown is in progress, you cannot
    // send a message with that socket. Any such messages will be deleted
    // immediately by the queuing mechanism.
    static const int callbackId = -1;
    static void deleteSocket(SocketInfo *toDelete);
};

// This data structure holds a group of requests. If there is only one
// connection, then this becomes a simple FIFO. If each socket only sends us
// one request at a time, this is a simple FIFO. If a socket submits several
// requests at once, the first one is immediately put at the end of the line.
// As soon as that one is removed from the head of the queue, the next
// request from that connection takes its place at the end of the line.
//
// This prevents one user from overwhelming our resources. When a user makes
// his first request, and 3 other users have already made requests, the new
// user will only have to wait for 3 requests, regardless of how many
// requests those other three users have made.
//
// In addition to the FIFO nature of the group, we can also find and delete
// specific elements efficiently. In that sense we own the objects in the
// group. If, for example, you delete the queue, all objects still in the
// group are deleted. If you explicitly call one of the remove functions,
// again, the underlying object will be deleted.
class FairRequestQueue {
public:
    class Predicate {
    public:
        virtual bool acceptable(Request const *) const = 0;
        virtual ~Predicate() { }  // Avoid compiler warning.
    };

private:
    int _count;
    struct Item;
    typedef std::list< Item * > ItemList;
    typedef ItemList::iterator ItemIterator;
    typedef std::map< Request *, Item * > ItemMap;
    struct Item : public FixedMalloc {
        Request *request;
        ItemIterator positionInQueue;
    };
    class UserQueue;
    typedef std::list< UserQueue * > UserList;
    typedef UserList::iterator UserIterator;
    typedef std::map< SocketInfo *, UserQueue * > UserMap;
    class UserQueue : public FixedMalloc {
    private:
        ItemList _fifo;
        ItemMap _byRequest;
        SocketInfo *_socket;
    public:
        UserIterator positionInQueue;
        bool empty() const;
        int getCount() const { return _byRequest.size(); }
        Request *pop();
        Request *pop(Predicate const &p);
        void push(Request *newRequest);
        // Returns true if something was removed.
        bool remove(Request *toDelete);
        SocketInfo *getKey() { return _socket; }
        UserQueue(SocketInfo *socket) : _socket(socket) { }
        ~UserQueue();
    };
    UserList _fifo;
    UserMap _bySocket;

public:
    bool empty() const;
    int getCount() const { return _count; }
    int getCount(SocketInfo *socket) const {
        if (UserQueue *userQueue = getPropertyDefault(_bySocket, socket))
            return userQueue->getCount();
        else
            return 0;
    }
    Request *pop();  // Returns null if we are empty.
    Request *pop(Predicate const &p);  // Returns null if nothing is found.
    void push(Request *newRequest);

    // Efficiently removes any and all requests associated with the socket.
    void remove(SocketInfo *socket);

    // Remove the specific item. If the item is not currently in the queue,
    // then this does nothing. The second form of the call will work even if
    // the request has already been deleted. (But be careful. It's possible
    // that a new request could be added with the same pointer, so we can't
    // tell them apart.)
    void remove(Request *toDelete);
    void remove(SocketInfo *socket, Request *toDelete);

    FairRequestQueue() : _count(0) { }
    ~FairRequestQueue();
};
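/* Example (illustrative sketch only): pairing a RequestQueue with a
 * FairRequestQueue, as described in the Request comment above. Slow requests
 * are parked in the fair queue and processed one socket at a time after the
 * event queue is drained. The member names, the mtHistoryRequest id, and
 * processHistoryRequest() are hypothetical.
 *
 *     FairRequestQueue _slowWork;
 *
 *     // Inside the event loop's switch statement:
 *     case mtHistoryRequest:
 *         _slowWork.push(request);  // The fair queue now owns the request.
 *         request = NULL;           // Don't delete it below.
 *         break;
 *     case DeleteSocketThread::callbackId:
 *         _slowWork.remove(request->getSocketInfo());
 *         break;
 *
 *     // After the event queue is empty:
 *     while (Request *next = _slowWork.pop()) {
 *         processHistoryRequest(next);
 *         delete next;
 *     }
 */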
#endif