#ifndef __ContainerThread_h_
#define __ContainerThread_h_

// Standard headers used by the declarations below.
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <functional>
#include <set>
#include <string>

#include "Messages.h"
#include "ThreadSafeRefCount.h"

/* This is a wrapper around items that should run in a thread, but don't
 * want to make their own thread.
 *
 * When I say you're not making a thread, I mean two things.
 * a) You're avoiding writing a lot of boilerplate code.
 * b) Multiple unrelated units can share the same thread object, saving
 *    resources at run-time.
 *
 * I originally created this for TalkWithServer. I was trying to mimic the way
 * that C# used threads and sockets. In practice that didn't work well.
 * Normally you want TalkWithServer to be running in the same thread as the
 * code that's using it.
 *
 * There is another reason we don't put all the TalkWithServer objects into
 * one container thread. The work involved with notifications is linear in
 * the number of IContainerThreadUser objects in the ContainerThread. That's
 * fine for our typical use with just a handful of users per thread. Look at
 * ../micro_proxy/ForwardingThread.C as an example if you need to listen to a
 * large number of sockets efficiently.
 *
 * That said, this file is still a HUGE success. Especially ForeverThreadUser.
 * Sharing threads is popular. The batch thread is a nice way to handle
 * database requests that you do once a day or once an hour, where the exact
 * time doesn't matter. There are other small jobs that get grouped into other
 * threads.
 *
 * See ../fast_alert_search/AlertMicroService.C for another good example.
 * That code is broken into two distinct pieces to make it easier to follow.
 * The AlertProvider class could possibly be moved into its own file as a
 * reusable library. AlertProvider and the main part of the file are each
 * subclasses of ForeverThreadUser. AlertProvider has to be in the same
 * thread as its user. So far this sounds like a lot of older code. The
 * huge benefit that we got here was that AlertProvider could directly talk
 * with the thread.
 * It could subscribe to events, or request a wake up at a
 * specific time. In the past that was much messier. The main part of the
 * thread would have to act as an intermediary between the event queue and
 * the library. That led to some really ugly (and often buggy) code.
 *
 * NewWorkerCluster is built on top of this file. If you have one unit that
 * needs to use multiple threads, consider starting from NewWorkerCluster.
 *
 * If you are using this file and you need access to a database, consider using
 * DatabaseForThread.h. That file will help you reuse the database
 * connections. Older code would create the database objects directly in
 * the thread object. DatabaseForThread.h is the only way to share a database
 * when using a ContainerThread, and probably would have been a good idea in
 * the older code, too. */

/* Before the thread goes to sleep, waiting for the next event, it asks each
 * of its users for their input. They don't have to do anything. But they
 * can make requests, for example, wake me after 30 seconds. This interface
 * allows ContainerThread to expose these options to each user, but only at
 * the appropriate times. */
class IBeforeSleepCallbacks
{
public:
  // Wake up when this handle becomes readable.
  virtual void waitForRead(int handle) =0;

  // Wake up when this handle becomes writable.
  virtual void waitForWrite(int handle) =0;

  // Wake up after this much time has passed. We might wake up sooner. We
  // will wake up for external events, regardless of time. If we get multiple
  // calls to this, we wake up after the earliest time. If you don't need to
  // wake up at any particular time, don't call this at all.
  virtual void wakeAfterMs(int timeout) =0;

  // Many of our readings are in microseconds, but for the most part when
  // you go to sleep it's only precise to the millisecond. We take care of
  // the rounding here.
  void wakeAfterMicroSeconds(int64_t us);

  // A wrapper around wakeAfterMs().
  // This compares the given time to the
  // current time, changes the units, deals with the different sizes of
  // integers, etc. If time is in the past, including 0, this will request a
  // wakeup ASAP.
  void wakeAtTime(time_t wakeAt);

  virtual ~IBeforeSleepCallbacks() { }
};

class IContainerThread;

/* If you want to exist in a thread, and you don't want to make your own
 * thread, implement a subclass of IContainerThreadUser, then add yourself to
 * the container thread. */
class IContainerThreadUser
{
private:
  // If you don't know your container, it's hard to do much. The container
  // should never change; we are preserving the idea that a lot of tasks
  // happen in only one thread.
  IContainerThread *const _container;

public:
  IContainerThreadUser(IContainerThread *container) :
    _container(container) { }
  virtual ~IContainerThreadUser() { }

  // You get this call in the thread whenever a socket closes.
  virtual void socketClosed(SocketInfo *socket) { }

  // Do we really need awake() and beforeSleep()? It seems like these could
  // be combined. One gets called at the top of the loop, and one at the
  // bottom. Except for the very first time, beforeSleep() will be called
  // right after awake().

  // This is called immediately before the event loop goes to sleep. This
  // gives the user a chance to offer special instructions, like wake me after
  // 15 seconds. The default is to make no special requests. Do *not* hold
  // on to the callbacks object.
  virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) { }

  // This gets called every time the thread wakes up. This will include
  // information from the pollset: which sockets are ready. (This might
  // include all sockets, not just the sockets that this user requested.)
  virtual void awake(std::set< int > const &woken) { }

  // If you call returnToMe(), it will call this method in the
  // ContainerThread. No one else calls this. (ForeverThreadUser::newRequest
  // calls returnToMe(), so in that case you will get responses here.)
  // The original request will be deleted by the caller after you return.
  virtual void handleRequestInThread(Request *original) { }

  // This gets called as soon as the thread sees the request to add this
  // user. This gets called before any other callbacks. Aside from this
  // promise it's impossible to predict which callbacks will be called in
  // which order.
  virtual void initializeInThread() { }

  // This returns a description of the container suitable for the logs.
  // The default is the unmangled name of the class and the this pointer
  // written in hex. You can override this if you want to add more details.
  //
  // Sample default value: TopListMicroService::DataProvider(0x7f280c2858d0)
  // TopListMicroService::DataProvider does NOT override debugName().
  //
  // Sample overridden value: NewWorkerCluster(TopListMicroService database)
  // NewWorkerCluster DOES override debugName(). There are a lot of
  // NewWorkerCluster objects, and they already have a _baseName used elsewhere
  // in the log. (In this example NewWorkerCluster::_baseName =
  // "TopListMicroService database".)
  virtual std::string debugName();

  IContainerThread *getContainer() const { return _container; }

  // We prefer to work with a reference counted object. This simplifies
  // things like delivering a message to an object which might not exist
  // any more.
  typedef NCTSRefCount< IContainerThreadUser > Ref;

  // This takes a request object, delivers it to the right thread, and then
  // hands it back to me->handleRequestInThread(), and finally deletes the
  // original request.
  //
  // In many cases an object will register itself as a message listener. E.g.
  // if you want to receive commands from the CommandDispatcher. In this case
  // you implement your own newRequest() method, and all it does is pass
  // all requests to returnToMe().
  //
  // That is to say, this is most helpful when someone else is already creating
  // the request objects, and you had to create a message id.
  // If you are making your own callbacks, and creating your own Work / Request
  // objects, you can avoid the message id / switch statement. Your
  // Work::inThread() method can directly call private methods, or even
  // directly access private member variables. It would work like an anonymous
  // class in SwingUtilities.invokeLater() in Java.
  //
  // Better yet, use doWorkInThread() which will automatically create a Work
  // class for you.
  void returnToMe(Ref me, Request *original) const;

  // Normally when you call returnToMe() you expect someone else to
  // automatically delete the original request right after you get the callback
  // handleRequestInThread(). Call this if you don't want the original to be
  // deleted. It's up to you to manage the original request at that point.
  // Do not call keepOriginal() at any other time.
  void keepOriginal() const;

  // This is a wrapper around IContainerThread::Work. This makes it easy
  // to use a lambda expression for Work::inThread(). Action could be a
  // function or an object that implements operator(), but the normal way
  // to use this is if Action is a lambda. This provides an alternative to
  // returnToMe(). Use returnToMe() if you already have a Request class.
  // Use doWorkInThread() if you don't want to create a new subclass of
  // Request.
  //
  // Note: This does NOT do anything to keep the IContainerThreadUser object
  // alive. If you call doWorkInThread(), you should take care of that.
  // Inheriting from ForeverThreadUser or ForeverServerConnection (rather than
  // directly from IContainerThreadUser) is one way of doing that. Another
  // option would be for the Action to hold a Ref to the IContainerThreadUser
  // object.
  template < class Action >
  void doWorkInThread(Action action);

  // Just like above, but associate the request with a socket. The normal
  // rules apply. If the socket is deleted, this request will automatically
  // be deleted.
  template < class Action >
  void doWorkInThread(SocketInfo *socket, Action action);

  // invokeIfRequired() and doWorkInThread() will both make sure that the
  // given action is performed in the thread associated with this object.
  // doWorkInThread() always pushes the action onto the event queue.
  // invokeIfRequired() will perform the action immediately if we are already
  // in the appropriate thread, otherwise it will use the event queue.
  //
  // The name "invokeIfRequired" was copied from the C# code. Perhaps this
  // choice could have been a parameter to the function, rather than a separate
  // function. That's a little bit tricky since (a) we have multiple optional
  // arguments, (b) one argument is a templated type, (c) one is a bool and
  // almost anything can get converted into a bool, and (d) these options were
  // added after I already had working code.
  //
  // Putting the optional socket second and the non-optional action first makes
  // more sense. We do that in other places, including addLambdaToQueue().
  // It was probably a mistake to put the optional argument first in
  // doWorkInThread() but it's hard to change that now.
  //
  // invokeIfRequired() should probably be the default because it is more
  // efficient. Sometimes it is helpful to put something into the queue for
  // the current thread. For example if you want to yield to other tasks, but
  // execute again soon. However, that second case seems rare.
  template < class Action >
  void invokeIfRequired(Action action, SocketInfo *socket = NULL);

  // Returns true if the current thread is this user's container thread.
  bool inMyThread() const;
};

/* This interface allows the thread user to access the thread.
 *
 * Currently there is only one implementation, ContainerThread. For that reason
 * all of the virtual methods are abstract. (No point in saying that some
 * code might be shared by multiple subclasses.) This interface exists only
 * so we can hide a lot of implementation details of ContainerThread.
 */
class IContainerThread
{
public:
  // This is thread safe. It will send a message to the thread, so the result
  // will not be immediate. This will start consulting the user each time
  // the thread goes to sleep or wakes up again.
  //
  // This will cause an initializeInThread() message to be sent to the user.
  //
  // A duplicate call to addUser() will do nothing.
  virtual void addUser(IContainerThreadUser::Ref user) =0;

  // This is thread safe. It will send a message to the thread, so the result
  // will not be immediate. This will stop consulting the user. This will
  // remove the reference count to the user. This uses the same message queue
  // as addUser() and addToQueue(), so the order of these messages will be
  // preserved. Duplicate calls to removeUser() will not do anything.
  virtual void removeUser(IContainerThreadUser::Ref user) =0;

  // When you want to send a message to a thread user, create a subclass of
  // Work and use addToQueue() to submit that job.
  // IContainerThreadUser::doWorkInThread() will create one of these for you
  // automatically.
  class Work : public Request
  {
  private:
    Request *_original;

  public:
    // You can optionally associate a socket with the Request. This has the
    // normal meaning. If that socket is closed while this request is in
    // the queue, the request will automatically be deleted.
    Work(SocketInfo *socket = NULL) : Request(socket), _original(NULL) { }

    // You can optionally associate another Request with this Request.
    // For example, you listen for external commands directly in the
    // CommandDispatcher thread. (I.e. you implement RequestListener directly,
    // without using a RequestQueue.) You wrap the request from the
    // CommandDispatcher in a Work object, then you submit the Work object to
    // the IContainerThread.
    // This will automatically copy the socket from the original Request to
    // this Request. This will automatically delete the original Request
    // when this Request is deleted.
    Work(Request *original) :
      Request(original->getSocketInfo()), _original(original) { }

    // Note that these are public. That keeps up our idea that Request objects
    // should be very simple, mostly just carrying data. The thread user can
    // call getOriginal() and/or clearOriginal() without any work from the
    // Work subclass.
    Request *getOriginal() const { return _original; }

    // Set the original Request to NULL. So when this object is deleted, it
    // will not try to delete the original. You probably want the original
    // to be deleted automatically if this object is automatically deleted
    // while it's waiting in the queue. However, once this object is
    // processed, the original Request might get *moved* somewhere else.
    void clearOriginal() { _original = NULL; }

    // This is called in the container thread. Notice that there is no
    // more routing information. Presumably this object will contain an
    // IContainerThreadUser::Ref pointing to the final recipient. Presumably
    // this function will dynamic_cast the pointer to the right type, then
    // use a simple function call to deliver its payload.
    virtual void inThread() =0;

    virtual ~Work() { delete _original; }
  };

  virtual void addToQueue(Work *work, bool autoDelete = true) =0;

  // SpecificWork is a convenient way to convert a lambda into an
  // IContainerThread::Work object.
  template < class Action >
  class SpecificWork : public Work
  {
  private:
    Action const _action;

  public:
    SpecificWork(Action action, SocketInfo *socket = NULL) :
      Work(socket), _action(action) { }
    virtual void inThread() { _action(); }
  };

  template < class Action >
  void addLambdaToQueue(Action action, SocketInfo *socket = NULL)
  {
    addToQueue(new SpecificWork< Action >(action, socket));
  }

  virtual std::string getThreadName() const =0;

  // This is thread safe. The first call will create the thread. Many users
  // might share this. This is aimed at fast things that don't need a whole
  // thread.
  // For example, if you listen to a TCP/IP connection, and all you
  // do when you receive data is send it to another thread.
  static IContainerThread *primary();

  // This is thread safe. The first call will create the thread. Many users
  // might share this. This is aimed at batch processing and periodic
  // maintenance. Jobs might take longer than jobs in the primary() thread.
  // In particular, this thread can access the database. But most of the time
  // the thread will be inactive. We might, for example, want to check for
  // changes to the alerts daily table. If we do that once every two hours,
  // does it matter if we do it at 2pm, at 2:01pm, at 2:00:01pm, or even 3pm?
  static IContainerThread *batch();

  // Create a new thread. It's up to us how much to share it. Container
  // threads were originally made for things which didn't need a thread of
  // their own and wanted to share. But they also work well with a single
  // user. If you plan to use the database a lot, you probably shouldn't
  // share too much.
  static IContainerThread *create(std::string name,
                                  bool makeNameUnique = false,
                                  int queueLogPeriod = 100);

  virtual ~IContainerThread() { }

  // This will return null if we are not in a container thread.
  //
  // An IContainerThreadUser object should know its own thread. But this can
  // be handy if you send a work request to an IContainerThread without
  // creating an IContainerThreadUser in that thread. Basically you're using
  // the IContainerThread like a normal thread pool thread.
  static IContainerThread *current();

  static std::string currentThreadName()
  {
    IContainerThread *const c = current();
    if (c)
      return c->getThreadName();
    else
      return "NULL";
  }
};

/* This takes care of a little more boilerplate stuff. As I create several
 * IContainerThreadUser subclasses, I notice some common features.
 *
 * I really liked the idea of a thread user that could be deleted. And I'm
 * keeping that as an option. But most can't be deleted for other reasons.
 * (If you subscribe to external commands, for example, there is
 * no way to unsubscribe.) We can simplify some things if we assume a thread
 * user will never go away.
 *
 * This isn't anything big, but it does get rid of the last of the boilerplate
 * code. */
class ForeverThreadUser : public IContainerThreadUser, public RequestListener
{
protected:
  void start() { getContainer()->addUser(_keepMeAlive); }

  // The beforeSleep() call is a good place to do deferred work. For example,
  // you received three separate requests to send things to the user. You
  // collected those requests so you could package them together, possibly
  // removing duplicates and reducing overhead. beforeSleep() is the place to
  // finally gather the deferred work and do it.
  //
  // This worked fine in simple cases. Originally each IContainerThreadUser
  // object was fairly separate, so if two were in the same thread it didn't
  // matter what order their beforeSleep() calls were made.
  //
  // Now we often use IContainerThreadUser as a base for libraries that share
  // the same thread as their callers, which are also IContainerThreadUser
  // objects. What if a's beforeSleep() called b and asked b to do some work,
  // but b decided to defer the work until b's beforeSleep() call? How do we
  // know b will get a beforeSleep() call any time soon? Maybe b got its
  // beforeSleep() callback shortly before a got its beforeSleep() callback.
  //
  // requestBeforeSleep() solves that problem. If you defer some work, and
  // you want to get a beforeSleep() soon, call requestBeforeSleep(). It
  // is safe and efficient to call requestBeforeSleep() multiple times.
  void requestBeforeSleep();

public:
  // The Listener class is an adapter. It was made as an easier way for a
  // ForeverThreadUser to request and handle callbacks from the
  // CommandDispatcher. However, this class can handle any data source
  // which takes a RequestListener as an input.
  // The alternative to this
  // class would be a large switch statement in the handleRequestInThread()
  // method.
  //
  // T is the type of request you will be receiving. This helps get rid of
  // some standard boilerplate code. Normally the first thing you do with a
  // new request is cast it to the correct type, and the second thing you
  // do is check that the cast succeeded.
  template < class T > // static_assert(T implements Request);
  class Listener : public RequestListener
  {
  private:
    // When we receive a message we forward it to this _thread...
    IContainerThread &_thread;
    // ...then call this _callback.
    const std::function< void(TSRefCount< T > const &) > _callback;

    virtual void newRequest(Request *request)
    {
      TSRefCount< T > r = dynamic_cast< T * >(request);
      SocketInfo *const socket = r->getSocketInfo();
      _thread.addLambdaToQueue([=](){ _callback(r); }, socket);
    }

    // Assume the Listener will exist forever. That seems reasonable since
    // you can't unsubscribe from the CommandDispatcher and a ForeverThreadUser
    // is meant to exist forever. Just saying this explicitly means there's
    // a lot we don't have to think about.
    virtual ~Listener() { abort(); }

  public:
    // When we receive a message, switch to the appropriate thread then
    // call the given callback.
    //
    // The input to the callback is a thread safe reference counted pointer
    // to the original message. The callback can choose to keep the pointer
    // even after returning. This replaces
    // IContainerThreadUser::keepOriginal() in our newer code and
    // "current = NULL" in older code that implemented its own event loop.
    // The older approaches were clearly ugly, whereas this new interface is
    // exactly what smart pointers were made for.
    Listener(IContainerThread *thread,
             std::function< void(TSRefCount< T > const &) > callback) :
      _thread(*thread), _callback(callback) { }

    // When we receive a message, switch to the appropriate thread then
    // call the given callback.
    //
    // The input to the callback is a constant reference to the original
    // message.
    // The message will automatically be deleted by this library
    // immediately after the callback finishes. This constructor is less
    // powerful than the previous constructor, but may be easier to use.
    Listener(IContainerThread *thread,
             std::function< void(T const &) > callback) :
      _thread(*thread),
      _callback([=](TSRefCount< T > const &message) { callback(*message); })
    { }
  };

  const Ref _keepMeAlive;

  // For RequestListener.
  virtual void newRequest(Request *request)
  {
    returnToMe(_keepMeAlive, request);
  }

  ForeverThreadUser(IContainerThread *container = NULL) :
    IContainerThreadUser(container?container:IContainerThread::primary()),
    _keepMeAlive(this)
  { }
};

template < class Action >
void IContainerThreadUser::doWorkInThread(Action action)
{
  getContainer()->addLambdaToQueue(action);
}

template < class Action >
void IContainerThreadUser::doWorkInThread(SocketInfo *socket, Action action)
{
  getContainer()->addLambdaToQueue(action, socket);
}

template < class Action >
void IContainerThreadUser::invokeIfRequired(Action action, SocketInfo *socket)
{
  if (inMyThread())
    action();
  else
    doWorkInThread(socket, action);
}

#endif
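
/* Usage sketch (an illustration only, not part of the library).

The SpecificWork / addLambdaToQueue() pattern in this file can be tried in
isolation. The sketch below is a minimal, single threaded stand-in that
assumes nothing from Messages.h. ToyWork, ToySpecificWork, ToyQueue and
runDemo() are hypothetical names invented for this example. The real queue
is thread safe and can tie a request to a socket; this toy version does
neither.

```cpp
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Stand-in for IContainerThread::Work: one virtual method run by the thread.
class ToyWork
{
public:
  virtual void inThread() = 0;
  virtual ~ToyWork() { }
};

// Stand-in for IContainerThread::SpecificWork: stores a copy of the callable
// and invokes it from inThread().
template < class Action >
class ToySpecificWork : public ToyWork
{
private:
  Action const _action;
public:
  explicit ToySpecificWork(Action action) : _action(action) { }
  virtual void inThread() { _action(); }
};

// Stand-in for the container thread's event queue. No locking here.
class ToyQueue
{
private:
  std::queue< std::unique_ptr< ToyWork > > _pending;
public:
  // Wrap the callable in a work object and append it to the queue.
  template < class Action >
  void addLambdaToQueue(Action action)
  {
    _pending.push(std::unique_ptr< ToyWork >
                  (new ToySpecificWork< Action >(action)));
  }

  // Run everything queued so far, oldest first.
  void drain()
  {
    while (!_pending.empty())
    {
      std::unique_ptr< ToyWork > work = std::move(_pending.front());
      _pending.pop();
      work->inThread();
    }
  }
};

// Queue two jobs, then report the order in which things happened.
std::vector< std::string > runDemo()
{
  std::vector< std::string > log;
  ToyQueue queue;
  queue.addLambdaToQueue([&log]() { log.push_back("first"); });
  queue.addLambdaToQueue([&log]() { log.push_back("second"); });
  log.push_back("queued");
  queue.drain();
  return log;
}
```

runDemo() returns "queued", "first", "second". Queued work never runs at the
point where it is submitted, only when the queue is drained, which is the
same behavior doWorkInThread() documents. invokeIfRequired() is the variant
that would run the action immediately when already in the right thread.
*/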