#include "LogFile.h"
#include "ThreadMonitor.h"

#include "NewWorkerCluster.h"

// NOTE(review): This file was recovered from a copy in which all line breaks
// had been collapsed and, in four places, the text between the second '<' of
// a "<<" and the next '>' had been stripped.  The line structure has been
// restored.  The four corrupted fragments are kept verbatim and are each
// marked with NOTE(review); they must be restored from version control before
// this file will compile.

/////////////////////////////////////////////////////////////////////
// NewWorkerCluster::JobInfo
/////////////////////////////////////////////////////////////////////

// Fills in the default callbacks:  beforeDispatch always allows the job to
// proceed, backHome does nothing, and description returns an empty string.
NewWorkerCluster::JobInfo::JobInfo() :
  socket(NULL),
  beforeDispatch([]() { return true; }),
  // Leave the default for inThread.  That throws an exception.
  backHome([]() {}),
  description([]() { return std::string(); })
{
}

// Convenience overload for a fixed description.  The string is copied into a
// lambda which is handed to the setDescription() overload that accepts a
// callable.
void NewWorkerCluster::JobInfo::Ref::setDescription(std::string const &v)
{
  setDescription([v]() { return v; });
}

/////////////////////////////////////////////////////////////////////
// NewWorkerCluster
/////////////////////////////////////////////////////////////////////

// Queues a new job and immediately tries to pair waiting jobs with available
// workers.  The Job wrapper allocated here is deleted in match() after it is
// popped from _jobs.
void NewWorkerCluster::addJob(JobInfo::CRef const &jobInfo)
{
  Job *job = new Job(jobInfo);
  _jobs.push(job);
  match();
}

// Short status summary.  Counters that are currently zero are left out.
std::string NewWorkerCluster::getInfoForThreadMonitor()
{
  TclList result;
  result<<"NewWorkerCluster"<<_baseName;
  if (_availableWorkers.size())
    result<<"_availableWorkers"<<_availableWorkers.size();
  if (_busyWorkers.size())
    result<<"_busyWorkers"<<_busyWorkers.size();
  if (_jobs.getCount())
    result<<"_jobs"<<_jobs.getCount();
  return result;
}

// More detailed dump of the cluster's state, including one entry per busy
// worker.
std::string NewWorkerCluster::debugDump() const
{
  TclList result;
  result<<"NewWorkerCluster"
        <<"_baseName"<<_baseName
        <<"_allWorkers"<<_allWorkers.size()
        <<"_availableWorkers"<<_availableWorkers.size()
        <<"_busyWorkers";
  TclList busy;
  for (auto &kvp : _busyWorkers)
    // We have more info here.  We know how long the thread has been busy for.
    //
    // NOTE(review): CORRUPTED SOURCE.  The line below is all that survives of
    // (a) the rest of this loop body, (b) the end of debugDump(), and (c) the
    // signature and opening loop of the function that handles a closed
    // socket.  That function evidently walks _busyWorkers (it uses kvp and
    // info below) and compares each busy job's socket with the socket that
    // was closed.  Restore the missing text from version control.
    busy<socket); result<socket == socket) {
      IContainerThread *const worker = kvp.first;
      reportRequestEnded(worker, info, /* socketClosed = */ true);
      // Restore the worker to the available queue.
      //
      // But not immediately.  First send another message to the same
      // worker thread just to make sure it's still alive.
      //
      // Imagine I start some long job, maybe it takes 2 minutes.  I
      // get disconnected after 10 seconds.  If we immediately add
      // the worker back to the list, we might immediately send a new
      // request to that worker.  But that new request will have to
      // wait almost 2 minutes just to start, even if other workers
      // were still available.
      //
      // Take the same message we were trying to send, clear out some
      // of the settings, then send it back to the same worker.  The worker
      // treats this like a normal job, as does the dispatcher when
      // the job completes.
      //
      // Clear out some fields so we don't actually try to do any of
      // the work originally requested by the user.  We're just trying
      // to finish the internal process associated with a job.
      //
      // Keep some of the other fields, in particular the startTime,
      // so we can do proper accounting for the original job.  This
      // message will return from the worker shortly after the original
      // job finishes.
      JobInfo::Ref jobInfo;
      jobInfo->inThread = []() {};
      jobInfo->description = info.info->description;
      info.pingAfterSocketClosed = true;
      info.info = jobInfo;
      // Capture this explicitly; the lambda calls a member function.
      worker->addLambdaToQueue([this, jobInfo]() { doJobInThread(jobInfo); });
      ThreadMonitor::find().increment(s_socketClosed_return_worker);
    }
  }
  match();
}

// Pairs queued jobs with available workers until one of the two runs out.
// A job whose beforeDispatch() returns false is dropped without using a
// worker.
void NewWorkerCluster::match()
{
  while (true)
  {
    if (_availableWorkers.empty())
      return;
    if (_jobs.empty())
      return;
    JobInfo::CRef jobInfo;
    {
      Job *const job = static_cast< Job * >(_jobs.pop());
      jobInfo = job->info;
      // We have transitioned from the old Request-based world into the new
      // smart pointers and lambda expressions world, and we're not going back.
      delete job;
    }
    if (!jobInfo->beforeDispatch())
      // The job requested not to go any further.
      // Keep looking.  Hopefully beforeDispatch() will be quick.  We can do
      // any number of these before returning from match().
      continue;
    // Found a good job.  Send it to a worker.
    IContainerThread *worker = _availableWorkers.front();
    _availableWorkers.pop();
    _busyWorkers[worker] =
      { .startTime = getMicroTime(),
        .hasBeenReported = false,
        .pingAfterSocketClosed = false,
        .info = jobInfo };
    // Capture this explicitly; the lambda calls a member function.
    worker->addLambdaToQueue([this, jobInfo]() { doJobInThread(jobInfo); },
                             jobInfo->socket);
  }
}

// Queued onto a worker by match() (and by the socket-closed handler).  Runs
// the job's inThread callback, then uses doWorkInThread() to schedule
// doJobReturn() for this job and the current worker.
void NewWorkerCluster::doJobInThread(JobInfo::CRef const &job)
{
  ThreadMonitor::find().increment("NewWorkerCluster::doJobInThread");
  job->inThread();
  IContainerThread *worker = IContainerThread::current();
  doWorkInThread(job->socket,
                 [this, job, worker]() { doJobReturn(job, worker); });
}

// Completion step for a job:  runs the job's backHome callback, returns the
// worker to the available queue, reports the end of the request, and looks
// for more work.
void NewWorkerCluster::doJobReturn(JobInfo::CRef const &job,
                                   IContainerThread *worker)
{
  job->backHome();
  _availableWorkers.push(worker);
  // NOTE(review): this assumes the worker is always present in _busyWorkers;
  // the result of find() is dereferenced without a check.
  auto it = _busyWorkers.find(worker);
  reportRequestEnded(worker, it->second, /* socketClosed = */ false);
  _busyWorkers.erase(it);
  // The job will be implicitly deleted here.  Or sooner, if it got canceled
  // and doJobReturn() was never called.
  match();
}

// Logs the end of a request, but only for requests that beforeSleep() has
// already flagged (hasBeenReported).  Other requests end silently.
void NewWorkerCluster::reportRequestEnded(IContainerThread *worker,
                                          BusyWorkerInfo const &workerInfo,
                                          bool socketClosed) const
{
  if (!workerInfo.hasBeenReported)
    return;
  TclList msg;
  // NOTE(review): CORRUPTED SOURCE.  The line below is what survives of the
  // statement(s) that build and send the log message.  Judging by the sample
  // log output in the comment after this function, the message carries
  // "worker", "time in μsec" and "status" fields.  Restore the missing text
  // from version control.
  msg<getThreadName() <<"time in μsec"<socket);
}

/* Test code for the slow requests.  Connect to tikiller via the TCL shell,
 * type the following lines, then hit the "Send Streaming" button.

set channel [ti::get_current_channel]
set start [clock microseconds]
ti::dispatch 2000 {after $input}
ti::reply_to_client $channel [expr {[clock microseconds] - $start}]μs

 * The result was:

{Sun May 10 19:21:23 2020} 2 NewWorkerCluster.C 237 beforeSleep {Slow job detected} worker {ScriptDispatcher 0x7f4f6c001520} {time in μsec} 1,500,784 {available workers} 15 description {input 2000 remote_script {after $input} return_script ::ti::socket::2::coroutine2}
{Sun May 10 19:21:24 2020} 2 NewWorkerCluster.C 197 reportRequestEnded worker {ScriptDispatcher 0x7f4f6c001520} {time in μsec} 2,002,558 status finished

 * Here's a second case where I disconnected the client from the server before
 * the first warning.  The Socket_closed message appears in the log much later,
 * immediately after the slow job finished.

{Sun May 10 19:24:04 2020} 0 NewWorkerCluster.C 237 beforeSleep {Slow job detected} worker {ScriptDispatcher 0x7f4f6c001b80} {time in μsec} 1,501,223 {available workers} 15 description {input 2000 remote_script {after $input} return_script ::ti::socket::2::coroutine3}
{Sun May 10 19:24:05 2020} 2 Socket_closed
{Sun May 10 19:24:05 2020} 0 NewWorkerCluster.C 197 reportRequestEnded worker {ScriptDispatcher 0x7f4f6c001b80} {time in μsec} 2,002,453 status {finished after socket closed}

 * This test was similar.  The script ran for 6 seconds.  I broke the
 * connection between the client and the server shortly after I saw the
 * timer warning in the log.

{Sun May 10 19:31:42 2020} 4 NewWorkerCluster.C 237 beforeSleep {Slow job detected} worker {ScriptDispatcher 0x7f4f6c002950} {time in μsec} 1,500,725 {available workers} 15 description {input 6000 remote_script {after $input} return_script ::ti::socket::4::coroutine5}
{Sun May 10 19:31:46 2020} 4 CommandDispatcher.C {EOF on read}
{Sun May 10 19:31:46 2020} 4 NewWorkerCluster.C 197 reportRequestEnded worker {ScriptDispatcher 0x7f4f6c002950} {time in μsec} 5,477,916 status {socket closed while running}
{Sun May 10 19:31:46 2020} 5 New_socket 192.168.1.1
{Sun May 10 19:31:47 2020} 4 Socket_closed
{Sun May 10 19:31:47 2020} 0 NewWorkerCluster.C 197 reportRequestEnded worker {ScriptDispatcher 0x7f4f6c002950} {time in μsec} 6,001,816 status {finished after socket closed}

 */

// Looks for busy workers whose job has been running for at least the
// reporting threshold.  Each such job is flagged once (hasBeenReported).  For
// jobs that have not reached the threshold yet, asks to be woken again when
// they will.
void NewWorkerCluster::beforeSleep(IBeforeSleepCallbacks &callbacks)
{
  ThreadMonitor::SetState tm("look for slow requests");
  const int64_t thresholdToReport = 1500000;  // 1½ seconds.
  const int64_t now = getMicroTime();
  for (auto &kvp : _busyWorkers)
  {
    BusyWorkerInfo &info = kvp.second;
    if (info.hasBeenReported)
      continue;
    const auto timePassed = now - info.startTime;
    const auto timeUntilReport = thresholdToReport - timePassed;
    if (timeUntilReport > 0)
    {
      // Round up to whole milliseconds so we never wake before the threshold.
      const auto msUntilReport = (timeUntilReport + 999) / 1000;
      callbacks.wakeAfterMs(msUntilReport);
    }
    else
    {
      info.hasBeenReported = true;
      TclList msg;
      // NOTE(review): CORRUPTED SOURCE.  The two fragments below are what
      // survive of the statements that build the "Slow job detected" message,
      // fetch the job's description, and send the message to the log.
      // Restore the missing text from version control.
      msg<getThreadName() <<"time in μsec"<description();
      if (!description.empty())
        msg<<"description"<socket);
    }
  }
}

// Creates count workers with the cluster's default name and log period and no
// initializer.
void NewWorkerCluster::createWorkers(int count)
{
  for (; count > 0; count--)
    createWorker(nullptr, _baseName, _logPeriod);
}

// Creates one worker thread.  If an initializer is given it is queued on the
// new thread first.  The thread is then registered as available and match()
// is called in case jobs are already waiting.
void NewWorkerCluster::createWorker(IContainerThread::Work *initializer,
                                    std::string name, int logPeriod)
{
  IContainerThread *thread = IContainerThread::create(name, true, logPeriod);
  if (initializer)
    thread->addToQueue(initializer);
  _availableWorkers.push(thread);
  _allWorkers.push_back(thread);
  match();
}