#include #include #include #include "../shared/GlobalConfigFile.h" #include "../shared/SimpleLogFile.h" #include "WorkerThread.h" ///////////////////////////////////////////////////////////////////// // class ShardList::Shard ///////////////////////////////////////////////////////////////////// TclList ShardList::Shard::dump() const { TclList result; result<date); assert(shard == expected); return shard; } */ inline ShardList::Shard const *ShardList::assertGood(Shard const *shard) const { return shard; } TclList ShardList::dump() const { TclList result; for (std::map< std::string, Shard >::const_iterator it = _byDate.begin(); it != _byDate.end(); it++) result<second.dump(); return result; } /* void ShardList::sendShardToLog(AlertId id) const { std::string name = "Shard "; Shard const *shard = findAlert(id); if (shard) // On 4/30/2012 the next line caused an exception! // gdb said that shard was NULL. That is very strange. // I have seen a few other random exceptions. They don't appear to be // related, but I don't have current source code so I can't be sure. name += shard->date; else name += "NULL"; ThreadMonitor::find().increment(name); } */ ShardList::Shard const *ShardList::getOldest() const { if (_byDate.empty()) return NULL; return assertGood(&_byDate.begin()->second); } ShardList::Shard const *ShardList::getNewest() const { if (_byDate.empty()) return NULL; return assertGood(&_byDate.rbegin()->second); } ShardList::Shard const *ShardList::findAlert(AlertId id) const { std::map< AlertId, Shard const * >::const_iterator it = _byMaxId.lower_bound(id); if (it == _byMaxId.end()) // Id is too big. return NULL; if (id < it->second->minId) // Id is too small or is between two shards. return NULL; return assertGood(it->second); } ShardList::Shard const *ShardList::findAlertOrBefore(AlertId id) const { std::map< AlertId, Shard const * >::const_iterator it = _byMaxId.lower_bound(id); if (it == _byMaxId.end()) // Id is too big. Return null to suggest that the caller look at the live // alerts. This is pretty specific, but that's what History.C requires. return NULL; if (id < it->second->minId) { // Id is too small or is between two shards. Try the shard before this // one. it--; if (it == _byMaxId.end()) // Id is too small. It is not in any of our shards. return NULL; else // The id is between two shards. This shard has the highest id which // is less that (older than) the requested id. return assertGood(it->second); } // We found an exact match. return assertGood(it->second); } ShardList::Shard const *ShardList::findDay(std::string const &date) const { return assertGood(getProperty(_byDate, date)); } ShardList::Shard const * ShardList::findDayOrGreater(std::string const &date) const { std::map< std::string, Shard >::const_iterator it = _byDate.lower_bound(date); if (it == _byDate.end()) return NULL; else return assertGood(&(it->second)); } AlertId ShardList::getMinId() const { Shard const *oldest = getOldest(); if (!oldest) return std::numeric_limits< AlertId >::max(); return oldest->minId; } AlertId ShardList::getMaxId() const { Shard const *newest = getNewest(); if (!newest) return std::numeric_limits< AlertId >::min(); return newest->maxId; } void ShardList::updateByMaxId() { _byMaxId.clear(); for (std::map< std::string, Shard >::const_iterator it = _byDate.begin(); it != _byDate.end(); it++) { Shard const &s = it->second; _byMaxId[s.maxId] = &s; } } void ShardList::load(DatabaseWithRetry &database) { _byDate.clear(); for (MysqlResultRef result = database.tryQueryUntilSuccess("SELECT * FROM alert_shards WHERE live='Y'"); result->rowIsValid(); result->nextRow()) { const std::string date = result->getStringField("date"); Shard &s = _byDate[date]; s.day = result->getIntegerField("day", 0); s.date = date; s.table = result->getStringField("table_name"); s.minId = result->getIntegerField("first_id", std::numeric_limits< AlertId >::max()); s.maxId = result->getIntegerField("last_id", std::numeric_limits< AlertId >::min()); } updateByMaxId(); } ///////////////////////////////////////////////////////////////////// // class WorkerThread ///////////////////////////////////////////////////////////////////// std::string WorkerThread::getDebugName() { if (_readOnlyDatabase) { return getName() + " " + _readOnlyDatabase->getDatabaseServerName(); } else { return getName(); } } void WorkerThread::setReadOnlyDatabase(DatabaseWithRetry *value) { assert(!_readOnlyDatabase); _readOnlyDatabase = value; } void WorkerThread::setMasterDatabase(DatabaseWithRetry *value) { assert(!_masterDatabase); _masterDatabase = value; } void WorkerThread::setReadOnlyBarsDatabase(DatabaseWithRetry *value) { assert(!_readOnlyBarsDatabase); _readOnlyBarsDatabase = value; } //#include //#include void WorkerThread::add(WorkerThreadRequest *request) { //Request *r1=request; //WorkerThreadRequest *r2 = dynamic_cast(r1); //std::cout<<"adding: "<callbackId = mtNewRequest; _incoming.newRequest(request); } WorkerThread::WorkerThread(std::string baseName, int returnMessageId, RequestListener *returnListener) : ThreadClass(baseName + " " + pointerToString(this)), _incoming(getName()), _readOnlyDatabase(NULL), _masterDatabase(NULL), _readOnlyBarsDatabase(NULL), _returnMessageId(returnMessageId), _returnListener(returnListener) { startThread(); } WorkerThread::~WorkerThread() { Request *request = new Request(NULL); request->callbackId = mtQuit; _incoming.newRequest(request); waitForThread(); } //#include //#include void WorkerThread::threadFunction() { while (true) { while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtNewRequest: { //std::cout<<"executing: "<(current); request->doWork(this); RequestListener * listener = request->getReturnListener(); if (listener) { // Specified by the request. request->callbackId = request->getReturnMessageId(); } else { // Use the default for this thread. listener = _returnListener; request->callbackId = _returnMessageId; } listener->newRequest(request); current = NULL; break; } case mtQuit: delete current; return; } delete current; } _incoming.waitForRequest(); } } ///////////////////////////////////////////////////////////////////// // class WorkerThreadQueue ///////////////////////////////////////////////////////////////////// WorkerThreadQueue::~WorkerThreadQueue() { while (!_available.empty()) { WorkerThread *workerThread = _available.front(); delete workerThread; _available.pop(); } for (std::map< WorkerThreadRequest *, OutstandingRequest >::iterator it = _outstanding.begin(); it != _outstanding.end(); it++) { delete it->second.workerThread; } } int WorkerThreadQueue::available() const { return _available.size(); } bool WorkerThreadQueue::send(WorkerThreadRequest *request) { assert (!_outstanding.count(request)); if (!available()) { return false; } else { OutstandingRequest r; r.workerThread = _available.front(); _available.pop(); r.aborted = false; r.socket = request->getSocketInfo(); _outstanding[request] = r; r.workerThread->add(request); return true; } } void WorkerThreadQueue::release(SocketInfo *socket) { std::map< WorkerThreadRequest *, OutstandingRequest >::iterator it = _outstanding.begin(); while (it != _outstanding.end()) { std::map< WorkerThreadRequest *, OutstandingRequest >::iterator next = it; next++; if (it->second.socket == socket) { release(it->first); } it = next; } } bool WorkerThreadQueue::release(WorkerThreadRequest *request) { std::map< WorkerThreadRequest *, OutstandingRequest >::iterator it = _outstanding.find(request); assert(it != _outstanding.end()); bool aborted = it->second.aborted; _available.push(it->second.workerThread); _outstanding.erase(it); return aborted; } void WorkerThreadQueue::release(WorkerThreadRequest *request, bool &found, bool &aborted) { std::map< WorkerThreadRequest *, OutstandingRequest >::iterator it = _outstanding.find(request); found = it != _outstanding.end(); if (found) { aborted = it->second.aborted; _available.push(it->second.workerThread); _outstanding.erase(it); } } void WorkerThreadQueue::abort(WorkerThreadRequest *request) { if (OutstandingRequest *r = getProperty(_outstanding, request)) { r->aborted = true; } } ///////////////////////////////////////////////////////////////////// // class WorkerCluster::WorkerStripe ///////////////////////////////////////////////////////////////////// bool WorkerCluster::WorkerStripe::acceptable(Request const *request) const { return acceptable(dynamic_cast(request)); } bool WorkerCluster::WorkerStripe::acceptable(WorkerThreadRequest const *request) const { return _workers.available() && matches(request); } bool WorkerCluster::WorkerStripe::matches(WorkerThreadRequest const *request) const { const DayNumber day = request->getDayNumber(); const bool result = matches(day); /* TclList msg; msg<whichDatabase()<<" --> "<workersAvailable(); TclList result; // jobs_in_queue -- The number of jobs submitted but not yet assigned to a // queue. // stripes -- Typically this is the total number number of worker threads // available. Seach this file for "duplicateCount" to see where // this approximation can break down. // workers_available -- The number of worker threads waiting for work. result<<"jobs_in_queue"<<_pending.getCount() <<"stripes"<<_stripes.size() <<"workers_available"<matchAll(_pending); } } WorkerCluster::~WorkerCluster() { for (StripeSet::iterator it = _stripes.begin(); it != _stripes.end(); it++) { // I make the stripes as pointers, so I don't have to copy them as // as the vector grows. I'm not certain that was necessary. delete *it; } } void WorkerCluster::remove(SocketInfo *socket) { _pending.remove(socket); for (StripeSet::iterator it = _stripes.begin(); it != _stripes.end(); it++) { (*it)->remove(socket); } } void WorkerCluster::remove(WorkerThreadRequest *toDelete) { _pending.remove(toDelete); for (StripeSet::iterator it = _stripes.begin(); it != _stripes.end(); it++) { (*it)->remove(toDelete); } } bool WorkerCluster::acknowledge(WorkerThreadRequest *request) { bool found = false; for (StripeSet::iterator it = _stripes.begin(); it != _stripes.end(); it++) { bool aborted; (*it)->release(request, found, aborted); if (found) { return aborted; } } assert(false); return true; } void WorkerCluster::addWork(WorkerThreadRequest *request) { for (StripeSet::iterator it = _stripes.begin(); it != _stripes.end(); it++) { if ((*it)->doWork(request)) { return; } } _pending.push(request); } WorkerCluster::WorkerStripe *WorkerCluster::addStripe(unsigned int divisor, unsigned int remainder, bool acceptUnknown) { WorkerStripe *newStripe = new WorkerStripe(divisor, remainder, acceptUnknown); _stripes.push_back(newStripe); return newStripe; } WorkerCluster::WorkerCluster(std::string configItemName, std::string baseName, int returnMessageId, RequestListener *returnListener) { // Create threads. // If there are n database machines, we create 2*n threads. // We divide the alerts into n equal pieces. // Each database machine gets two consecutive peices, one per thread. // We might get slightly better caching by making both threads for the same // database do the same section of the alerts, but this gives us some // overlap. If one database machine is slow or completely down, this will // allow us to continue working. const std::string list = getConfigItem(configItemName, "@RO,@RO"); const std::vector< std::string > pieces = explode(",", list); const int dbCount = pieces.size(); assert(dbCount); // At this time (5/30/2008) was have 6 database machines and 5 front end // machines. 1 thread per front end machine per database was the ideal // number. With 2 threads, we were overloading the database machines, and // they could not keep up with the master database. For testing purposes // we use 5 threads on one front end machine, and direct all traffic to that // machine. This simulates having all 5 front end machines going to the // database, but it is slighly more predictible. const int duplicateCount = strtolDefault(getConfigItem(configItemName + "_count"), 1); const std::vector< std::string > noRealtimeList = explode(",", getConfigItem(configItemName + "_no_realtime")); const std::string barsDb = getConfigItem(configItemName + "_bars"); const std::set< std::string > noRealtimeSet(noRealtimeList.begin(), noRealtimeList.end()); for (int i = 0; i < dbCount; i++) { const std::string roDatabaseName = pieces[i]; const bool allowRealtime = !noRealtimeSet.count(roDatabaseName); const std::string dbMessage = allowRealtime?"":" (no realtime)"; WorkerStripe *stripe = addStripe(dbCount, i, allowRealtime); stripe->setDebugName(roDatabaseName + ' ' + baseName + dbMessage); for (int duplicate = 0; duplicate < duplicateCount; duplicate++) { WorkerThread *t = new WorkerThread(baseName, returnMessageId, returnListener); t->setMasterDatabase (new DatabaseWithRetry(false, t->getName() + " Master")); t->setReadOnlyDatabase (new DatabaseWithRetry(roDatabaseName, t->getName() + " Read Only" + dbMessage)); t->setReadOnlyBarsDatabase (new DatabaseWithRetry(barsDb, t->getName() + " Bars" + dbMessage)); stripe->addWorker(t); } } ThreadMonitor::find().add(this); }