#ifndef __GridWorkerThread_h_ #define __GridWorkerThread_h_ #include #include "../shared/ThreadClass.h" #include "../shared/Messages.h" #include "GridFiller.h" #include "ExecutionContext.h" class GridWorkerThread : private ThreadClass { public: class GridWorkerJob : public Request { public: virtual void inWorkerThread(ExecutionContext &executionContext) =0; // This is called after inWorkerThread(), in the worker thread. The // implementer has two choices. Add itself to another queue and return // true, or return false and let the worker thread delete the object. // A normal job send through GridWorkerCluster::addWork() should always // return itself to the dispatcher so the GridWorkerCluster knows that // that the thread is available again. A job sent to // GridWorkerCluster::getAllThreads() should return false from this. virtual bool returnHome() =0; virtual void afterReturn() =0; // The default will throw a TCL exception. virtual int tclCmd(ExecutionContext &executionContext, int objc, Tcl_Obj *const objv[]); GridWorkerJob(SocketInfo *socket = NULL) : Request(socket) { } }; protected: void threadFunction(); private: enum { mtSubmit, mtQuit }; RequestQueue _incoming; GridFiller _gridFiller; ExecutionContext *_executionContext; GridWorkerJob *_currentJob; static int currentJobCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]); int currentJobCmd(int objc, Tcl_Obj *const objv[]); public: GridWorkerThread(); ~GridWorkerThread(); void submit(GridWorkerJob *gridWorkerJob); }; class GridWorkerCluster : NoCopy, NoAssign { private: FairRequestQueue _jobs; std::queue< GridWorkerThread * > _available; std::vector< GridWorkerThread * > _allThreads; // _working maps the request back to the thread. The request might be // deleted by another thread. That's why we cast the request to void *, // so you're not tempted to use it. We also store the socket associated // with the request. That's required to delete the info from this table, // and to mark the corresponding thread as free, if a socket closes. struct PendingInfo { SocketInfo *socketInfo; GridWorkerThread *thread; }; typedef std::map< void *, PendingInfo > Working; Working _working; public: GridWorkerCluster(int workerCount); // Call this to add a job to the queue. void addWork(GridWorkerThread::GridWorkerJob *job); // Call this when a job returns to this thread from the worker. void returning(Request *job); // If any work is in the queue, and any workers are available, send the work // to the worker. void match(); // Efficiently removes any and all requests associated with the socket. // After removing a request, you cannot call returning() on that request. // Presumably you're calling remove() because the socket is being deleted, // so this is not a problem. void remove(SocketInfo *socket); // Return a list of all worker threads. It is sometimes useful to send a // message directly to the threads to initialize something. For example, // to send a procedure definition to all threads. std::vector< GridWorkerThread * > const &getAllThreads() const { return _allThreads; } }; #endif