#include "../shared/CommandDispatcher.h"
#include "../shared/ReplyToClient.h"
#include "../shared/LogFile.h"
#include "../shared/XmlSupport.h"
#include "../shared/GlobalConfigFile.h"
#include "../shared/NewConnections.h"
#include "CandleDataProvider.h"
#include "UserInfo.h"
#include "../shared/TclUtil.h"
#include "TclReplyToClient.h"
#include "TclLogWrapper.h"
#include "../shared/CurrentRequest.h"

#include "ScriptDispatcher.h"


/////////////////////////////////////////////////////////////////////
// ScriptDispatcher::TSTclObjWrapper
/////////////////////////////////////////////////////////////////////

// Holds one reference to value for as long as the wrapper exists.
// value may be NULL, in which case no reference is taken.
ScriptDispatcher::TSTclObjWrapper::TSTclObjWrapper(Tcl_Obj *value) :
  _value(value)
{
  if (value)
    Tcl_IncrRefCount(value);
}

// Gives the reference back through the primary dispatcher's
// threadSafeDecrRefCount().  The constructor takes no reference for a
// NULL value, so there is nothing to give back in that case.
ScriptDispatcher::TSTclObjWrapper::~TSTclObjWrapper()
{
  if (_value)
    getPrimary()->threadSafeDecrRefCount(_value);
}


/////////////////////////////////////////////////////////////////////
// ScriptDispatcherPreloader
/////////////////////////////////////////////////////////////////////

// Tcl entry point for the command created by installPreloadCommand().
// clientData is the ScriptDispatcherPreloader that installed the
// command.  objv[0] is the command name, so the subcommand is looked
// for at index 1.
int ScriptDispatcherPreloader::preloadCmd(ClientData clientData,
                                          Tcl_Interp *interp,
                                          int objc,
                                          Tcl_Obj *const objv[])
{
  ScriptDispatcherPreloader *const self =
    static_cast< ScriptDispatcherPreloader * >(clientData);
  return self->preloadCmd(interp, objc, objv, 1);
}

// Implements the preload subcommands.  objv[offset] names the
// subcommand and everything after it is that subcommand's arguments.
//
//   add ?symbol ...?     Preload each symbol.  Result: how many were new.
//   remove ?symbol ...?  Drop each symbol.  Result: how many were dropped.
//   clear                Drop everything.  Result: how many were dropped.
//   set list             Make the preloaded symbols match list exactly.
//                        No result is set.
//   count                Result: the number of preloaded symbols.
//   all                  Result: a list of the preloaded symbols.
//
// Returns TCL_OK on success.  Returns TCL_ERROR, with a message left in
// interp, for a wrong number of arguments, an unknown subcommand, or a
// value that cannot be read as a list.
int ScriptDispatcherPreloader::preloadCmd(Tcl_Interp *interp,
                                          int objc,
                                          Tcl_Obj *const objv[],
                                          int offset)
{
  if (objc < 1 + offset)
  {
    Tcl_WrongNumArgs(interp, offset, objv, "subcommand ?argument ...?");
    return TCL_ERROR;
  }

  // These enumerators must stay in the same order as the names in the
  // table below; Tcl_GetIndexFromObj() reports the index of the match.
  enum { scAdd, scRemove, scClear, scSet, scCount, scAll };
  static char const *subcommands[] =
    { "add", "remove", "clear", "set", "count", "all", NULL };
  int subcommand;
  if (Tcl_GetIndexFromObj(interp, objv[offset], subcommands,
                          "subcommand", 0, &subcommand) != TCL_OK)
    return TCL_ERROR;

  switch (subcommand)
  {
  case scAdd:
    {
      const int before = preloadCount();
      for (int i = offset + 1; i < objc; i++)
        preloadData(getString(objv[i]));
      // Duplicates are ignored by preloadData(), so this is the number
      // of symbols that were actually added.
      Tcl_SetObjResult(interp, Tcl_NewIntObj(preloadCount() - before));
      return TCL_OK;
    }
  case scRemove:
    {
      const int before = preloadCount();
      for (int i = offset + 1; i < objc; i++)
        unpreloadData(getString(objv[i]));
      // Unknown symbols are ignored by unpreloadData(), so this is the
      // number of symbols that were actually removed.
      Tcl_SetObjResult(interp, Tcl_NewIntObj(before - preloadCount()));
      return TCL_OK;
    }
  case scClear:
    {
      if (objc != 1 + offset)
      {
        Tcl_WrongNumArgs(interp, offset, objv, "clear");
        return TCL_ERROR;
      }
      const int before = preloadCount();
      clearAllPreloads();
      Tcl_SetObjResult(interp, Tcl_NewIntObj(before));
      return TCL_OK;
    }
  case scSet:
    {
      if (objc != 2 + offset)
      {
        Tcl_WrongNumArgs(interp, offset + 1, objv, "list");
        return TCL_ERROR;
      }
      int listCount;
      Tcl_Obj **listItems;
      if (Tcl_ListObjGetElements(interp, objv[offset + 1],
                                 &listCount, &listItems) != TCL_OK)
        return TCL_ERROR;
      // Add the requested symbols before dropping the unwanted ones.
      std::set< std::string > toSave;
      for (int i = 0; i < listCount; i++)
      {
        const std::string item = getString(listItems[i]);
        preloadData(item);
        toSave.insert(item);
      }
      // Collect first, erase second.  unpreloadData() erases from
      // _preloaded, so it must not run while we iterate over it.
      std::set< std::string > toDelete;
      for (auto const &entry : _preloaded)
        if (!toSave.count(entry.first))
          toDelete.insert(entry.first);
      for (std::string const &symbol : toDelete)
        unpreloadData(symbol);
      return TCL_OK;
    }
  case scCount:
    {
      Tcl_SetObjResult(interp, Tcl_NewIntObj(preloadCount()));
      return TCL_OK;
    }
  case scAll:
    {
      Tcl_Obj *const result = Tcl_NewObj();
      for (auto const &entry : _preloaded)
        Tcl_ListObjAppendElement(interp, result,
                                 makeTclString(entry.first));
      Tcl_SetObjResult(interp, result);
      return TCL_OK;
    }
  }
  // Only reachable if the table and the enumerators get out of sync.
  return TCL_ERROR;
}

// Releases every preload that is still held.
ScriptDispatcherPreloader::~ScriptDispatcherPreloader()
{
  clearAllPreloads();
}

// Starts preloading symbol: looks up one CandleDataNode receipt with
// the second argument false and one with it true, and stores them as
// dailyReceipt and intradayReceipt.  A symbol that is already preloaded
// is left alone.
void ScriptDispatcherPreloader::preloadData(std::string const &symbol)
{
  if (_preloaded.count(symbol))
    // Ignore duplicates.
    return;
  Preload &preload = _preloaded[symbol];
  preload.dailyReceipt = CandleDataNode::find(symbol, false);
  preload.intradayReceipt = CandleDataNode::find(symbol, true);
}

// Stops preloading symbol: calls decrementReferenceCount() on both
// receipts and forgets the entry.  A symbol that is not preloaded is
// ignored.
void ScriptDispatcherPreloader::unpreloadData(std::string const &symbol)
{
  const auto it = _preloaded.find(symbol);
  if (it == _preloaded.end())
    return;
  Preload &preload = it->second;
  preload.dailyReceipt->decrementReferenceCount();
  preload.intradayReceipt->decrementReferenceCount();
  // symbol may refer to the key stored in this very entry (see
  // clearAllPreloads()), so it must not be used after this line.
  _preloaded.erase(it);
}

// Releases every preloaded symbol, one entry at a time.
void ScriptDispatcherPreloader::clearAllPreloads()
{
  while (!_preloaded.empty())
    unpreloadData(_preloaded.begin()->first);
}

// Number of symbols currently preloaded.  Tcl_NewIntObj() takes an int,
// so the container size is narrowed explicitly here.
int ScriptDispatcherPreloader::preloadCount() const
{
  return static_cast< int >(_preloaded.size());
}

// Registers the preload command in interp under the given name.  This
// object is passed as the command's clientData.
void ScriptDispatcherPreloader::installPreloadCommand
(Tcl_Interp *interp, std::string const &name)
{
  Tcl_CreateObjCommand(interp, name.c_str(), preloadCmd, this, NULL);
}


/////////////////////////////////////////////////////////////////////
// ScriptDispatcher
/////////////////////////////////////////////////////////////////////

ScriptDispatcher *ScriptDispatcher::_primary = NULL;

// Returns the interpreter of the current ExecutionContext instance.
inline Tcl_Interp *ScriptDispatcher::getInterp()
{
  return ExecutionContext::getInstance()->getInterp();
  /* Disabled debugging variant.
     NOTE(review): the text of this file was damaged at this point.  The
     rest of the debug code, the end of this comment and the closing
     brace of getInterp() were all missing.  The brace has been put
     back; confirm against version control.
  Tcl_Interp *result = ExecutionContext::getInstance()->getInterp();
  TclList msg;
  msg<getName() <
  */
}

/* NOTE(review): what follows is the surviving tail of a disabled request
   handler.  Its beginning was lost in the same damage described in
   getInterp() above.

callbackId) {
    case mtCancelDispatcherScript:
      // This requires TCL 8.6
      if (_executionContext)
        Tcl_CancelEval(_executionContext->getInterp(), NULL, NULL,
                       TCL_CANCEL_UNWIND);
      // Strange.  The man page for Tcl_CancelEval shows only 3 arguments.
      // https://www.tcl.tk/man/tcl/TclLib/Cancel.htm  That says it's for
      // version 8.6.1, which I have.  But this other page shows four arguments,
      // which is what my header file expects.
      // http://www.tcl.tk/cgi-bin/tct/tip/285.html
      // This seems buggy.  When I cancel evaluation, it initially seems to
      // work.  The current script ends with an error message.  But when I try
      // to run another script, I still get an error.  It's like the canceled
      // state is sticky and I can never get out.
I don't think that's how it's // supposed to work. break; } delete request; }; */ /* Tcl callback registered as ti::get_socket_id. clientData is the ScriptDispatcher; the work is done by the member overload. */ int ScriptDispatcher::getSocketIdCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { return ((ScriptDispatcher *)clientData)->getSocketIdCmd(objc, objv); } /* Accepts no arguments. Sets the interpreter result to the serial number of the socket of the current request. */ int ScriptDispatcher::getSocketIdCmd(int objc, Tcl_Obj *const objv[]) { Tcl_Interp *const interp = _gridFiller.debugGetInterp(); if (objc != 1) { Tcl_WrongNumArgs(interp, 1, objv, ""); return TCL_ERROR; } Tcl_SetObjResult(interp, Tcl_NewLongObj(SocketInfo::getSerialNumber(CurrentRequest::getSocketInfo()))); return TCL_OK; } /* Returns ::ti::socket:: followed by the serial number of the given socket. */ std::string ScriptDispatcher::getSocketTempNS(SocketInfo *socketInfo) { return "::ti::socket::" + ntoa(SocketInfo::getSerialNumber(socketInfo)); } /* Same, for the socket of the current request. */ std::string ScriptDispatcher::getSocketTempNS() { return getSocketTempNS(CurrentRequest::getSocketInfo()); } /* Tcl callback registered as ti::get_username. clientData is the ScriptDispatcher; the work is done by the member overload. */ int ScriptDispatcher::getUsernameCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { return ((ScriptDispatcher *)clientData)->getUsernameCmd(objc, objv); } /* Accepts no arguments. Sets the interpreter result to getUsername(). */ int ScriptDispatcher::getUsernameCmd(int objc, Tcl_Obj *const objv[]) { Tcl_Interp *const interp = _gridFiller.debugGetInterp(); if (objc != 1) { Tcl_WrongNumArgs(interp, 1, objv, ""); return TCL_ERROR; } Tcl_SetObjResult(interp, makeTclString(getUsername())); return TCL_OK; } /* Tcl callback registered as dispatch_tcl. clientData is the ScriptDispatcher; the work is done by the member overload. */ int ScriptDispatcher::dispatchTclCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { return ((ScriptDispatcher *)clientData)->dispatchTclCmd(objc, objv); } /* Requires exactly three arguments: input, remote_script and return_script. NOTE(review): this copy of the function looks incomplete -- the commented-out std::cout block further down is never closed, and jobInfo is used without a visible declaration. Confirm against version control before editing. */ int ScriptDispatcher::dispatchTclCmd(int objc, Tcl_Obj *const objv[]) { Tcl_Interp *const interp = getInterp(); if (objc != 4) { Tcl_WrongNumArgs(interp, 1, objv, "input remote_script return_script"); return TCL_ERROR; } SocketInfo *const socket = CurrentRequest::getSocketInfo(); class Info { public: Tcl_Obj *const input; const std::string inputString; const std::string remoteScript; Tcl_Obj *const returnScript; std::string output; bool error; Info(Tcl_Obj *const objv[]) : input(objv[1]), 
inputString(getString(objv[1])), remoteScript(getString(objv[2])), returnScript(objv[3]) { Tcl_IncrRefCount(input); Tcl_IncrRefCount(returnScript); } ~Info() { getPrimary()->threadSafeDecrRefCount(input); getPrimary()->threadSafeDecrRefCount(returnScript); } typedef NCTSRefCount< Info > Ref; TclList logDump() { return TclList("input", inputString, "remote_script", remoteScript, "return_script", getString(returnScript)); } }; Info::Ref info = new Info(objv); /* std::cout<<"ScriptDispatcher::dispatchTclCmd " <<"input=“"<inputString<<"”, " <<"script=“"<remoteScript<<"”" <inputString), 0); TclObjCache &objCache = TclObjCache::getInstance(); int result = Tcl_EvalObjEx(interp, objCache.find(info->remoteScript).getObj(), 0); info->output = getString(Tcl_GetObjResult(interp)); if (result == TCL_OK) info->error = false; else { info->error = true; // Should we really report an error here. It's possible that this // will be handled in the return script. In particular, there are // helper scripts which will process this error in a reasonable way. // For a simple request, the error could get re-thrown in the return // script. For a long list of requests, we often group the errors // and print one message summarizing the errors. reportError("ScriptDispatcher::dispatchTclCmd() remote", info->remoteScript, interp, socket); } // Clear this variable immediately. It would be overwritten the next // call, so this doesn't change much, but if this was pointing to some // big object, like a grid, we want to release the reference count now, // rather than waiting for the next message to this thread. On a busy // system this doesn't mean much, but sometimes in development it's // nice to see results instantly. 
Tcl_UnsetVar(interp, "input", 0); }; jobInfo->backHome = [=]() { ThreadMonitor::SetState tm("dispatchTclCmd"); Tcl_Interp *interp = getInterp(); Tcl_UnsetVar(interp, "input", 0); Tcl_SetVar2Ex(interp, "input", NULL, info->input, 0); Tcl_UnsetVar(interp, "output", 0); Tcl_SetVar2Ex(interp, "output", NULL, makeTclString(info->output), 0); Tcl_UnsetVar(interp, "error", 0); Tcl_SetVar2Ex(interp, "error", NULL, Tcl_NewBooleanObj(info->error), 0); int result = Tcl_EvalObjEx(interp, info->returnScript, 0); if (result != TCL_OK) reportError("ScriptDispatcher::dispatchTclCmd() return", getString(info->returnScript), interp, socket); // Clear these variables immediately. They would be overwritten the next // call, so this doesn't change much, but if this was pointing to some // big object, like a grid, we want to release the reference count now, // rather than waiting for the next message to this thread. On a busy // system this doesn't mean much, but sometimes in development it's // nice to see results instantly. 
Tcl_UnsetVar(interp, "input", 0); Tcl_UnsetVar(interp, "output", 0); Tcl_UnsetVar(interp, "error", 0); }; jobInfo->description = [=]() { return (std::string)info->logDump(); }; jobInfo->socket = socket; _newWorkerCluster.addJob(jobInfo); Tcl_ResetResult(interp); return TCL_OK; } int ScriptDispatcher::dispatchTclAllCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { return ((ScriptDispatcher *)clientData)->dispatchTclAllCmd(objc, objv); } int ScriptDispatcher::dispatchTclAllCmd(int objc, Tcl_Obj *const objv[]) { Tcl_Interp *const interp = _gridFiller.debugGetInterp(); if (objc != 2) { Tcl_WrongNumArgs(interp, 1, objv, "script"); return TCL_ERROR; } const std::string script = getString(objv[1]); dispatchTclAll(script, CurrentRequest::getSocketInfo()); Tcl_SetObjResult(interp, Tcl_NewIntObj(_newWorkerCluster.workerCount())); return TCL_OK; } int ScriptDispatcher::dispatchDeferredCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { return ((ScriptDispatcher *)clientData)->dispatchDeferredCmd(objc, objv); } int ScriptDispatcher::dispatchDeferredCmd(int objc, Tcl_Obj *const objv[]) { TSTclObjWrapper::Ref input; TSTclObjWrapper::Ref script; if (objc == 2) script = new TSTclObjWrapper(objv[1]); else if (objc == 3) { input = new TSTclObjWrapper(objv[1]); script = new TSTclObjWrapper(objv[2]); } else { Tcl_WrongNumArgs(getInterp(), 1, objv, "?input? 
script"); return TCL_ERROR; } SocketInfo *const socket = CurrentRequest::getSocketInfo(); _dataNodeThread->submit(socket, [=]() { doWorkInThread(socket, [=]() { ThreadMonitor::SetState tm("dispatchDeferredCmd"); Tcl_Interp *const interp = getInterp(); if (input) Tcl_SetVar2Ex(interp, "deferred_input", NULL, input->getValue(), 0); const int result = Tcl_EvalObjEx(interp, script->getValue(), 0); if (result != TCL_OK) reportError("ScriptDispatcher::dispatchDeferredCmd", getString(script->getValue()), interp); }); }); return TCL_OK; } // This is strictly for testing this one feature. This is likely to change // in the future to test other things. Don't use this for anything else! static int tclTestCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { Tcl_Obj *const result = Tcl_NewObj(); Tcl_Namespace *const nameSpace = Tcl_GetCurrentNamespace(interp); Tcl_ListObjAppendElement(interp, result, makeTclString(nameSpace->name)); Tcl_ListObjAppendElement(interp, result, makeTclString(nameSpace->fullName)); for (int i = 1; i < objc; i++) Tcl_ListObjAppendElement (interp, result, makeTclString(ExecutionContext::getAbsoluteName(interp, getString(objv[i])))); Tcl_SetObjResult(interp, result); return TCL_OK; /* result: command=tcl_command&script=tcl_test == MESSAGE 0 ========== 8 == 0: {} :: command=tcl_command&script=namespace eval x::y::z {tcl_test} == MESSAGE 0 ========== 14 == 0: z ::x::y::z */ } /* Registers the Tcl commands of the dispatcher interpreter and opens the super user listening socket. NOTE(review): part of this function is missing from this copy of the file -- the text jumps from a commented-out std::cout line straight into the middle of a debugGetInterp call, and again inside the invalid port message. Restore the original before changing it. */ void ScriptDispatcher::initializeInThread() { //std::cout<<"starting ScriptDispatcher::initializeInThread()"<debugGetInterp(); ExecutionContext *executionContext = new ExecutionContext(interp, gridFiller); executionContext->initInterp("worker"); }, strtolDefault(getConfigItem("grid_worker_thread_count"), 8)); Tcl_Interp *const interp = _gridFiller.debugGetInterp(); ExecutionContext *executionContext = new ExecutionContext(interp, &_gridFiller); Tcl_CreateNamespace(interp, "ti", NULL, NULL); Tcl_CreateObjCommand(interp, "ti::get_username", getUsernameCmd, 
this, NULL); Tcl_CreateObjCommand(interp, "ti::get_socket_id", getSocketIdCmd, this, NULL); Tcl_CreateObjCommand(interp, "dispatch_tcl", dispatchTclCmd, this, NULL); Tcl_CreateObjCommand(interp, "dispatch_tcl_all", dispatchTclAllCmd, this, NULL); Tcl_CreateObjCommand(interp, "dispatch_deferred", dispatchDeferredCmd, this, NULL); Tcl_CreateObjCommand(interp, "tcl_test", tclTestCmd, NULL, NULL); TclReplyToClient::instance().addGetCurrentChannel(interp); _preloader.installPreloadCommand(interp, "preload"); executionContext->initInterp("dispatcher"); if (getConfigItem("no_streaming_data")=="1") { const std::string script = "ti::start_fake_candle_timer"; const int result = Tcl_Eval(interp, script.c_str()); if (result != TCL_OK) reportError("ScriptDispatcher.C initializing", script, interp, NULL); } // Use a separate socket for these operations for security reasons. A normal // user should never be able to send an arbitrary tcl script. This // separation prevents us from accidentally publishing this permission. // Also, we can keep this behind our firewall, so no one can listen in. CommandDispatcher *superUserDispatcher = new CommandDispatcher("superUserDispatcher"); InputListener *superUserListener = new InputListener(superUserDispatcher->getInput(), superUserDispatcher->getInputCallbackId()); const int listenPort = strtolDefault(getConfigItem("super_user_listen_port", "9370"), -1); if (listenPort <= 0) { std::cerr<<"Invalid super user listen port \"" <listenForCommand("tcl_command", this, mtRunUser); // The next command may be helpful to simulate the normal user. 
superUserDispatcher->listenForCommand("flex_command", this, mtRunFlex); NewConnections *newSuperUserConnections = new NewConnections(superUserListener, listenPort); if (!newSuperUserConnections->getSuccess()) { std::cerr<<"Unable to listen for new connections on super user port.\n"; _exit(5); } //std::cout<<"finished ScriptDispatcher::initializeInThread()"<callbackId) { /* NOTE(review): the end of initializeInThread and the header of the request handler that owns this switch are missing from this copy of the file, as are the template arguments of the first dynamic_cast below. The cases appear to dispatch on the callback id of the incoming request; confirm against version control. */ case mtRunUser: { tm.increment("mtRunUser"); tm.setState("mtRunUser"); ExternalRequest *request = dynamic_cast(original); const std::string script = request->getProperty("script"); Tcl_Interp *const interp = _gridFiller.debugGetInterp(); const int result = Tcl_Eval(interp, script.c_str()); const std::string response = ntoa(result) + ": " + Tcl_GetStringResult(interp); addToOutputQueue(request->getSocketInfo(), response, request->getResponseMessageId()); break; } case mtCandleTime: { tm.increment("mtCandleTime"); tm.setState("mtCandleTime"); DataNodeThread::TimeRequest *request = dynamic_cast< DataNodeThread::TimeRequest * >(original); const std::string script = "ti::do_candle_callbacks " + ntoa(request->time); Tcl_Interp *const interp = _gridFiller.debugGetInterp(); const int result = Tcl_Eval(interp, script.c_str()); if (result != TCL_OK) reportError("mtCandleTime", script, interp, NULL); break; } case mtRunFlex: { tm.increment("mtRunFlex"); tm.setState("mtRunFlex"); if (_verboseFlexCommand) LogFile::primary().sendString(TclList()<getSocketInfo()); _flexCommands.push(original); keepOriginal(); // doFlexCommand() will process then delete this request. 
break; } case mtLogin: { tm.increment("mtLogin"); tm.setState("mtLogin"); UserInfoThread::Login *request = dynamic_cast< UserInfoThread::Login * >(original); SocketInfo *const socket = request->getSocketInfo(); _userBySocket[socket] = request->username; //LogFile::primary().sendString(TclList()<username, socket); // TEMP break; } } } /* Runs one queued flex command: copies the request properties into a Tcl dict stored in ti::flex_input, evaluates ti::flex_command, reports a non-zero result through reportError, then deletes the request. NOTE(review): the verbose logging statement below has lost part of its text in this copy of the file. */ void ScriptDispatcher::doFlexCommand(ExternalRequest *request) { ThreadMonitor::SetState tm("mtRunFlex"); tm.increment("mtRunFlex"); if (_verboseFlexCommand) LogFile::primary().sendString(TclList()<getProperty("subcommand"), request->getSocketInfo()); // or ...<debugDump()<<... CurrentRequest::Simple currentRequest(request); Tcl_Interp *const interp = _gridFiller.debugGetInterp(); Tcl_Obj *const dict = Tcl_NewDictObj(); Tcl_IncrRefCount(dict); for(PropertyList::const_iterator it = request->getProperties().begin(); it != request->getProperties().end(); it++) Tcl_DictObjPut(NULL, dict, makeTclString(it->first), makeTclString(it->second)); Tcl_UnsetVar(interp, "ti::flex_input", 0); Tcl_SetVar2Ex(interp, "ti::flex_input", NULL, dict, 0); const int result = Tcl_Eval(interp, "ti::flex_command"); Tcl_DecrRefCount(dict); if (result) reportError("mtFlexCommand", "ti::flex_command", interp, request->getSocketInfo()); delete request; } /* Evaluates ti::socket_cleanup, reports any error, then removes the socket from _userBySocket and from _flexCommands. */ void ScriptDispatcher::socketClosed(SocketInfo *socket) { Tcl_Interp *const interp = getInterp(); const int result = Tcl_Eval(interp, "ti::socket_cleanup"); if (result != TCL_OK) reportError("socketClosed()", "ti::socket_cleanup", interp, socket); _userBySocket.erase(socket); _flexCommands.remove(socket); } /* NOTE(review): the first part of beforeSleep is missing from this copy of the file. What survives releases the queued objects in _toDecrRefCount, runs pending Tcl events without waiting, and then requests the next wake up. */ void ScriptDispatcher::beforeSleep(IBeforeSleepCallbacks &callbacks) { //std::cout<<"starting ScriptDispatcher::beforeSleep()"<::const_iterator it = _toDecrRefCount.begin(); it != _toDecrRefCount.end(); it++) Tcl_DecrRefCount(*it); _toDecrRefCount.clear(); pthread_mutex_unlock(&_mutex); //ThreadMonitor &tm = ThreadMonitor::find(); //tm.increment("_needToDecrRefCount"); //tm.increment("_toDecrRefCount", 
objCount); } ThreadMonitor &tm = ThreadMonitor::find(); // Create a very simple event loop for TCL. This will allow after // and other commands to work. tm.setState("Tcl_DoOneEvent"); while (Tcl_DoOneEvent(TCL_DONT_WAIT)) { // This will count the number of times we actually did an event, not // the number of times we tried. //std::cout<<"Tcl_DoOneEvent(TCL_DONT_WAIT)"<(_flexCommands.pop())); if (_flexCommands.empty()) // For simplicity we wake TCL about once per second, sooner if we have // other work to do. TCL offers better integration, but this should // suffice. callbacks.wakeAfterMs(1000); else { // If we have flex commands queued up, ask to wake up again as soon as we // check that there is no new high priority work. We can handle any number // of high priority items but we do at most one flex command each time // the event loop wakes up. tm.increment("mtRunFlex_deferred"); callbacks.wakeAfterMs(0); } //std::cout<<"finished ScriptDispatcher::beforeSleep()"<listenForCommand("flex_command", this, mtRunFlex); UserInfoThread::getInstance().addLoginListener(this, mtLogin); dataNodeThread.requestTimerUpdates(this, mtCandleTime); start(); } /* Destruction is not supported: the destructor asserts unconditionally. */ ScriptDispatcher::~ScriptDispatcher() { assert(false); } /* Creates the single ScriptDispatcher after asserting that _primary is not set yet. NOTE(review): presumably the constructor stores the new object in _primary; the constructor text is incomplete in this copy of the file, so confirm. */ void ScriptDispatcher::init(DataNodeThread &dataNodeThread) { assert(!_primary); new ScriptDispatcher(dataNodeThread); } /* Hands doWorkInThread a callback that evaluates toRun with Tcl_Eval and calls reportError if the result is not TCL_OK. */ void ScriptDispatcher::runScript(SocketInfo *socketInfo, std::string toRun) { doWorkInThread(socketInfo, [=]() { ThreadMonitor::SetState tm("runScript"); Tcl_Interp *const interp = getInterp(); const int result = Tcl_Eval(interp, toRun.c_str()); if (result != TCL_OK) reportError("ScriptDispatcher::runScript", toRun, interp); }); } /* ti::create_daily_prototype name body row_count ?last_day? prototype get_total_row_count ?time? prototype get_times row prototype create name symbol grid fill_to_now grid debug_dump grid completed_row_count grid reference row column grid truncate row_count grid prototype ... */