#ifndef __NewWorkerCluster_h_ #define __NewWorkerCluster_h_ #include "ThreadMonitor.h" #include "ContainerThread.h" #include "SmarterP.h" /* This class is responsible for sending work to N other threads. It listens * for responses so it knows when the worker threads are free and can be used * for more work. * * Items are always pushed to the worker threads. That gives us the most * control. (At one time I tried a worker cluster with several threads sharing * one event queue. That's a more traditional arrangement. While it worked * most of the time, that setup didn't let me distribute the socket closed * messages to all the worker threads. That could cause a core dump or similar * problem.) * * This is a ForeverThreadUser so it can directly listen for responses from * other threads and for socket closed messages. This object should only be * accessed from other code in the same container thread. This will create * other container threads to serve as workers. * * This makes far fewer assumptions than ../ax_alert_server/WorkerThread.h. * That file is very concerned with databases. This class lets the caller * do any initialization it needs, including creating a database connection or * other resource in a thread local variable. This does not contain the * ability to cancel an item. If we need that, we should probably rethink it. * The WorkerThread.h version allowed you to use a pointer that had been * deleted and that seems to be asking for trouble. This does not contain the * ability to say that different workers can handle different jobs. We don't * need this now, but it shouldn't be hard to add if we ever need it. * * TimedJobList.h / TopListWorkers.C make an attempt at something similar. * It's not very reusable. And it doesn't contain a FairRequestQueue; we've * had problems where a client gone crazy can make everyone else slow. That * also has to deal with two levels of sockets; we're trying to avoid that in * general. * * TopListWorkers.C had other issues. 
For simplicity you could not send a * message back to the main thread. So we took some short cuts. Like always * scheduling the next invocation of the top list right before sending the * first one off to the thread. And we only had one database thread. It * would have been complicated to add a cluster of those. */ class NewWorkerCluster : public ForeverThreadUser, private ThreadMonitor::Extra { public: /* JobInfo is how you specify the job you want to run. There are a lot of * options. Originally we had a series of addJob...() functions including * optional arguments and a lot of templates. Now you can specify each * option by name, and the compiler checks the types of the lambdas * immediately. */ class JobInfo { public: JobInfo(); // The normal meaning. Jobs created from this JobInfo will be canceled // abruptly if the socket closes. socket defaults to NULL. SocketInfo *socket; // Called in the dispatcher thread. One last chance to access your data // in this thread before going to the worker. Return true to continue. // Return false if you changed your mind and don't need to send anything. // Defaults to doing nothing and returning true. std::function< bool() > beforeDispatch; // Called in the worker thread. This is the main point of the thread // pool. Defaults to throwing an exception. std::function< void() > inThread; // Called in the dispatcher thread, after finishing in the worker thread. // Defaults to doing nothing. This is a good place to update a data // structure stored in the dispatcher thread with data computed in the // worker thread. std::function< void() > backHome; // Only called from the dispatcher thread. // Typically called when a thread is running too long and we want to report // the details to the log file. // // Note: You can expect this to be called in the dispatcher thread while // inThread() is being called in the worker thread. Other possibilities // exist. 
Perhaps the request was deleted in the worker thread while the // dispatcher was calling description(). If these functions share data, // a std::shared_ptr in each lambda expression could help. // // The default returns the empty string. std::function< std::string() > description; // These get copied around a lot, so why not wrap them in a preferred // smart pointer. typedef SmarterCP< JobInfo > CRef; // Normal rules apply. Create as a non-const object so you can configure // it. As soon as you share the object, never modify it again. class Ref : public SmarterP< JobInfo > { public: // A convenience. So you can chain these. Ref &setSocket(SocketInfo *v) { (*this)->socket = v; return *this; } Ref &setBeforeDispatch(std::function< bool() > const &v) { (*this)->beforeDispatch = v; return *this; } Ref &setInThread(std::function< void() > const &v) { (*this)->inThread = v; return *this; } Ref &setBackHome(std::function< void() > const &v) { (*this)->backHome = v; return *this; } Ref &setDescription(std::function< std::string() > const &v) { (*this)->description = v; return *this; } // Create a new default object, rather than starting with NULL. Ref() : SmarterP< JobInfo >(NULL) { } // If you just want to use a string, this will create a function that // will return your string. It would be simpler if we only got strings. // But the assumption is that some of these strings will be expensive to // compute so we defer it. void setDescription(std::string const &v); }; }; private: // For ThreadMonitor::Extra virtual std::string getInfoForThreadMonitor(); // A wrapper around JobInfo::Ref that I can send to a FairRequestQueue. class Job : public Request { public: JobInfo::CRef info; Job(SocketInfo *socket = NULL) : Request(socket) { } Job(JobInfo::CRef const &info) : Request(info->socket), info(info) { } }; std::vector< IContainerThread * > _allWorkers; // All workers, busy or not. 
std::queue< IContainerThread * > _availableWorkers; struct BusyWorkerInfo { int64_t startTime; bool hasBeenReported; bool pingAfterSocketClosed; JobInfo::CRef info; }; std::map< IContainerThread *, BusyWorkerInfo > _busyWorkers; FairRequestQueue _jobs; // These are used when creating new threads. std::string _baseName; int _logPeriod; // For IContainerThreadUser virtual void socketClosed(SocketInfo *socket) override; virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) override; virtual void initializeInThread() override; virtual std::string debugName() override; void match(); void doJobInThread(JobInfo::CRef const &job); void doJobReturn(JobInfo::CRef const &job, IContainerThread *worker); void reportRequestEnded(IContainerThread *worker, BusyWorkerInfo const &workerInfo, bool socketClosed) const; // Create a a new worker thread and add it to the pool. This is the most // powerful way to add a new thread. Most people will use createWorkers() // or createWorkersLambda(). This function could be made public, but we // want to make the other two functions more obvious, so someone doesn't // call this function by mistake. // // initializer will be called in the new thread, shortly after it is created, // before any jobs are sent to it. initializer may be NULL. // // name is the base name of the thread. We always ask ContainerThread // to make this name unique. // // logPeriod is passed on to the RequestQueue object owned by this thread. // Set this to a higher value to see fewer backlog messages in the log file. void createWorker(IContainerThread::Work *initializer, std::string name, int logPeriod = 100); public: // container is the thread that will access this object. Callbacks will come // back in that same thread. The default for ForeverThreadUser is to create // new thread; that never makes sense for this class. getContainer() would // be a reasonable input for this constructor. 
IContainerThread::current() // is probably a bad idea, because the constructor for an // IContainerThreadUser is often called in a different thread than that // object runs in. // // baseName will be used to create the names of the individual threads. // We automatically make the thread names unique. // // logPeriod is passed on to the RequestQueue object owned by each new // thread. Set this to a higher value to see fewer backlog messages in the // log file. NewWorkerCluster(IContainerThread *container, std::string const &baseName, int logPeriod = 100); // Create this many workers with no initializer. void createWorkers(int count); int pendingJobCount(SocketInfo *socket) const { return _jobs.getCount(socket); } // action should be a lambda that takes no inputs. action will be called in // the new thread, shortly after it is created, before any jobs are sent to // it. template < class Action > void createWorkersLambda(Action action, int count) { for (; count > 0; count--) createWorker(new IContainerThread::SpecificWork< Action >(action), _baseName, _logPeriod); } int workerCount() const { return _allWorkers.size(); } void addJob(JobInfo::CRef const &jobInfo); // Create a job. Perform the given action in the woker thread. Do not do // any work after that. void addJob1(std::function< void() > const &action, SocketInfo *socket = NULL) { addJob(JobInfo::Ref().setInThread(action).setSocket(socket)); } // This is similar to addJob1(). remoteAction gets queued up then run in an // appropriate thread. // // addJob1() and most of the variants of addJob...() all require you to call // them from the correct thread. This function can be called from any // thread. generally the addJob1() style has worked well for us, but this // function is useful sometimes. This function looks more like the thread // pools you see in C#, Java, etc., than our normal thread pool. // // socket has its normal meaning. A job is automatically deleted if the // corresponding socket closes. 
This can be NULL if you don't need this // behavior. In some of these functions socket is NULL by default. In // general I've had trouble mixing templates, lambdas, and optional // arguments. That makes it hard for the compiler to generate good error // messages. template < class RemoteAction > void addJobFromAnywhere(SocketInfo *socket, RemoteAction remoteAction) { invokeIfRequired([=]() { addJob1(remoteAction, socket); }); } // Create a JobInfo. Perform the first action in the worker thread. Then // perform the second action in the original thread. // This function is semi-deprecated. Originally this was complicated, but // now that we have the JobInfo class, you might as well use that directly. void addJob2(std::function< void() > const &remoteAction, std::function< void() > const &returnAction, SocketInfo *socket = NULL) { addJob(JobInfo::Ref().setInThread(remoteAction) .setBackHome(returnAction).setSocket(socket)); } /* addJobPass() is deprecated. Just create your own reference * counted pointer and give a copy to whichever lambdas need them. * Create your own JobInfo and fill it in as you like. * * addJobPass() is similar to addJob2() but it allows the first lambda * expression (the remote action) to pass a value to the second lambda * expression (the return action). The first lambda must return a value * of a specific type and the second lambda must take a value of the same * type as an input. * * Sample code: * struct InitialStates * { * DataProvider::InitialState::Ref live; * DataProvider::InitialState::Ref delayed; * }; * databaseJobs.addJobPass< InitialStates > * ([=] () -> InitialStates * { // Running in the remote thread. * InitialStates result; * // Do some work and store something in result. * return result * }, [=] (InitialStates &initialStates) { * // Running in the dispatcher thread. * // Do something with initialStates. * }); * * This sample was extracted from ../fast_alert_search/TopListMicroService.C. 
* * Notice that we explicitly tell addJobPass() the type for Passed. The * complier is not smart enough to detect this on its own. * * Passed can be any type. It could be something simple like a bool to say * success or failure. Here it's a struct because we wanted to pass multiple * pieces of data from the first lambda to the second. */ template < class Passed > void addJobPass(std::function< Passed() > const &remoteAction, std::function< void(Passed &) const &> returnAction, SocketInfo *socket = NULL) { SmarterP< Passed > passed; JobInfo::Ref jobInfo; // Implicit assumption: Passed has a copy constructor. A previous // version of this function had different assumptions, Passed has a default // constructor and an assignment operator. These hidden details are one // of the reasons I deprecated this function. Just do it yourself. jobInfo.setInThread([=]() { passed.emplace(remoteAction()); }); jobInfo.setBackHome([=]() { returnAction(*passed); }); jobInfo.setSocket(socket); addJob(jobInfo); } /* addJobPre() is deprecated. Just create your own reference * counted pointer and give a copy to whichever lambdas need them. * Create your own JobInfo and fill it in as you like. * * addJobPre() is like addJob2() with two differences. * * 1) PreAction is called immediately before the job is sent from the * dispatcher thread to the remote thread. This gives you one last * chance to copy non-thread-safe data from the dispatcher thread for * use in the remote thread. This lambda also has an option to abort the * current job before sending the job to another thread. * * 2) The job object contains a field of type Passed. This field is * accessible to all three lambdas, but no where else. This is similar * in functionality to addJobPass(), but the syntax is a little different. * * PreAction should return a boolean. True means to continue as normal and * send the job to the worker thread. False means to abort this job. 
The * NewWorkerCluster will automatically grab the next job from it's queue. */ template < class Passed > void addJobPre(std::function< bool(Passed &) > const &preAction, std::function< void(Passed &) > const &remoteAction, std::function< void(Passed &) > const &returnAction, SocketInfo *socket = NULL/*, Passed const &initializer = Passed()*/) { SmarterP< Passed > passed(NULL); JobInfo::Ref jobInfo; jobInfo.setBeforeDispatch([=]() { return preAction(*passed); }); jobInfo.setInThread([=]() { remoteAction(*passed); }); jobInfo.setBackHome([=]() { returnAction(*passed); }); jobInfo.setSocket(socket); addJob(jobInfo); } // Sends work remoteAction to all of the threads. // // This bypasses the main queue used by addJob1() and addJob2(). This sends // the work directly to each thread's queue. // // Imagine you send a job this way, then you send a second job using // addJob1() or addJob2(). Some thread will eventually receive the second // job. It will not receive that job until after it completed the first job. // // If you send several jobs this way, each thread will process those jobs in // the order that you send them. // // There isn't much more I can say about order. If you send a job with // addJob1() or addJob2(), then you send a second job with addJobAll(), the // first job might or might not start or complete before the second job starts // or completes. // // This was originally aimed at the dispatch_tcl_all command in the tikiller // project. That lets the user send initialization type commands to all // threads at any time. For example, to reread a script in each thread. // You read as part of initialization, but you want to use the newest version // of the script without restarting the entire app. template < class RemoteAction > void addJobAll(RemoteAction remoteAction, SocketInfo *socket = NULL) { for (auto it = _allWorkers.begin(); it != _allWorkers.end(); it++) (*it)->addLambdaToQueue(remoteAction, socket); } std::string debugDump() const; }; #endif