#ifndef __WorkerThread_h_
#define __WorkerThread_h_

/* This unit provides a generic way to split up work into multiple symmetric
 * threads.  The idea comes from the UserRequestControl.C /
 * StreamingAlertsThread.C code.  Originally one thread listened to a lot of
 * different types of events and did all the work itself.  However this did
 * not scale well.  So now there is one thread which is still in charge.  And
 * there are several more threads which will each take one request at a time
 * and respond back to the main thread.  The bulk of the (clock) time that
 * each helper thread spends on the request is spent on database requests.
 *
 * The problem with this setup is that the idea of splitting up work into
 * multiple threads is tightly mixed in with the specific task which is
 * being performed.  This unit provides the code to help any other unit break
 * some of its work up into smaller threads.  The only assumption is that the
 * other threads will use one or more database connections, but even this is
 * not required.
 *
 * This unit was first used by OddsMaker.C.  However, HistoryHandler.C followed
 * soon after.  Part of this was to allow for more history requests at once.
 * We have more than enough database capacity to handle that.  But the bigger
 * issue was maximizing the cache use of each database machine.  History and
 * the OddsMaker both use the same rules, so they send similar requests to
 * each machine.
 *
 * Notice ../fast_alert_search/NewWorkerCluster.h.  That file does a lot of the
 * same work, but in a more generic way.  It doesn't know anything about
 * databases.  It can be customized to handle different types of resources.
 */

#include <stdint.h>
#include <map>
#include <queue>
#include <string>
#include <vector>

#include "../shared/ThreadClass.h"
#include "../shared/Messages.h"
#include "../shared/DatabaseWithRetry.h"
#include "Types.h"

// We use the day number to choose which database server gets a request.
// The alert_shards table helps you find the appropriate day number for a
// request.
typedef uint64_t DayNumber; // This can be a don't know or a don't care. In particular, when you first // start the process, you often need to bootstrap yourself, and find some // general information that will help you select the right shard. This will // typically be a small request, so any database server can handle it. #define DAY_NUMBER_UNKNOWN ((DayNumber)0) class ShardList : NoAssign { public: struct Shard { DayNumber day; std::string date; std::string table; AlertId minId; AlertId maxId; TclList dump() const; }; private: std::map< std::string, Shard > _byDate; std::map< AlertId, Shard const * > _byMaxId; void updateByMaxId(); Shard const *assertGood(Shard const *shard) const; public: ShardList() { } ShardList(ShardList const &o) : _byDate(o._byDate) { updateByMaxId(); } Shard const *getOldest() const; // Null if list is empty. Shard const *getNewest() const; // Null if list is empty. Shard const *findAlert(AlertId id) const; // Null if we can't find it. // If the id exists in our table, return the corresponding shard, just like // findAlert(). If that exact id does not exist, look for the largest alert // id less that this, one, and find it. Return null if the request id is // less than any we know about. This was specifically aimed at history, // which walks through the database backwards, looking for more alerts. // Note: The shard list only records the first and last id of each shard. // if some ids are missing in the middle of a shard, findAlert() doesn't know // or care. findAlertOrBefore() becomes important when there is a missing // id between two shards. Shard const *findAlertOrBefore(AlertId id) const; Shard const *findDay(std::string const &date) const; // Null if we can't... // If date exists in our table, return the correspoding shard, just like // findDay(). If date does not exist, look for the next greater (later) // day and return that. If date is greater than the date of any shard, // return null. 
So looking for Saturday would probably give you the // following Monday. Shard const *findDayOrGreater(std::string const &date) const; AlertId getMinId() const; AlertId getMaxId() const; void load(DatabaseWithRetry &database); TclList dump() const; void sendShardToLog(AlertId id) const {} }; class WorkerThreadRequest; class WorkerThread : private ThreadClass { private: enum { mtNewRequest, mtQuit }; RequestQueue _incoming; DatabaseWithRetry *_readOnlyDatabase; DatabaseWithRetry *_masterDatabase; DatabaseWithRetry *_readOnlyBarsDatabase; const int _returnMessageId; RequestListener * const _returnListener; protected: void threadFunction(); public: std::string getDebugName(); // These are used by the request. Instead of passing these to the request, // we pass a pointer to ourself. That way we can add more pieces of data // over time without changing the interface between the thread and the // request. DatabaseWithRetry *getReadOnlyDatabase() { return _readOnlyDatabase; } DatabaseWithRetry *getMasterDatabase() { return _masterDatabase; } DatabaseWithRetry *getReadOnlyBarsDatabase() { return _readOnlyBarsDatabase; } // This should be set once, at most. We do not set it in the constructor // for simplicity and flexibility. Someone might make multiple passes // at filling in this data. And as new fields are added, the old code won't // have to change. 
void setMasterDatabase(DatabaseWithRetry *value); void setReadOnlyDatabase(DatabaseWithRetry *value); void setReadOnlyBarsDatabase(DatabaseWithRetry *value); void add(WorkerThreadRequest *request); std::string getName() { return ThreadClass::getName(); } WorkerThread(std::string baseName, int returnMessageId, RequestListener *returnListener); ~WorkerThread(); }; class WorkerThreadRequest : public Request { private: DayNumber _dayNumber; protected: void setDayNumber(DayNumber dayNumber) { _dayNumber = dayNumber; } public: virtual void doWork(WorkerThread *resources)=0; WorkerThreadRequest(SocketInfo *socketInfo) : Request(socketInfo), _dayNumber(DAY_NUMBER_UNKNOWN) {} // Give the controller an idea of which alert data you are going to need. // We try route similar requests to the same databases over and over to // improve caching. This might change over time. DayNumber getDayNumber() const { return _dayNumber; } // If you override this the responses will go to the specified listener // and will have the specified message id. Otherwise we will use the // defaults from the WorkerThread. virtual RequestListener * getReturnListener() { return NULL; } // This is only called if getReturnListener() does not return NULL. virtual int getReturnMessageId() { return 0; } }; class WorkerThreadQueue { private: // We always use a FIFO queue. A lot of data structures could work (such as // a stack) because each of the items in the queue should be functionally // equivilant. However, if there is a problem, a queue will let us find it // in a predictable way. For example, if there was one bad worker thread, // add we stored the free ones in a stack, we might never test the one // bad one. We wouldn't see it until the system was under stress and we // finally used all of them at once. A FIFO is just the opposite. It will // ensure that we test each item quickly. // // Note: The worker thread queue may be obsolete. In the current usage, // each stripe has one. 
And the number of workers per stripe can be set in // the config file. But in practice, it is always 1. At some point we may // completely remove this class. // // We sometimes use multiple threads per stripe for load testing. So maybe // this does have a purpose. Multiple threads per stripe gives us a decent // simulation of several machines all configured the same way with 1 thread // per stripe. std::queue< WorkerThread * > _available; struct OutstandingRequest { WorkerThread *workerThread; // We have to keep track of aborted here. If the request contained an // aborted flag, we might try to abort a request after another // thread had already deleted the request. If we tried to keep track of // this info elsewhere, we'd have to duplicate the data structure below. bool aborted; // Socket is similar. This is already stored in the request. However, // we are looking at this list when the request is owned by another // thread and it might have already been deleted. SocketInfo *socket; }; // Invariant: All threads are either in _available or _outstanding, but // not both. std::map< WorkerThreadRequest *, OutstandingRequest > _outstanding; public: void addWorker(WorkerThread *t) { _available.push(t); } ~WorkerThreadQueue(); int available() const; // Returns true on success. Returns false if no workers were available. bool send(WorkerThreadRequest *request); // If a socket is closed then the worker thread or one of the queues will // delete the request. So we won't get a response. So we have to know on // our own that the worker thread is now available. Use this function to // mark the worker threads as available. void release(SocketInfo *socket); // When we received a message back from the worker thread, use this function // to mark the thread as available. Returns true if the item has already // been aborted. The request MUST be in the outstanding queue. 
bool release(WorkerThreadRequest *request); // Similar to above, but the request is not required to be in the queue. // found is false (and aborted is undefined) if the request was not in the // this queue. aborted is true if the request was found and was already // aborted. void release(WorkerThreadRequest *request, bool &found, bool &aborted); // The request does not have to be in here. If it is we record that, and // report it on the call to release. void abort(WorkerThreadRequest *request); // Note: After a request has already been dispatched to the worker thread // (that is to say, if you called send() and it succeeded) then there are // several possibilities. // 1) A response comes back to the original thread. It calls release() on // the and request sees that aborted is false. It processes the request. // 2) The original thread calls release() on the socket. The system makes // a similar announcement to the worker thread. The request gets deleted // by one of the queues. The original thread does not see the request // again. // 3) The original thread no longer cares about this request. It calls // abort(). The original thread will get a response when this request // is complete. The orignal thread will call release() on the request // and see that it was aborted. It will delete that request, without // doing any more processing on it // 4) An abort request from the original thread can overlap with a delete // socket request from the system. This class is set up to handle that // properly. The original thread does not have to do anything special. // // Perhaps the names could use some clean up. Calling release() on a socket // or calling abort() on a request says that we don't want to see the result; // cancel the request at the next convenient time. Calling release() on a // request acknowledges that the work completed on its own. Yuck! 
}; class WorkerCluster : private ThreadMonitor::Extra { private: class WorkerStripe : NoCopy, NoAssign, public FairRequestQueue::Predicate { private: WorkerThreadQueue _workers; unsigned int _divisor, _remainder; bool _acceptUnknown; std::string _debugName; bool matches(DayNumber day) const; bool matches(WorkerThreadRequest const *request) const; bool acceptable(WorkerThreadRequest const *request) const; bool acceptable(Request const *request) const; public: WorkerStripe(unsigned int divisor, unsigned int remainder, bool acceptUnknown) : _divisor(divisor), _remainder(remainder), _acceptUnknown(acceptUnknown) {} void addWorker(WorkerThread *t) { _workers.addWorker(t); } bool doWork(WorkerThreadRequest *request); // True if work was accepted. void remove(SocketInfo *socket); void remove(WorkerThreadRequest *toDelete); void release(WorkerThreadRequest *request, bool &found, bool &aborted); void matchAll(FairRequestQueue &pending); void setDebugName(std::string v) { _debugName = v; } std::string const &getDebugName() const { return _debugName; } int workersAvailable() const { return _workers.available(); } }; typedef std::vector< WorkerStripe * > StripeSet; StripeSet _stripes; FairRequestQueue _pending; virtual std::string getInfoForThreadMonitor(); protected: WorkerStripe *addStripe(unsigned int divisor, unsigned int remainder, bool accpetUnknown); public: WorkerCluster(std::string configItemName, std::string baseName, int returnMessageId, RequestListener *returnListener); ~WorkerCluster(); void addWork(WorkerThreadRequest *request); void remove(SocketInfo *socket); void remove(WorkerThreadRequest *toDelete); // When we received a message back from the worker thread, use this function // to mark the thread as available. This returns true if the request was // aborted. Aborted means that the calling thread asked to remove() the // request. 
// // Note: If you call remove, the request might be sitting in our queue, in // which case we can delete it immediately and you never see it again. // However, it might have already been dispatched to the worker thread. In // that case you will get a reply from the worker thread. You have to use // this function to know that you should ignore the result. // // As always, if you receive a message from a worker thread, it is your // responsibility to delete the message. If this class deleted the message, // you will not receive a pointer to the message in your queue again. // // Only call this on a message which has just returned from a worker thread. bool acknowledge(WorkerThreadRequest *request); void matchAll(); }; #endif