// NOTE(review): the header names of the first two #include directives are
// missing from this copy of the file.  Elsewhere in this file some text that
// originally sat between '<' and '>' (template arguments, parts of
// statements) is missing as well.  Those places are marked NOTE(review) and
// are reproduced exactly as found.
#include
#include

#include "SplitList.h"
#include "../shared/DatabaseForThread.h"
#include "../shared/MiscSQL.h"
#include "CandleClient.h"
#include "../shared/GlobalConfigFile.h"
#include "../shared/MarketHours.h"
#include "../shared/MiscSQL.h"
#include "../shared/SimpleLogFile.h"

#include "OneMinuteCandles.h"

/////////////////////////////////////////////////////////////////////
// OneMinuteCandles::Data
/////////////////////////////////////////////////////////////////////

// Debug summary of the in-memory data: the database cutoff, followed by a
// list holding the number of one-minute candles in byStartTime, the first
// start time (if there is at least one candle) and the last start time (if
// there is more than one candle).
TclList OneMinuteCandles::Data::debugDump() const
{
  TclList result;
  result << "databaseCutoff" << ctimeString(databaseCutoff);
  TclList startTimeList;
  startTimeList << byStartTime.size();
  if (byStartTime.size() > 0)
  {
    startTimeList << ctimeString(byStartTime.begin()->first);
    if (byStartTime.size() > 1)
      startTimeList << ctimeString(byStartTime.rbegin()->first);
  }
  result << "byStartTime" << startTimeList;
  return result;
}

/////////////////////////////////////////////////////////////////////
// OneMinuteCandles
/////////////////////////////////////////////////////////////////////

// Debug summary of this node.  Starts from CandleDataNode::debugDump() and
// appends the symbol, the Data dump (taken under a read lock), the last
// volume / start time seen by onWakeup(), and the database cache's own dump.
TclList OneMinuteCandles::debugDump() const
{
  TclList result = CandleDataNode::debugDump();
  result << "_symbol" << _symbol << "_data" << _data.readLock()->debugDump();
  // These are usually only accessed in the data node thread, but for a debug
  // dump this is okay.
  result << "_lastVolume" << _lastVolume << "_lastStartTime" << ctimeString(_lastStartTime);
  result << "_databaseCache" << _databaseCache.debugDump();
  return result;
}

// Returns the cache size (a number of one-minute bars) as a string.
// getCacheFromDatabase() appends it directly to a SQL "LIMIT" clause.
// The value comes from the config item "one_minute_candle_cache_size":
//   -1     means use the recommended size, 350 * 60.
//   <= 0   (after that substitution) yields an empty string.
// The value is computed and logged once, on the first call.
std::string const &OneMinuteCandles::getCacheSize()
{
  static std::string result = []() -> std::string {
    // This code will run the first time someone calls getCacheSize().  g++
    // automatically makes this thread safe.
    // Default is off.
    auto size = getConfigItem("one_minute_candle_cache_size", 0);
    if (size == -1)
      size = 350 * 60; // Say -1 to use the recommended cache size.
    TclList msg;
    msg << FLF << "OneMinuteCandles::getCacheSize()" << size;
    sendToLogFile(msg);
    return (size <= 0) ? std::string() : ntoa(size);
  }();
  return result;
}

// Loads the most recent one-minute bars for symbol from the database so they
// can be cached.
//
// First reads up to getCacheSize() rows from bars_1m, newest first.  If that
// produced anything, also reads every bars_pre_post row that is newer than
// the oldest bars_1m row that was loaded.  Each row is passed through
// SplitList::fixCandle().  Rows are collected in a map keyed by start time,
// so the result is pushed back in increasing start time order.
//
// Must not be called when the cache is disabled (asserted).
OneMinuteCandles::CachedCandles OneMinuteCandles::getCacheFromDatabase(std::string const &symbol)
{
  StopWatch stopWatch;
  assert(!cacheDisabled());
  const SplitList splitList(symbol);
  // NOTE(review): the template arguments of std::map are missing from this
  // copy of the file.
  std::map candles;
  // Runs one query and merges every row into candles.  Columns are expected
  // in the order start_time, open, high, low, close, volume.
  const auto runQuery = [&candles, &splitList](std::string const &sql) {
    for (MysqlResultRef result =
           DatabaseForThread(DatabaseWithRetry::CANDLES)
             ->tryQueryUntilSuccess(sql, "One Minute Cache");
         result->rowIsValid();
         result->nextRow())
    {
      time_t startTime = mysqlToTimeT(result->getStringField(0));
      SingleCandle &candle = candles[startTime];
      candle.open = result->getDoubleField(1, 0.0);
      candle.high = result->getDoubleField(2, 0.0);
      candle.low = result->getDoubleField(3, 0.0);
      candle.close = result->getDoubleField(4, 0.0);
      candle.volume = result->getIntegerField(5, 0);
      splitList.fixCandle(startTime, candle);
    }
  };
  const std::string marketHoursSql =
    "SELECT start_time,open,high,low,close,volume FROM bars_1m "
    "WHERE symbol = \"" + mysqlEscapeString(symbol)
    + "\" ORDER BY start_time DESC LIMIT " + getCacheSize();
  runQuery(marketHoursSql);
  if (!candles.empty())
  {
    // Only ask for pre/post market bars in the range covered by the bars_1m
    // rows we just loaded.
    const std::string prePostSql =
      "SELECT start_time,open,high,low,close,volume FROM bars_pre_post "
      "WHERE symbol = '" + mysqlEscapeString(symbol)
      + "' AND start_time > '" + timeTToMysql(candles.begin()->first) + "'";
    runQuery(prePostSql);
  }
  // sendToLogFile(TclList()<
  // NOTE(review): text is missing here in this copy of the file: the rest of
  // the commented-out log statement above and the statement that declares
  // and allocates `result`.  Only the trailing ';' of that statement is left.
  ;
  result->reserve(candles.size());
  for (auto it = candles.begin(); it != candles.end(); it++)
    result->push_back({it->first, it->second});
  sendToLogFile(TclList() << FLF << "symbol" << symbol << "row count" << result->size() << "oldestTime" << (candles.empty() ? "NONE" : ctimeString(candles.begin()->first)) << "time in µs" << stopWatch.getMicroSeconds());
  return result;
}

// Discards the candles held in memory and moves the database cutoff forward.
//
// What is visible here: the previous databaseCutoff is remembered in a
// static set, databaseCutoff becomes the later of "now" and its old value,
// and byStartTime is cleared.  When the static allResetInfo list is empty a
// lambda is also queued on the data node manager.
//
// NOTE(review): a large part of this function is missing from this copy of
// the file: everything from the middle of the `msg <<` statement inside the
// queued lambda up to the middle of an expression ending in
// `byStartTime.size()`.  That includes the end of the lambda and the
// declaration of `data`.
void OneMinuteCandles::resetNow()
{
  struct ResetInfo
  {
    std::string symbol;
    size_t size;
  };
  // NOTE(review): the template arguments of std::vector and std::set are
  // missing from this copy of the file.
  static std::vector allResetInfo;
  static std::set allDatabaseCutoffs;
  if (allResetInfo.empty())
    getManager().getDataNodeManager().addToQueue([]() {
      TclList msg;
      // NOTE(review): text is missing in the middle of the next line.
      msg<byStartTime.size();
  allDatabaseCutoffs.insert(data->databaseCutoff);
  data->databaseCutoff = std::max(time(NULL), data->databaseCutoff);
  data->byStartTime.clear();
}

// Broadcast handler.  This node registers for RESET_CHANNEL in its
// constructor; any broadcast that arrives triggers a reset.  Both arguments
// are ignored.
void OneMinuteCandles::onBroadcast(BroadcastMessage &message, int msgId)
{
  resetNow();
}

// Called when the TOS data node has something new.  Adds the latest print to
// the one-minute candle that contains it and records that candle's start
// time in the epoch counter.  The msgId argument is not used.
void OneMinuteCandles::onWakeup(int msgId)
{
  if (!_tosData->getValid())
    return;
  TosData const &last = _tosData->getLast();
  if (!last.newPrint)
    return;
  // This is not quite right.  We copied this logic from TI Pro.  In fact,
  // we would like to see some old prints.  We'd like to update an old candle
  // if we see an old print.  "updatesLast" covers a lot of strange prints.
  // If we remove this without replacing it with something better, we'd get
  // a lot of crap.
  // :(  We have a lot of logic in this program to recompute exactly the
  // right amount after receiving a bad print.  And I tested it thoroughly
  // with our debug interface producing fake data.  But it seems like we'll
  // never get these late prints because of this test.  And we can't just get
  // rid of this test.  We'd have to understand the rules better about what
  // should and shouldn't be ignored, and we'd probably have to send more
  // data from the market data proxy.  TODO?
  if (!(last.updatesLast || last.formT))
    return;
  const double price = last.price;
  // If somehow we missed a print then the second half of this max() will
  // show us the missing volume and will assign it to the current candle,
  // as if it were part of this print.  Most of the time both sides of this
  // max() function call will say the same thing.
  const double size = std::max(last.size, last.volume - _lastVolume);
  _lastVolume = last.volume;
  // Round the print time down to the start of its minute.
  _lastStartTime = last.time / 60 * 60;
  auto data = _data.writeLock();
  data->byStartTime[_lastStartTime].add(price, size);
  data->epochCounter.addEpoch(_lastStartTime);
}

// Name of the broadcast channel used to tell every OneMinuteCandles node to
// reset.  See classInit() and onBroadcast().
const std::string OneMinuteCandles::RESET_CHANNEL = "OneMinuteCandles.Reset";

// One-time setup shared by all instances: schedules the periodic broadcast
// on RESET_CHANNEL.  Only the first call does anything.
void OneMinuteCandles::classInit()
{
  // Assume just one data node thread.
  static bool initialized = false;
  if (!initialized)
  {
    // This will go off at approximately midnight each night.  (If you don't
    // reboot this will be an hour off after daylight savings time changes.)
    // We don't want to erase our cache before the new data is in the database,
    // around 5pm, or we'd be missing some data.  And we don't want to hold
    // onto the cache after the database has been updated for splits, about
    // 4:20am, or we'd have incorrect data.  Pretty much any time between those
    // two times is an acceptable time to flush the cache.
    int period = 24 * 60 * 60 * 1000; // One day in milliseconds.
    // First broadcast: one period after midnight(time(NULL)).
    TimeVal initial(midnight(time(NULL)));
    initial.addMilliseconds(period);
    getManager().getDataNodeManager().getTimerThread().requestPeriodicBroadcast(RESET_CHANNEL, period, initial);
    initialized = true;
    sendToLogFile(TclList() << FLF << "Scheduled first reset" << initial);
  }
}

// Returns the starting value for the database cutoff: the time that
// separates data we expect to read from the database from data we collect
// in memory from the live stream.
time_t OneMinuteCandles::initialDatabaseCutoff()
{
  if (getConfigItem("no_streaming_data") == "1")
    // Never use the local cache because we do not have the data to fill it
    // in.  Always use the database.  Presumably this is going through a
    // candle server, and that server is mixing live data with the database
    // for us.
    // NOTE(review): the template argument of std::numeric_limits is missing
    // from this copy of the file.
    return std::numeric_limits::max();
  else
    // We start building a cached copy from live data now.  Check the database
    // for anything before now.  This is an imperfect system.  The database
    // typically only updates at the end of the day.  So if you want data for a
    // stock, you should start listening early in the morning.
    // The config item "debug_database_cutoff" overrides "now" when present.
    return strtolDefault(getConfigItem("debug_database_cutoff"), time(NULL));
}

// Creates the node for one symbol.  The database cache is given a loader
// that calls getCacheFromDatabase(symbol); its first constructor argument is
// !cacheDisabled().  The rest of the setup (claiming the write thread,
// classInit(), registering for the reset broadcast and linking to the TOS
// data node) is queued on the data node manager rather than done here.
OneMinuteCandles::OneMinuteCandles(std::string const &key, std::string const &symbol) :
  CandleDataNode(key),
  _tosData(NULL),
  _symbol(symbol),
  _lastVolume(0),
  _lastStartTime(0),
  _databaseCache(!cacheDisabled(), [symbol]() { return getCacheFromDatabase(symbol); })
{
  getManager().getDataNodeManager().addToQueue([=]() {
    _data.setWriteThread();
    classInit();
    registerForBroadcast(RESET_CHANNEL, 0);
    addAutoLink(GenericTosDataNode::find(this, 0, _tosData, _symbol));
  });
}

/* The original getFromDatabase() was limited to the bars_1m table.  bars_1m
 * does not contain form_t (i.e. pre and post market) data.  We decided to add
 * the form_t data into a different table, bars_pre_post.  That could have been
 * just a new boolean field.  We decided to create a separate table in large
 * part to avoid breaking (and fixing and retesting) a lot of old code.
 *
 * Note that tikiller does not think about pre and post market in this way.
 * The user says which times he cares about.  By default that is from the open
 * to the close.  But the user can adjust the start and end times any way he
 * pleases.
 *
 * Note:  For simplicity tikiller doesn't (currently) consider half days at all.
 * The process that creates bars_1m and bars_pre_post does.  Trades from
 * 12:30 pm Pacific time will normally go into bars_1m, but on half days these
 * trades will go into bars_pre_post.
*
 * The original code would create queries like these:
 * SELECT @split := COALESCE(EXP(SUM(LOG(new_shares/old_shares))),1) FROM splits WHERE symbol='MSFT' AND date > '2016-02-19'
 * SELECT @open := min(start_time), @close := max(start_time), min(low) / @split AS LOW, max(high) / @split AS HIGH, ROUND(sum(volume) * @split) AS VOLUME FROM bars_1m WHERE start_time >= '2016-02-19 11:30:00' AND start_time < '2016-02-19 11:45:00' AND symbol = 'MSFT'
 * SELECT open / @split AS open FROM bars_1m WHERE start_time = @open AND symbol = 'MSFT'
 * SELECT close / @split AS close FROM bars_1m WHERE start_time = @close AND symbol = 'MSFT'
 *
 *
 * The new code, which handles bars_pre_post, will create queries like these:
 * [First query is unchanged.]
 * SELECT @open := min(start_time), @close := max(start_time), min(low) / @split AS LOW, max(high) / @split AS HIGH, ROUND(sum(volume) * @split) AS VOLUME FROM ((SELECT start_time, low, high, volume FROM bars_1m WHERE start_time >= '2016-02-19 11:30:00' AND start_time < '2016-02-19 11:45:00' AND symbol = 'MSFT') UNION (SELECT start_time, low, high, volume FROM bars_pre_post WHERE start_time >= '2016-02-19 11:30:00' AND start_time < '2016-02-19 11:45:00' AND symbol = 'MSFT')) AS u;
 * (SELECT open / @split AS open FROM bars_1m WHERE start_time = @open AND symbol = 'MSFT') UNION (SELECT open / @split AS open FROM bars_pre_post WHERE start_time = @open AND symbol = 'MSFT') LIMIT 1;
 * (SELECT close / @split AS close FROM bars_1m WHERE start_time = @close AND symbol = 'MSFT') UNION (SELECT close / @split AS close FROM bars_pre_post WHERE start_time = @close AND symbol = 'MSFT') LIMIT 1;
 *
 *
 * I also tried another version.  This was a stored procedure.  I used a stored
 * procedure so I could use an if statement.  If no bars are found in one table
 * then check the other table.  That was a mistake.  The user could create bars
 * that are partially normal and partially form_t.  So we always need to check
 * both tables.
*
 * DROP PROCEDURE IF EXISTS GetIntraDayCandle;
 * DELIMITER //
 * CREATE PROCEDURE GetIntraDayCandle(IN for_symbol VARCHAR(30), IN candle_start DATETIME, IN candle_end DATETIME)
 * BEGIN
 * DECLARE split DOUBLE;
 * DECLARE open_time, close_time DATETIME;
 * DECLARE open_price, close_price, high_price, low_price DOUBLE;
 * DECLARE total_volume BIGINT;
 * SELECT COALESCE(EXP(SUM(LOG(new_shares/old_shares))),1) FROM splits WHERE symbol=for_symbol AND date > DATE(candle_start) INTO split;
 * SELECT min(start_time), max(start_time), min(low) / split, max(high) / split, ROUND(sum(volume) * split) FROM bars_1m WHERE start_time >= candle_start AND start_time < candle_end AND symbol = for_symbol INTO open_time, close_time, low_price, high_price, total_volume;
 * IF open_time IS NULL THEN
 * SELECT min(start_time), max(start_time), min(low) / split, max(high) / split, ROUND(sum(volume) * split) FROM bars_pre_post WHERE start_time >= candle_start AND start_time < candle_end AND symbol = for_symbol INTO open_time, close_time, low_price, high_price, total_volume;
 * SELECT open / split AS open FROM bars_pre_post WHERE start_time = open_time AND symbol = for_symbol INTO open_price;
 * SELECT close / split AS close FROM bars_pre_post WHERE start_time = close_time AND symbol = for_symbol INTO close_price;
 * ELSE
 * SELECT open / split AS open FROM bars_1m WHERE start_time = open_time AND symbol = for_symbol INTO open_price;
 * SELECT close / split AS close FROM bars_1m WHERE start_time = close_time AND symbol = for_symbol INTO close_price;
 * END IF;
 * SELECT open_price AS open, high_price as high, low_price as low, close_price as close, total_volume as volume;
 * END //
 * DELIMITER ;
 *
 * The input to GetIntraDayCandle is a symbol, a start time, and an end time.
 *
 * Start time and end time should be the exact beginning and end of the candle.
 * For example,
 * GetIntraDayCandle("MSFT", "2016-02-19 06:30:00", "2016-02-19 06:35:00")
 * will get the first 5 minute candle of the regular trading day.
* * The output is one row. It looks like you executed a single query. That * will return open, high, low, close, volume. * * * All three versions assume the start time and end time are on the same day. * If that's not true the splits might not work right. */ OneMinuteCandles::CachedCandles OneMinuteCandles::getFromDatabase(std::string const &symbol, time_t start, time_t end) { StopWatch stopWatch; const SplitList splitList(symbol); std::map candles; const auto runQuery = [&candles, &splitList](std::string const &sql) { for (MysqlResultRef result = DatabaseForThread(DatabaseWithRetry::CANDLES) ->tryQueryUntilSuccess(sql, "One Minute Cache"); result->rowIsValid(); result->nextRow()) { time_t startTime = mysqlToTimeT(result->getStringField(0)); SingleCandle &candle = candles[startTime]; candle.open = result->getDoubleField(1, 0.0); candle.high = result->getDoubleField(2, 0.0); candle.low = result->getDoubleField(3, 0.0); candle.close = result->getDoubleField(4, 0.0); candle.volume = result->getIntegerField(5, 0); splitList.fixCandle(startTime, candle); } }; const std::string marketHoursSql = "SELECT start_time,open,high,low,close,volume FROM bars_1m " "WHERE symbol = \"" + mysqlEscapeString(symbol) + "\" AND start_time BETWEEN FROM_UNIXTIME(" + std::to_string(start) + ") AND " + "FROM_UNIXTIME(" + std::to_string(end) + ") " + "ORDER BY start_time DESC"; runQuery(marketHoursSql); if (!candles.empty()) { const std::string prePostSql = "SELECT start_time,open,high,low,close,volume FROM bars_pre_post " "WHERE symbol = '" + mysqlEscapeString(symbol) + "' AND start_time > '" + timeTToMysql(candles.begin()->first) + "'"; runQuery(prePostSql); } // sendToLogFile(TclList()<; result->reserve(candles.size()); for (auto it = candles.begin(); it != candles.end(); it++) result->push_back({it->first, it->second}); sendToLogFile(TclList() << FLF << "symbol" << symbol << "row count" << result->size() << "oldestTime" << (candles.empty() ? 
"NONE" : ctimeString(candles.begin()->first)) << "time in µs" << stopWatch.getMicroSeconds()); return result; } SingleCandle OneMinuteCandles::getFromMemory(Data const &data, time_t start, time_t end) { SingleCandle result; for (auto it = data.byStartTime.lower_bound(start), noMoreData = data.byStartTime.end(); (it != noMoreData) && (it->first < end); it++) result += it->second; // sendToLogFile(TclList()<epochCounter.restartAt(epoch); return SingleCandle::ByStartTime(data->byStartTime.lower_bound(startAt), data->byStartTime.end()); } SingleCandle::ByStartTime OneMinuteCandles::threadSafeGetAllSince(EpochCounter::Epoch epoch, time_t startAt, time_t endBefore) { auto data = _data.readLock(); startAt = std::min(startAt, data->epochCounter.restartAt(epoch)); return SingleCandle::ByStartTime(data->byStartTime.lower_bound(startAt), data->byStartTime.lower_bound(endBefore)); } time_t OneMinuteCandles::threadSafeRestartAt(EpochCounter::Epoch epoch) { return _data.readLock()->epochCounter.restartAt(epoch); } time_t OneMinuteCandles::oldestFastTime() { time_t result = _data.readLock()->databaseCutoff; CachedCandles cached = _databaseCache.get(); if (!cached->empty()) result = std::min(result, cached->rbegin()->startTime); return result; } void OneMinuteCandles::verifyCachedCandles(CachedCandles const &cachedCandles) { if (!cachedCandles) return; time_t last; minimize(last); for (SingleCandleWithTime const &item : *cachedCandles) { assert(item.startTime > last); last = item.startTime; } } static const std::string s_startTimeListFirst = "OneMinuteCandles::startTimeListFirst"; OneMinuteCandles::StartTimeList OneMinuteCandles::startTimeList(time_t startBefore) { ThreadMonitor::SetState tm(s_startTimeListFirst); tm.increment(s_startTimeListFirst); std::vector *times = new std::vector; { auto data = _data.readLock(); auto it = data->byStartTime.lower_bound(startBefore); const auto stop = data->byStartTime.begin(); if (it != stop) while (true) { it--; times->push_back(it->first); 
if (it == stop) break; } } // CachedCandles candles = _databaseCache.get(); // sendToLogFile(TclList()<size():-1)); if (times->empty()) { delete times; return StartTimeList(startBefore, _databaseCache.get()); } else return StartTimeList(times, _databaseCache.get()); } static const std::string s_startTimeListMore = "OneMinuteCandles::startTimeListMore"; OneMinuteCandles::StartTimeList OneMinuteCandles::startTimeList(StartTimeList const &previous) { if (!previous.moreData()) return StartTimeList(); // We're already at the end. // Assume the first call to startTimeList() exhausted anything that's // in memory, so we go directly to the database. ThreadMonitor::SetState tm(s_startTimeListMore); tm.increment(s_startTimeListMore); std::vector *times = new std::vector; std::string sql = "SELECT start_time FROM bars_1m WHERE symbol = '" + mysqlEscapeString(_symbol) + "' AND start_time <'" + timeTToMysql(previous.restartBefore()) + "' ORDER BY start_time DESC LIMIT 500"; MysqlResultRef databaseResult = DatabaseForThread(DatabaseWithRetry::CANDLES) ->tryQueryUntilSuccess(sql, s_startTimeListMore); times->reserve(databaseResult->numRows()); for (; databaseResult->rowIsValid(); databaseResult->nextRow()) times->push_back(mysqlToTimeT(databaseResult->getStringField(0))); if (!times->empty()) { std::string sqlPrePost = "SELECT start_time FROM bars_pre_post WHERE symbol = '" + mysqlEscapeString(_symbol) + "' AND start_time <'" + timeTToMysql(previous.restartBefore()) + "' AND start_time >'" + timeTToMysql(*times->rbegin()) + '\''; for (databaseResult = DatabaseForThread(DatabaseWithRetry::CANDLES) ->tryQueryUntilSuccess(sqlPrePost, s_startTimeListMore); databaseResult->rowIsValid(); databaseResult->nextRow()) times->push_back(mysqlToTimeT(databaseResult->getStringField(0))); } std::sort(times->begin(), times->end(), [](time_t a, time_t b) { return b < a; }); // Oldest/largest first. 
return StartTimeList(times); } const std::string s_threadSafeGet = "OneMinuteCandles::threadSafeGet"; const std::string s_data = "OMC_data"; const std::string s_databaseCache = "OMC_cache"; const std::string s_database = "OMC_database"; // TODO I think we're processing the data in the wrong order. So += // won't join the parts of a candle right. void OneMinuteCandles::threadSafeGet(AllRowTimes const &requests, SingleCandle::ByStartTime &destination) { // The following comment was the original plan. TODO clean up the comment // now that we've done this more or less as stated! // TODO if the candle starts before the memory ends, then grab it // from memory. If the candle ends after memory ends, add it to the // to do list. A candle can strattle the line, so we need to get // some data now and add it to the list to get more data later. // // Insert step 2, the new cache. If there is overlap between memory // and the cache, avoid what's in the cache. Don't start your searches // from _cache.begin(). Make sure you start from the first item after // what's in memory. // // Find the end of the new cache. If the cache is completely empty, or // if the last time is before the end of what's in memory, just use the // time from the end of memory. // // Walk through the list of remaining requests much like we did with // the live memory data. If at least part of the candle happens before // the cutoff, handle it from the cache. If at least part of the candle // happens after the cutoff, handle it from the database. Some candles // might need both. // // Note, when we read from the first source, we can copy the data into // the destination. But when we read from the second or third source, // we need to preAppend() into the destination. We were always sloppy // about this in the past. We assumed that a candle would never cross // a boundary between memory and the database. 
That limits us if we // ever want to fill the database during the day, in case a server // restarts mid day. But this is essential when we add work with the // new database cache. The cache size is somewhat arbitrary so we are // likely to break some candles in the middle. // // Note that we are working with the most recent data first, and then // going backwards toward the older data. That's convenient here // because the most recent data is in memory, and we only go to the // database after we've done all we can with memory. But that's the // opposite of the case += was designed for. So we use // SingleCandle::preAppend() instead. ThreadMonitor::SetState tm(s_threadSafeGet); tm.increment(s_threadSafeGet); AllRowTimes externalRequests; externalRequests.reserve(requests.size()); int count_data = 0; time_t databaseCutoff; { auto data = _data.readLock(); databaseCutoff = data->databaseCutoff; for (auto &request : requests) { if (request.end > databaseCutoff) { // At least part of the candle was handled in memory. destination[request.start] = getFromMemory(*data, request.start, request.end); count_data++; } if (request.start <= databaseCutoff) // At least part of the candle was not handled in memory. externalRequests.push_back(request); } } tm.increment(s_data, count_data); if (externalRequests.empty()) return; if (CandleClient *instance = CandleClient::instance()) { const std::string bytes = instance->getIntraday(_symbol, externalRequests); SingleCandle::unmarshal(bytes, destination); // TODO check for and report any errors. } else { int count_cache = 0; int count_database = 0; CachedCandles cache = _databaseCache.get(); // sendToLogFile(TclList()<size():-1)); time_t cacheCutoff; // The oldest time covered by the cache. if (cache->empty()) maximize(cacheCutoff); else cacheCutoff = cache->begin()->startTime; if (cacheCutoff > databaseCutoff) cacheCutoff = databaseCutoff; // Be careful to avoid duplication. Each one minute candle should come // from exactly one source. 
However, an n minute candle might cross a // boundary and be covered by 2 or even all 3 sources. // // If the start time of the one minute candle >= databaseCutoff // then get the candle from _data, the part we have to keep in memory. // Else if the start time of the one minute candle >= cacheCutoff // then get the candle from _databaseCache, which is totally optional. // Else get the candle from the database. // // The names are getting a little out of date. databaseCutoff refers // to anything that we could expect to find in the database vs what we // know we have to keep in memory, particularly what we store in _data. // _databaseCache is completely optional and we flush it all the time. // _databaseCache and the actual database are grouped on the same side // of databaseCutoff, and _data is on the other side. const auto cacheBegin = cache->begin(); const auto cacheEnd = std::lower_bound(cache->begin(), cache->end(), databaseCutoff, SingleCandleWithTime::Comp()); CachedCandles tempLoadedCandles; time_t earliestTime = externalRequests.begin()->start; bool first = true; for (RowTimes const &request : externalRequests) { if (request.end > cacheCutoff) { // At least part of this request was handled by the cache. count_cache++; SingleCandle candle; for (auto it = std::lower_bound(cacheBegin, cacheEnd, request.start, SingleCandleWithTime::Comp()); it != cacheEnd; it++) { if (it->startTime >= request.end) break; // We read the cache from oldest to most recent. It's // hard to avoid that. But it's the opposite direction // from the rest of this method. So we build up the // candle variable from the cache in chronological order. // That's why we use += here. 
candle += it->candle; } destination[request.start].preAppend(candle); } if (request.start <= cacheCutoff) { count_database++; if (first) { first = false; tempLoadedCandles = getFromDatabase(_symbol, earliestTime, databaseCutoff); } SingleCandle candle; for (auto it = std::lower_bound(tempLoadedCandles->begin(), tempLoadedCandles->end(), request.start, SingleCandleWithTime::Comp()); it != tempLoadedCandles->end(); it++) { if (it->startTime >= request.end) break; // We read the cache from oldest to most recent. It's // hard to avoid that. But it's the opposite direction // from the rest of this method. So we build up the // candle variable from the cache in chronological order. // That's why we use += here. candle += it->candle; } destination[request.start].preAppend(candle); } } tm.increment(s_databaseCache, count_cache); tm.increment(s_database, count_database); } } std::string OneMinuteCandles::marshalTodaysCandles() { std::string result; time_t startTime = midnight(time(NULL)); auto data = _data.readLock(); for (auto it = data->byStartTime.lower_bound(startTime), end = data->byStartTime.end(); it != end; it++) it->second.marshal(it->first, result); return result; } OneMinuteCandles *OneMinuteCandles::find(std::string const &symbol) { const std::string key = makeKey(symbol); return Manager::getInstance().find(key, [&]() { return new OneMinuteCandles(key, symbol); }); }