#include "../shared/GlobalConfigFile.h" #include "UserInfo.h" #include "../shared/MicroSleep.h" #include "../shared/LogFile.h" #include "StreamingAlertsThread.h" ///////////////////////////////////////////////////////////////////// // StreamingAlertsRequest ///////////////////////////////////////////////////////////////////// StreamingAlertsRequest::StreamingAlertsRequest(SocketInfo *socket, AlertId startFrom) : Request(socket), _startFrom(startFrom), _maxAlertId(0), _maxedOut(false), _queryTime(0) { } void StreamingAlertsRequest::addOriginalQuery (std::string windowId, AlertSqlProducer const &sqlProducer, AlertConfig::CustomSql const &sql, int limit) { SingleRequest &request = _originalQueries[windowId]; request.sqlProducer = sqlProducer; request.sqlStatement = sql; request.limit = limit; request.keep = false; } void StreamingAlertsRequest::hideSomeData(int &counter) { XmlNode &data = _results["DATA"]; for (std::map< std::string, XmlNode >::iterator queryIterator = data.namedChildren.begin(); queryIterator != data.namedChildren.end(); queryIterator++) { XmlNode &windowNode = queryIterator->second; for (std::vector< class XmlNode >::iterator alertIterator = windowNode.orderedChildren.begin(); alertIterator != windowNode.orderedChildren.end(); alertIterator++) { if (counter >= 0) { XmlNode &alert = *alertIterator; std::string &symbol = alert.properties["SYMBOL"]; bool free = (symbol.empty()); if (!free) { if ((counter % 7) == 0) { switch (counter % 4) { case 0: alert.properties["DESCRIPTION"] = "Data delayed 20 minutes for DEMO users."; break; case 1: case 3: alert.properties["DESCRIPTION"] = "Trade-Ideas is free for Scottrade Elite users. " "http://www.scottradeelite.com/"; //alert.properties["URL"] = // "http://www.scottradeelite.com/"; break; case 2: alert.properties["DESCRIPTION"] = "Some data is removed for DEMO users. " "Sign up for an account TODAY."; break; } } } counter--; } else { counter = maxRTAlertCount; } } } } static const std::string S_out_of_order = "out_of_order"; void StreamingAlertsRequest::findAlerts(DatabaseWithRetry &masterDatabase, DatabaseWithRetry &database, LastIdCacheInfo &lastIdCache) { _queryTime = TimeVal(true).asMicroseconds(); static TimeVal::Microseconds delay = strtollDefault(getConfigItem("streaming_alerts_thread_delay"), 0); if (delay) { // This is used only for testing. microSleep(delay); } // See if we have any new queries to initialize. We need to do this even // if there is no streaming data. This will also be used for the short // history. for (RequestMap::iterator it = _originalQueries.begin(); it != _originalQueries.end(); it++) { AlertSqlProducer &sqlProducer = it->second.sqlProducer; if (sqlProducer.ready()) { AlertConfig::CustomSql &sqlStatement = it->second.sqlStatement; sqlProducer.init(masterDatabase, database, sqlStatement); it->second.newSql = true; } else it->second.newSql = false; } _maxAlertId = getLastAlertId(database, lastIdCache, userInfoGetInfo(getSocketInfo()).status == sFull); // Check for the silly case where we know we have no alerts because the // range is empty. This can happen because no new alerts have occurred. // This can also happen because different threads are looking at different // (replicated) databases (using different cache objects). It's possible // that we could even go backwards. if (_maxAlertId > _startFrom) { AlertId oldestAlertId = std::max(_maxAlertId - 10000, _startFrom); for (RequestMap::iterator it = _originalQueries.begin(); it != _originalQueries.end(); it++) { std::string windowId = it->first; SingleRequest const &request = it->second; AlertConfig::CustomSql const &sqlStatement = request.sqlStatement; XmlNode &windowNode = _results["DATA"][windowId]; int count = 0; const int limit = request.limit; std::string sql = sqlStatement.getCommon(oldestAlertId, _maxAlertId, limit); for (MysqlResultRef result = database.tryQueryUntilSuccess(sql); result->rowIsValid(); result->nextRow(), count++) { XmlNode &alertNode = windowNode[-1]; sqlStatement.copyAlert(alertNode, result, ahtRealTime); } if (count >= limit) { _maxedOut = true; } } } else if (_maxAlertId < _startFrom) { // The client asked us to start after the last id in the database. ThreadMonitor::find().increment(S_out_of_order); // We have to do this, otherwise we'll go backwards next time. _maxAlertId = _startFrom; } sendLastId(_results, _maxAlertId); _queryTime = TimeVal(true).asMicroseconds() - _queryTime; } static const std::string DATA="DATA"; static const std::string WINDOW="WINDOW"; static const std::string ID="ID"; void StreamingAlertsRequest::getQuery(std::string const &windowId, AlertSqlProducer &sqlProducer, AlertConfig::CustomSql &sql, bool &sqlIsNew) { sqlIsNew = false; SingleRequest *oldRequest = getProperty(_originalQueries, windowId); if (oldRequest) { // The next if statement is a minor optimization, nothing more. // It would be safe to always copy these items back. if (oldRequest->newSql) { sqlProducer = oldRequest->sqlProducer; sql = oldRequest->sqlStatement; } sqlIsNew = oldRequest->newSql; } } bool StreamingAlertsRequest::addFinalQuery(std::string windowId, bool wasOriginalQuery) { XmlNode &windowNode = _results[DATA][windowId]; SingleRequest *oldRequest = getProperty(_originalQueries, windowId); if (oldRequest) { if (wasOriginalQuery) { oldRequest->keep = true; } else { windowNode.clear(); } } // Add these items here because this is the place where we know that // every window will be. If we did this when were were retrieving data // ourselves, we'd also have to do when we call addData(). windowNode.name = WINDOW; windowNode.properties[ID] = windowId; return !windowNode.orderedChildren.empty(); } void StreamingAlertsRequest::trimResponse() { for (RequestMap::const_iterator it = _originalQueries.begin(); it != _originalQueries.end(); it++) { std::string const &windowId = it->first; if (!it->second.keep) { _results["DATA"].namedChildren.erase(windowId); } } } void StreamingAlertsRequest::addData(std::string windowId, XmlNode const &singleAlert) { XmlNode *windowNode = getProperty(_results["DATA"].namedChildren, windowId); if (windowNode && windowNode->orderedChildren.empty()) { // We don't add the data if we already have data. This is typically the // historical data, which we don't need any more because we have real- // time data. We don't allow new windows now. The headers we // initialized in addFinalQuery. If the window wasn't named in a // call to addFinalQuery, then the headers were never generated. (*windowNode)[-1] = singleAlert; } } // Finally get the response in XML for the client. XmlNode const &StreamingAlertsRequest::getResponse() const { return _results; } // We allow a modifiable version for effeciency. We don't want to copy // this node if possible. XmlNode &StreamingAlertsRequest::getResponse() { return _results; } TimeVal::Microseconds StreamingAlertsRequest::getQueryTime() const { return _queryTime; } int StreamingAlertsRequest::getWindowCount() const { // This is the number of queries we had to do. That might have // changed later. Look at the original request, not the current results. // We include queries that were skipped because there was no new data. // We used to not count those, but it made for some misleading statistics. return _originalQueries.size(); } bool StreamingAlertsRequest::getMaxedOut() const { return _maxedOut; } AlertId StreamingAlertsRequest::getLastId() const { return _maxAlertId; } ///////////////////////////////////////////////////////////////////// // StreamingAlertsThread ///////////////////////////////////////////////////////////////////// StreamingAlertsThread::StreamingAlertsThread(RequestListener *returnPath, int returnId, std::string databaseName) : ThreadClass("StreamingAlertsThread " + pointerToString(this)), _returnPath(returnPath), _returnId(returnId), _masterDatabase(false, getName() + " Master"), _database(databaseName, getName() + " Read Only"), _incoming(getName()) { startThread(); } StreamingAlertsThread::~StreamingAlertsThread() { // Realistically we should have a way to abort the database request. // If we ever destroy these, it's because we're reconfiguring on the fly, // and we might want to stop using one particular database server. Request *r = new Request(NULL); r->callbackId = mtQuit; _incoming.newRequest(r); waitForThread(); } void StreamingAlertsThread::requestData(StreamingAlertsRequest *request) { request->callbackId = mtRequestData; _incoming.newRequest(request); } void StreamingAlertsThread::debugReport() { Request *request = new Request(NULL); request->callbackId = mtDebugReport; _incoming.newRequest(request); } static const std::string S_mtRequestData = "mtRequestData"; static const std::string S_DeleteSocketThread="DeleteSocketThread::callbackId"; void StreamingAlertsThread::threadFunction() { // These are strictly used for debugging. SocketInfo *lastSocket = NULL; bool lastSocketDeleted = true; TimeVal lastCompleteTime; ThreadMonitor &m = ThreadMonitor::find(); while (true) { while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtRequestData: { m.increment(S_mtRequestData); lastSocket = current->getSocketInfo(); lastSocketDeleted = false; StreamingAlertsRequest *request = dynamic_cast(current); request->findAlerts(_masterDatabase, _database, _lastIdCache); request->callbackId = _returnId; _returnPath->newRequest(request); current = NULL; lastCompleteTime.currentTime(); break; } case mtDebugReport: { TclList msg; msg<<"StreamingAlertsThread.C" <<"debugReport()" <getSocketInfo()) { lastSocketDeleted = true; TclList msg; msg<<"StreamingAlertsThread.C" <<"deleting_recent_socket" <<(TimeVal(true).asMicroseconds() - lastCompleteTime.asMicroseconds()); } break; } case mtQuit: delete current; return; } delete current; } _incoming.waitForRequest(); } }