// CandlesToDatabase: periodically copies one-minute candles from the
// registered OneMinuteCandles providers into the candles database.
//
// NOTE(review): in this copy of the file the original line breaks are missing
// and several spans look truncated -- the first #include has no header name,
// the commented-out TclList block in awake() runs straight into
// getSymbolList(), and add() stops in the middle of a statement.  Confirm
// against version control before relying on the notes below; they describe
// only the code that is visible here.
//
// CandlesToDatabase(dataNodeThread, container)
//   Uses `container` when it is non-null, otherwise creates a container thread
//   named "CandlesToDatabase".  Keeps a reference to dataNodeThread, zeroes
//   _nextWakeTime, _lastEndTime and _lastEpoch, sets _nextAutoReloadTime to
//   NEVER and _changeInProgress to false, then calls start().  Calls
//   addFromDatabase() only when the config item "candles_to_database" is "1".
//
// beforeSleep(callbacks)
//   Asks to be woken at the earlier of _nextWakeTime and _nextAutoReloadTime.
//
// awake(woken)
//   Returns immediately while now < _nextWakeTime; the auto-reload check comes
//   after that test.  When the auto-reload time has arrived it is reset to
//   NEVER before addFromDatabase() is called (see the comment in the body for
//   why).  currentCandleStart is midnight(now) plus secondOfTheDay(now) rounded
//   down to a multiple of 60 (assuming secondOfTheDay() returns an integer),
//   and the next wake-up is scheduled 90 seconds after it.  For every entry in
//   _dataProviders the candles returned by
//   threadSafeGetAllSince(_lastEpoch, _lastEndTime, currentCandleStart) are
//   added to a REPLACE INTO statement: bars_pre_post when the candle's second
//   of the day is before MARKET_HOURS_OPEN or when that second plus
//   MARKET_HOURS_MINUTE reaches MARKET_HOURS_CLOSE, bars_1m otherwise.  An
//   accumulator is written out whenever full() reports true, and the `push`
//   lambda writes whatever is left in each accumulator after the loops.
//   nextEpoch is captured before the loop; where it is stored is not visible
//   in this copy.
//
// getSymbolList()
//   Runs a query on the SLAVE database for the distinct d_symbol values in
//   alerts_daily whose list_exch is not 'CAT', 'CAV' or '$NDX' and whose date
//   is on or after the oldest of the three most recent distinct dates.
//   Returns column 0 of every row as a std::vector< std::string >.
//
// add(toAdd)
//   Checks _changeInProgress first (overlapping requests are not queued; see
//   the comment in the body).  Calls
//   OneMinuteCandles::find(newDataProviders[symbol], symbol) for every symbol
//   in toAdd, then queues a lambda on the container that captures `this` and
//   newDataProviders.  The rest of the function is not visible in this copy.
#include #include "../shared/ThreadMonitor.h" #include "../shared/MarketHours.h" #include "../shared/MiscSQL.h" #include "../shared/DatabaseSupport.h" #include "../shared/SimpleLogFile.h" #include "../shared/DatabaseForThread.h" #include "../shared/GlobalConfigFile.h" #include "../generate_alerts/misc_framework/AccumulateInsert.h" #include "CandlesToDatabase.h" CandlesToDatabase::CandlesToDatabase(DataNodeThread &dataNodeThread, IContainerThread *container) : ForeverThreadUser(container?container: IContainerThread::create("CandlesToDatabase")), _dataNodeThread(dataNodeThread), _nextWakeTime(0), _lastEndTime(0), _lastEpoch(0), _nextAutoReloadTime(NEVER), _changeInProgress(false) { start(); if (getConfigItem("candles_to_database") == "1") addFromDatabase(); } void CandlesToDatabase::beforeSleep(IBeforeSleepCallbacks &callbacks) { callbacks.wakeAtTime(std::min(_nextWakeTime, _nextAutoReloadTime)); } void CandlesToDatabase::awake(std::set< int > const &woken) { ThreadMonitor::SetState tm("CandlesToDatabase::awake"); const time_t now = time(NULL); if (now < _nextWakeTime) return; if (now >= _nextAutoReloadTime) { // Black out the auto reload timer. addFromDatabase() will send messages // between threads. We don't know how long it will take to complete. And // we might wake up multiple times before that completes. We don't want to // keep calling addFromDatabase() over and over while it is already // running. _nextAutoReloadTime = NEVER; addFromDatabase(); } const time_t currentCandleStart = midnight(now) + secondOfTheDay(now) / 60 * 60; // Wake up 30 seconds after the current candle ends. 
_nextWakeTime = currentCandleStart + 90; AccumulateInsert normalBars("REPLACE INTO bars_1m (symbol, start_time, open, high, low, close, volume)"); AccumulateInsert formTBars("REPLACE INTO bars_pre_post (symbol, start_time, open, high, low, close, volume)"); DatabaseWithRetry &database = *DatabaseForThread(DatabaseWithRetry::CANDLES_MASTER); const EpochCounter::Epoch nextEpoch = EpochCounter::getEpoch(); for (auto kvp : _dataProviders) { const std::string symbol = kvp.first; OneMinuteCandles *const provider = kvp.second; SingleCandle::ByStartTime candles = provider->threadSafeGetAllSince(_lastEpoch, _lastEndTime, currentCandleStart); for (auto candleInfo : candles) { const time_t candleStart = candleInfo.first; const SingleCandle candle = candleInfo.second; const int startTime = secondOfTheDay(candleStart); const bool preOrPost = (startTime < MARKET_HOURS_OPEN) || (startTime + MARKET_HOURS_MINUTE >= MARKET_HOURS_CLOSE); AccumulateInsert &accumulateInsert = preOrPost?formTBars:normalBars; accumulateInsert.add("('" + mysqlEscapeString(symbol) + "', '" + timeTToMysql(candleStart) + "', " + ntoa(candle.open) + ", " + ntoa(candle.high) + ", " + ntoa(candle.low) + ", " + ntoa(candle.close) + ", " + ntoa(candle.volume) + ")"); if (accumulateInsert.full()) { // I've never actually seen "write partial" in the logs. But this is // still a good template to copy. 
database.tryQueryUntilSuccess(accumulateInsert.get(), "write candles"); tm.increment("write partial"); } } } const auto push = [&database, &tm](AccumulateInsert accumulateInsert) { if (!accumulateInsert.empty()) { database.tryQueryUntilSuccess(accumulateInsert.get(), "write candles"); tm.increment("write last"); } }; push(formTBars); push(normalBars); /* TclList msg; msg< CandlesToDatabase::getSymbolList() { std::vector< std::string > result; const std::string sql = "SELECT DISTINCT d_symbol FROM alerts_daily " "WHERE list_exch NOT IN ('CAT', 'CAV', '$NDX') " "AND date >= (SELECT MIN(date) FROM " "(SELECT DISTINCT date FROM alerts_daily " "ORDER BY date DESC LIMIT 3) as R)"; DatabaseWithRetry &database = *DatabaseForThread(DatabaseWithRetry::SLAVE); MysqlResultRef r = database.tryQueryUntilSuccess(sql, "getSymbolList()"); for (; r->rowIsValid(); r->nextRow()) result.push_back(r->getStringField(0)); return result; } void CandlesToDatabase::add(std::vector< std::string > toAdd) { if (_changeInProgress) // This is oversimplified. We could make an organized way to handle // overlapping requests. But that seems complicated and more than we need // right now. sendToLogFile(TclList()< newDataProviders; for (std::string const &symbol : toAdd) OneMinuteCandles::find(newDataProviders[symbol], symbol); getContainer()->addLambdaToQueue ([this, newDataProviders]() { TclList msg; msg<