#include "../shared/ThreadMonitor.h"
#include "../shared/TclUtil.h"
#include "../shared/CurrentRequest.h"

#include "GridWorkerThread.h"

/////////////////////////////////////////////////////////////////////
// GridWorkerThread::GridWorkerJob
/////////////////////////////////////////////////////////////////////

// Base implementation of the per-job Tcl command.  It ignores its arguments,
// stores "Not implemented." as the interpreter result and reports TCL_ERROR.
// (Presumably concrete job types supply their own version -- confirm against
// the class declaration.)
int GridWorkerThread::GridWorkerJob::tclCmd(ExecutionContext &executionContext,
                                            int objc, Tcl_Obj *const objv[])
{
  Tcl_Interp *const interp = executionContext.getInterp();
  Tcl_SetObjResult(interp, makeTclString("Not implemented."));
  return TCL_ERROR;
}

/////////////////////////////////////////////////////////////////////
// GridWorkerThread
/////////////////////////////////////////////////////////////////////

// Tcl entry point for the "current_job" command.  clientData is the
// GridWorkerThread that registered the command in threadFunction(); the call
// is forwarded to that object's member overload.
int GridWorkerThread::currentJobCmd(ClientData clientData, Tcl_Interp *interp,
                                    int objc, Tcl_Obj *const objv[])
{
  GridWorkerThread *const self = static_cast< GridWorkerThread * >(clientData);
  return self->currentJobCmd(objc, objv);
}

// Forwards the "current_job" command to the job currently being executed.
// Returns TCL_ERROR with the message "No current job." when no job is active,
// otherwise whatever the job's tclCmd() returns.
int GridWorkerThread::currentJobCmd(int objc, Tcl_Obj *const objv[])
{
  assert(_executionContext);
  if (!_currentJob)
  {
    Tcl_SetObjResult(_executionContext->getInterp(),
                     makeTclString("No current job."));
    return TCL_ERROR;
  }
  return _currentJob->tclCmd(*_executionContext, objc, objv);
}

// Names the thread after this object's address, creates the incoming request
// queue under the same name and starts the thread.
GridWorkerThread::GridWorkerThread() :
  ThreadClass("GridWorkerThread " + pointerToString(this)),
  _incoming(getName()),
  _executionContext(NULL),
  _currentJob(NULL)
{
  startThread();
}

// Body of the worker thread.  Creates the execution context around the grid
// filler's interpreter, registers the "current_job" Tcl command, then services
// requests from _incoming until an mtQuit request arrives.
void GridWorkerThread::threadFunction()
{
  ThreadMonitor &tm = ThreadMonitor::find();
  Tcl_Interp *const interp = _gridFiller.debugGetInterp();
  _executionContext = new ExecutionContext(interp, &_gridFiller);
  _executionContext->initInterp("worker");
  Tcl_CreateObjCommand(interp, "current_job", currentJobCmd, this, NULL);
  while (true)
  {
    // Drain everything that is currently queued before blocking again.
    while (Request *current = _incoming.getRequest())
    {
      CurrentRequest::Simple cr(current);
      tm.setState("Working");
      switch (current->callbackId)
      {
      case mtSubmit:
      {
        tm.increment("mtSubmit");
        tm.setState("mtSubmit");
        _currentJob = dynamic_cast< GridWorkerJob * >(current);
        // submit() only accepts GridWorkerJob pointers, so the cast is
        // expected to succeed.  Guard anyway so that a stray request is
        // simply deleted below instead of being dereferenced through NULL.
        assert(_currentJob && "mtSubmit request is not a GridWorkerJob");
        if (_currentJob)
        {
          _currentJob->inWorkerThread(*_executionContext);
          // When returnHome() reports true the request must not be deleted
          // here; clearing the pointer turns the delete below into a no-op.
          if (_currentJob->returnHome())
            current = NULL;
          _currentJob = NULL;
        }
        break;
      }
      case mtQuit:
      {
        delete _executionContext;
        // Do not leave a dangling pointer behind once the thread has exited.
        _executionContext = NULL;
        delete current;
        return;
      }
      }
      delete current;
    }
    _incoming.waitForRequest();
  }
}

// Queues an mtQuit request for the worker thread and waits for the thread to
// finish.  The worker thread deletes the request.
GridWorkerThread::~GridWorkerThread()
{
  Request *r = new Request(NULL);
  r->callbackId = mtQuit;
  _incoming.newRequest(r);
  waitForThread();
}

// Tags the job as an mtSubmit request and places it on this thread's incoming
// queue; threadFunction() picks it up from there.
void GridWorkerThread::submit(GridWorkerJob *gridWorkerJob)
{
  gridWorkerJob->callbackId = mtSubmit;
  _incoming.newRequest(gridWorkerJob);
}

/////////////////////////////////////////////////////////////////////
// GridWorkerCluster
/////////////////////////////////////////////////////////////////////

// Creates workerCount worker threads.  Each one is recorded in _allThreads and
// starts out in the _available queue.  A count of zero or less creates none.
GridWorkerCluster::GridWorkerCluster(int workerCount)
{
  for (; workerCount > 0; workerCount--)
  {
    GridWorkerThread *t = new GridWorkerThread();
    _allThreads.push_back(t);
    _available.push(t);
  }
}

// Adds a job to the queue of pending work.  The job is not handed to a thread
// until match() runs.
void GridWorkerCluster::addWork(GridWorkerThread::GridWorkerJob *job)
{
  // match() dereferences every job it pops, so reject NULL up front.
  assert(job && "addWork() requires a job");
  _jobs.push(job);
}

// Called with a job that was previously handed out by match().  Puts the
// job's thread back into the available queue, forgets the job, and then
// invokes the job's afterReturn() hook.
//
// NOTE(review): remove() also erases entries from _working, so a job whose
// socket was removed will not be found here and will trip the assert below.
// Confirm whether such jobs can still reach this function.
void GridWorkerCluster::returning(Request *job)
{
  const Working::iterator it = _working.find(job);
  assert((it != _working.end()) && "Job not found");
  _available.push(it->second.thread);
  _working.erase(it);
  GridWorkerThread::GridWorkerJob *const gridJob =
    dynamic_cast< GridWorkerThread::GridWorkerJob * >(job);
  assert(gridJob && "Returned request is not a GridWorkerJob");
  if (gridJob)
    gridJob->afterReturn();
}

// Pairs pending jobs with available threads until one of the two queues is
// empty.  Each pairing is recorded in _working (keyed by the job) together
// with the job's socket, and the job is submitted to the chosen thread.
void GridWorkerCluster::match()
{
  ThreadMonitor &tm = ThreadMonitor::find();
  tm.increment("match()");
  while ((!_jobs.empty()) && (!_available.empty()))
  {
    GridWorkerThread::GridWorkerJob *const job =
      dynamic_cast< GridWorkerThread::GridWorkerJob * >(_jobs.pop());
    // addWork() only accepts GridWorkerJob pointers, so this should hold.
    assert(job && "Queued request is not a GridWorkerJob");
    GridWorkerThread *const thread = _available.front();
    _available.pop();
    PendingInfo pendingInfo;
    pendingInfo.socketInfo = job->getSocketInfo();
    pendingInfo.thread = thread;
    _working[job] = pendingInfo;
    thread->submit(job);
    tm.increment("matched");
  }
}

// Drops all bookkeeping associated with the given socket.
void GridWorkerCluster::remove(SocketInfo *socket)
{
  // Remove any jobs that are waiting in our queue.
  _jobs.remove(socket);
  // If a job was given to a thread, and the socket closed, mark the thread
  // as available.
  // The matching entries are collected first and erased in a second pass so
  // that _working is never modified while it is being traversed.
  std::vector< Working::iterator > toRemove;
  for (Working::iterator it = _working.begin(); it != _working.end(); ++it)
    if (it->second.socketInfo == socket)
    {
      _available.push(it->second.thread);
      toRemove.push_back(it);
    }
  for (std::vector< Working::iterator >::const_iterator it = toRemove.begin();
       it != toRemove.end();
       ++it)
    _working.erase(*it);
}