// NOTE(review): the header names of the first three #include directives
// below appear to have been lost (the text between '<' and '>' is missing).
// Judging by the names used in this file (assert, std::isfinite,
// std::numeric_limits, std::max) they were presumably standard headers such
// as <cassert>, <cmath>, <limits>, <algorithm> -- restore from version
// control before building.
#include
#include
#include
#include "../shared/MiscSupport.h"
#include "../shared/ThreadMonitor.h"
#include "GridReaderBase.h"

using namespace GridInstanceData;

/////////////////////////////////////////////////////////////////////
// GridPrototype
/////////////////////////////////////////////////////////////////////

// Creates a prototype with rowCount x colCount formula cells.  The cells are
// default-constructed.  Both dimensions must be positive (asserted).
GridPrototype::GridPrototype(int rowCount, int colCount,
                             CandleTimer::Ref candleTimer, bool packed) :
  _rowCount(rowCount),
  _colCount(colCount),
  _candleTimer(candleTimer),
  _packed(packed)
{
  assert((rowCount > 0) && (colCount > 0));
  // Formulae are kept row-major in one flat container:
  // index = row * _colCount + col (see getFormula() / setFormula()).
  _formulae.resize(rowCount * colCount);
}

// Copies another prototype's dimensions, packing flag, column names and
// formulae, but uses the supplied candleTimer instead of the original's.
GridPrototype::GridPrototype(GridPrototype const &original,
                             CandleTimer::Ref candleTimer) :
  _rowCount(original._rowCount),
  _colCount(original._colCount),
  _candleTimer(candleTimer),
  _packed(original._packed),
  _columns(original._columns),
  _formulae(original._formulae)
{
}

// Returns the index registered for the column called name.  -1 is the
// default handed to getProperty(), presumably returned when name is unknown.
// Callers in this file treat any negative result as "no such column".
int GridPrototype::findColumn(std::string const &name) const
{
  return getProperty(_columns, name, -1);
}

// Returns the formula text for cell (row, col).
std::string GridPrototype::getFormula(int row, int col) const
{
  // For simplicity, any request that's out of bounds will return an
  // empty string.  This will cause an error down the road, but all formula
  // errors will be caught and converted to a NaN.  Arguably this could and
  // probably should be an assertion failure.
  if ((row < 0) || (col < 0) || (col >= _colCount))
    return "";
  // The first n-1 rows are used exactly as is.  The table can grow as much
  // as needed by copying the last row over and over.
  if (row >= _rowCount)
    row = _rowCount - 1;
  const int index = row * _colCount + col;
  return _formulae[index];
}

// Fills out with exactly _colCount entries.  out[i] is the name registered
// for column i, or an empty string if column i has no name.  Names mapped to
// an out-of-range index are ignored.
void GridPrototype::getColumns(std::vector< std::string > &out) const
{
  out.clear();
  out.resize(_colCount);
  for (std::map< std::string, int >::const_iterator it = _columns.begin();
       it != _columns.end(); it++)
  {
    const int index = it->second;
    if ((index >= 0) && (index < _colCount))
      out[index] = it->first;
  }
}

// Registers name as referring to column col.  The name must not already be
// registered, and col must be in range; both are asserted.  Nothing prevents
// two different names from mapping to the same column.
void GridPrototype::setColumnName(const std::string &name, int col)
{
  assert(_columns.count(name) == 0);
  assert((col >= 0) && (col < _colCount));
  _columns[name] = col;
}

// Stores formula in cell (row, col).  When safe is true, a formula that
// contains '[' is not stored as given; a fixed error formula is stored
// instead.  (Presumably because '[' would trigger Tcl command substitution
// when the formula is evaluated -- confirm.)  row and col must be in range
// (asserted).  Unlike getFormula(), rows past the end are NOT accepted here.
void GridPrototype::setFormula(int row, int col, std::string formula,
                               bool safe)
{
  if (safe)
  {
    if (formula.find('[') != std::string::npos)
    {
      static const std::string ERROR_MSG =
        "[error {Square brackets are not allowed.}]";
      formula = ERROR_MSG;
    }
  }
  assert((row >= 0) && (col >= 0));
  assert(row < _rowCount);
  assert(col < _colCount);
  const int index = row * _colCount + col;
  _formulae[index] = formula;
}

// Same as setFormula(row, col, formula, safe) with the column given by name.
// The value of safe comes from the declaration's default (not visible here).
// NOTE(review): an unknown name yields columnIndex == -1, which trips the
// "col >= 0" assertion.  With NDEBUG it would write to the wrong cell.
void GridPrototype::setFormula(int row, const std::string &col,
                               const std::string &formula)
{
  const int columnIndex = findColumn(col);
  setFormula(row, columnIndex, formula);
}

// Stores formula in the named column of every row from firstRow through the
// last row of the prototype.
void GridPrototype::setFormulaDown(int firstRow, const std::string &col,
                                   const std::string &formula)
{
  for (int row = firstRow; row < _rowCount; row++)
    setFormula(row, col, formula);
}

// Builds a TclList describing this prototype, for debugging.
// NOTE(review): part of this function's body, and the beginning of the
// GridInstance::WhichData constructor that follows it, appear to have been
// lost (the text between '<' and '>' is missing, e.g. right after "result").
// The tokens below are reproduced exactly as found; restore the missing text
// from version control.
std::string GridPrototype::debugDump() const
{
  std::vector< std::string > names;
  names.resize(_colCount);
  TclList result;
  result<debugDump();
  TclList row;
  // Invert _columns (name -> index) into names (index -> name).  Indices
  // were range-checked when they were registered in setColumnName().
  for (std::map< std::string, int >::const_iterator it = _columns.begin();
       it != _columns.end(); it++)
    names[it->second] = it-> first;
  for (std::vector< std::string >::const_iterator it = names.begin();
       it != names.end(); it++)
    row<<*it;
  // NOTE(review): lost text -- the end of GridPrototype::debugDump() and the
  // head of GridInstance::WhichData::WhichData(...) are missing here.
  // WhichData: if the calling thread already holds the write lock, _mutex is
  // NULL and nothing is locked.  Otherwise the instance's _mutex is held for
  // the lifetime of this object.  grid() refers to the in-progress values or
  // to the committed values, selected by inProgress() (presumably true
  // exactly when _mutex is NULL -- confirm in the header).
  result<holdingWriteLock()?NULL:&gridInstance->_mutex),
  _grid(inProgress()
        ?gridInstance->_values.inProgress()
        :gridInstance->_values.committed())
{
  if (_mutex)
    pthread_mutex_lock(_mutex);
}

// Releases the mutex taken by the constructor, if any.
GridInstance::WhichData::~WhichData()
{
  if (_mutex)
    pthread_mutex_unlock(_mutex);
}

// Maps this WhichData's choice of data onto the Use value that the
// PackableValues queries expect.
inline GridInstanceData::Use GridInstance::WhichData::use() const
{
  return inProgress()
    ?GridInstanceData::Use::InProgress
    :GridInstanceData::Use::Committed;
}

/////////////////////////////////////////////////////////////////////
// GridInstance
/////////////////////////////////////////////////////////////////////

// Returns nextRow() of the calling thread's view of the grid (in-progress
// data for the write-lock holder, committed data for everyone else).  The
// result is in packed row numbering.
int GridInstance::getCalculationRowPacked() const
{
  WhichData whichData(this);
  return whichData.grid().nextRow();
}

// Returns, in "possible" row numbering, the row packedOffset packed rows
// before nextRow().  An offset of 0 is special-cased to the completed-row
// count, which keeps advancing even while rows are being skipped.
int GridInstance::getCalculationRowPossible(int packedOffset) const
{
  WhichData whichData(this);
  if (packedOffset == 0)
    // This constantly updates, even if we are skipping rows.
    return _values.completedRowCount(whichData.use());
  const int nextRow = whichData.grid().nextRow();
  int packed = nextRow - packedOffset;
  return _values.packedToPossible(packed, whichData.use());
}

// The value used for anything that cannot be resolved (e.g. an unknown
// column name): a quiet NaN.
double GridInstance::invalid()
{
  return std::numeric_limits< double >::quiet_NaN();
}

// Creates an empty instance of prototype.  Value storage is sized from the
// prototype's packing flag and width.  _epoch starts at 0 and the write
// lock starts out free.  Failures of the pthread init calls are caught by
// assertFalse() (presumably asserting a zero return code).
GridInstance::GridInstance(GridPrototypeRef const &prototype,
                           GridDataProviderRef const &dataProvider) :
  _prototype(prototype),
  _dataProvider(dataProvider),
  _values(prototype->getPacked(), prototype->getWidth()),
  _epoch(0),
  _writeLocked(false)
{
  assertFalse(pthread_mutex_init(&_mutex, NULL));
  assertFalse(pthread_cond_init(&_writerCondition, NULL));
}

// ---- Cell and range queries ---------------------------------------------
// Every overload taking an int column reads the calling thread's view of
// the data through WhichData.  Every overload taking a column name resolves
// it with the prototype and returns invalid() (NaN) when the name is not a
// known column.

// Value of cell (row, column); row is in packed numbering.
double GridInstance::referencePacked(int row, int column) const
{
  WhichData whichData(this);
  return whichData.grid().reference(row, column);
}

// Value of cell (row, column), where row is in "possible" numbering.  The
// row is first converted to packed numbering; round is passed through to
// possibleToPacked().
double GridInstance::referencePossible(int row, int column, Round round) const
{
  WhichData whichData(this);
  row = _values.possibleToPacked(row, whichData.use(), round);
  return whichData.grid().reference(row, column);
}

double GridInstance::referencePacked(int row, const std::string &column) const
{
  const int columnIndex = _prototype->findColumn(column);
  if (columnIndex < 0)
    return invalid();
  else
    return referencePacked(row, columnIndex);
}

double GridInstance::referencePossible(int row, const std::string &column,
                                       Round round) const
{
  const int columnIndex = _prototype->findColumn(column);
  if (columnIndex < 0)
    return invalid();
  else
    return referencePossible(row, columnIndex, round);
}

// Aggregates over rows firstRow..lastRow of one column.  The row-range
// semantics and the meaning of skipBadValues are defined by the grid's own
// sum/count/average/max/min (not visible here).
double GridInstance::sum(int firstRow, int lastRow, int column,
                         bool skipBadValues) const
{
  WhichData whichData(this);
  return whichData.grid().sum(firstRow, lastRow, column, skipBadValues);
}

double GridInstance::sum(int firstRow, int lastRow, const std::string &column,
                         bool skipBadValues) const
{
  const int columnIndex = _prototype->findColumn(column);
  if (columnIndex < 0)
    return invalid();
  else
    return sum(firstRow, lastRow, columnIndex, skipBadValues);
}

double GridInstance::count(int firstRow, int lastRow, int column) const
{
  WhichData whichData(this);
  return whichData.grid().count(firstRow, lastRow, column);
}

double GridInstance::count(int firstRow, int lastRow,
                           const std::string &column) const
{
  const int columnIndex = _prototype->findColumn(column);
  if (columnIndex < 0)
    return invalid();
  else
    return count(firstRow, lastRow, columnIndex);
}

double GridInstance::average(int firstRow, int lastRow, int column,
                             bool skipBadValues) const
{
  WhichData whichData(this);
  return whichData.grid().average(firstRow, lastRow, column, skipBadValues);
}

double GridInstance::average(int firstRow, int lastRow,
                             const std::string &column,
                             bool skipBadValues) const
{
  const int columnIndex = _prototype->findColumn(column);
  if (columnIndex < 0)
    return invalid();
  else
    return average(firstRow, lastRow, columnIndex, skipBadValues);
}

double GridInstance::max(int firstRow, int lastRow, int column,
                         bool skipBadValues) const
{
  WhichData whichData(this);
  return whichData.grid().max(firstRow, lastRow, column, skipBadValues);
}

double GridInstance::max(int firstRow, int lastRow, const std::string &column,
                         bool skipBadValues) const
{
  const int columnIndex = _prototype->findColumn(column);
  if (columnIndex < 0)
    return invalid();
  else
    return max(firstRow, lastRow, columnIndex, skipBadValues);
}

double GridInstance::min(int firstRow, int lastRow, int column,
                         bool skipBadValues) const
{
  WhichData whichData(this);
  return whichData.grid().min(firstRow, lastRow, column, skipBadValues);
}

double GridInstance::min(int firstRow, int lastRow, const std::string &column,
                         bool skipBadValues) const
{
  const int columnIndex = _prototype->findColumn(column);
  if (columnIndex < 0)
    return invalid();
  else
    return min(firstRow, lastRow, columnIndex, skipBadValues);
}

// Replaces the contents of result with the column's values for rows
// firstRow..lastRow, as produced by the grid's getValuesAll().
void GridInstance::getValuesAll(std::vector< double > &result, int firstRow,
                                int lastRow, int column) const
{
  result.clear();
  WhichData whichData(this);
  whichData.grid().getValuesAll(result, firstRow, lastRow, column);
}

// As above, by column name.  An unknown name leaves result empty.
void GridInstance::getValuesAll(std::vector< double > &result, int firstRow,
                                int lastRow, const std::string &column) const
{
  const int columnIndex = _prototype->findColumn(column);
  if (columnIndex < 0)
    result.clear();
  else
    getValuesAll(result, firstRow, lastRow, columnIndex);
}

// Writer-only (asserted): forwards to _values.skipRow().
void GridInstance::skipRow()
{
  assert(holdingWriteLock());
  _values.skipRow();
}

// Writer-only (asserted): forwards value to _values.store().
void GridInstance::store(double value)
{
  assert(holdingWriteLock());
  _values.store(value);
}

// Destroys the condition variable and mutex created by the constructor.
// Failures (e.g. destroying while still in use) are caught by assertFalse().
GridInstance::~GridInstance()
{
  assertFalse(pthread_cond_destroy(&_writerCondition));
  assertFalse(pthread_mutex_destroy(&_mutex));
}

// Column of the next cell in the calling thread's view of the grid.
int GridInstance::nextCol()
{
  WhichData whichData(this);
  return whichData.grid().nextCol();
}

// Row (packed numbering) of the next cell in the calling thread's view.
int GridInstance::nextRow()
{
  WhichData whichData(this);
  return whichData.grid().nextRow();
}

// Prototype formula for the cell at (nextRow(), nextCol()).  Rows past the
// prototype's last row reuse that last row (see GridPrototype::getFormula).
std::string GridInstance::getNextFormula() const
{
  WhichData whichData(this);
  return _prototype->getFormula(whichData.grid().nextRow(),
                                whichData.grid().nextCol());
}

// Writer-only (asserted).  Asks the data provider for the restart time that
// corresponds to the epoch recorded by the previous call, deletes data from
// that time on, then records an epoch sampled BEFORE the query.  Sampling
// first presumably ensures that changes arriving during this call are seen
// by the next call -- confirm against EpochCounter / restartAt().
void GridInstance::removeOldData()
{
  assert(holdingWriteLock());
  EpochCounter::Epoch newEpoch = EpochCounter::getEpoch();
  deleteStartingAt(_dataProvider->restartAt(_epoch));
  _epoch = newEpoch;
}

// Writer-side helper: discards in-progress rows whose candles start at or
// after time, keeping the first getTotalRowCount(time) "possible" rows
// (clamped to >= 0).
void GridInstance::deleteStartingAt(time_t time)
{
  // Initially we look for the start time and the end time of the candles
  // we have.  Only if time is within that range do we try to convert time
  // into a row number.  If we didn't do this check, time could be some
  // unreasonable number, like 0 or MAXINT.  That might cause any number of
  // problems if we tried to convert that to a row.  (I.e. numeric overflow; a
  // large positive number becomes negative so less than does not work as
  // expected.)
  time_t startTime, endTime;
  CandleTimer::Ref candleTimer = _prototype->getCandleTimer();
  const int completedRowCount = _values.completedRowCount(Use::InProgress);
  // If the current row has at least one cell filled in, it has been started
  // and counts as the last row; otherwise the last started row is the final
  // completed one.
  // NOTE(review): with no completed rows and nothing started this is -1;
  // confirm that getTimes() handles a negative row.
  const int lastRowStartedPossible =
    (_values.inProgress().nextCol() > 0)
    ?completedRowCount:(completedRowCount-1);
  candleTimer->getTimes(lastRowStartedPossible, startTime, endTime);
  if (endTime <= time)
  {
    // The cut off time is after the end of the last row.  So this is a no-op
    // and we can just return.  Note that completedRowCount() includes rows
    // with real data, and rows that were skipped.  An older version of this
    // code used the same computation, but the comments were wrong; they said
    // that we were only counting to the last row of real data.  Then the code
    // went on to delete some of the skipped rows.  That last step was
    // unnecessary.  It caused a small hit to performance, but did not cause
    // any user-visible bugs.
    return;
  }
  candleTimer->getTimes(0, startTime, endTime);
  int rowCount;
  if (startTime >= time)
    // Before the first candle.  Again, we don't want to convert just any time
    // to a row number.  This might cause an overflow or something like that.
    rowCount = 0;
  else
    rowCount = std::max(candleTimer->getTotalRowCount(time), 0);
  ThreadMonitor::find().increment("GridInstance::truncateRows");
  _values.deleteExceptPossible(rowCount);
}

// Formats value with ntoa(), or returns an empty string when value is NaN
// or infinite.
static std::string nanToBlank(double value)
{
  if (!std::isfinite(value))
    return "";
  return ntoa(value);
}

// Builds a TclList describing this instance, for debugging.  It takes the
// write lock via writeLock(), which appears to return NULL when it did not
// newly acquire the lock.  It copies the values into a local DebugDump,
// releases the lock, and formats the copy.
// NOTE(review): most of the rest of this function, and the beginning of
// GridInstance::writeLock() (which follows it), appear to have been lost
// (the text between '<' and '>' is missing).  The tokens below are
// reproduced exactly as found; restore the missing text from version
// control.
std::string GridInstance::debugDump() const
{
  TclList values;
  int completedRowCount;
  GridInstance *recentlyLocked = writeLock();
  if (!holdingWriteLock())
  {
    values<<"Unable to acquire lock";
    completedRowCount = -1;
  }
  else
  {
    completedRowCount = _values.completedRowCount(Use::InProgress);
    PackableValues::DebugDump dump;
    _values.debugDump(dump);
    // Everything below works from the local copy in dump, so the write lock
    // can be released now (if this call took it).
    if (recentlyLocked)
      recentlyLocked->releaseWriteLock();
    CandleTimer::Ref candleTimer = _prototype->getCandleTimer();
    for (auto it = dump.possibleRowIndex.cbegin();
         it != dump.possibleRowIndex.cend(); it++)
    {
      const int possibleRow = *it;
      TclList rowDump;
      time_t startTime, endTime;
      candleTimer->getTimes(possibleRow, startTime, endTime);
      // NOTE(review): lost text -- the rest of GridInstance::debugDump() and
      // the head of GridInstance::writeLock() are missing here.
      rowDump<<(TclList()<restartAt(_epoch);
      if ((restartAt <= 0) || (restartAt >=5000000000L))
        epoch<(this);
  // Tail of writeLock(): record the calling thread as the write-lock owner,
  // then drop _mutex.  The write lock itself is represented by the
  // _writeLocked / _writerThread fields, not by holding the mutex.
  result->_writerThread = pthread_self();
  result->_writeLocked = true;
  pthread_mutex_unlock(&_mutex);
  return result;
}

/* Here's some test code for GridInstance::writeLock().  I ran this in the
 * ScriptExecutor.  It creates a large grid and asks a large number
 * of threads to update that grid.
<<<< 53517805 <<<< set channel [ti::get_current_channel] set type intraday set grid { OPEN HIGH LOW CLOSE {five {2+3}}} set options {row_count 1000 pack 1} set prototype [ti::create_prototype_m $type $grid $options] set grid [ti::create_grid $prototype MSFT] ti::reply_to_client $channel {} [dict create prototype [ti::debug_dump $prototype] grid [ti::debug_dump $grid]] set thread_count 20 set start_time [clock microseconds] for {set i 0} {$i < $thread_count} {incr i} { dispatch_tcl [list start_time $start_time i $i grid $grid channel $channel] { # remote set dispatch_time [expr {[clock microseconds] - [dict get $input start_time]}] ti::fill [dict get $input grid] preview set total_time [expr {[clock microseconds] - [dict get $input start_time]}] ti::reply_to_client [dict get $input channel] {} [dict create dispatch_time $dispatch_time total_time $total_time i [dict get $input i]] } { # return ti::reply_to_client [dict get $input channel] return:[dict get $input i] } } set dispatch_time [expr {[clock microseconds] - $start_time}] ti::reply_to_client $channel "Dispatcher finished after $dispatch_time microseconds" >>>> 53517805 >>>> >>>> 53517805 >>>> Dispatcher finished after 626 microseconds >>>> 53517805 >>>> 0: >>>> 53517805 >>>> >>>> 53517805 >>>> >>>> 53517805 >>>> return:1 >>>> 53517805 >>>> return:0 >>>> 53517805 >>>> >>>> 53517805 >>>> return:3 >>>> 53517805 >>>> >>>> 53517805 >>>> >>>> 53517805 >>>> return:7 >>>> 53517805 >>>> return:4 >>>> 53517805 >>>> >>>> 53517805 >>>> >>>> 53517805 >>>> return:5 >>>> 53517805 >>>> return:8 >>>> 53517805 >>>> >>>> 53517805 >>>> return:2 >>>> 53517805 >>>> >>>> 53517805 >>>> return:14 >>>> 53517805 >>>> >>>> 53517805 >>>> >>>> 53517805 >>>> return:13 >>>> 53517805 >>>> return:16 >>>> 53517805 >>>> >>>> 53517805 >>>> return:9 >>>> 53517805 >>>> >>>> 53517805 >>>> return:6 >>>> 53517805 >>>> >>>> 53517805 >>>> return:11 >>>> 53517805 >>>> >>>> 53517805 >>>> return:15 >>>> 53517805 >>>> >>>> 53517805 >>>> >>>> 53517805 >>>> 
>>>> 53517805 >>>> return:17 >>>> 53517805 >>>> return:18 >>>> 53517805 >>>> return:12 >>>> 53517805 >>>> >>>> 53517805 >>>> return:10 >>>> 53517805 >>>> >>>> 53517805 >>>> return:19 */ void GridInstance::releaseWriteLock() { assert(holdingWriteLock()); _values.commit(); pthread_mutex_lock(&_mutex); _writeLocked = false; assertFalse(pthread_cond_signal(&_writerCondition)); pthread_mutex_unlock(&_mutex); } bool GridInstance::holdingWriteLock() const { return _writeLocked && (_writerThread == pthread_self()); } int GridInstance::getPackedToPossible(int packed) const { WhichData whichData(this); return _values.packedToPossible(packed, whichData.use()); } int GridInstance::getPossibleToPacked(int packed, Round round) const { WhichData whichData(this); return _values.possibleToPacked(packed, whichData.use(), round); } int GridInstance::completedRowCountPossible() const { WhichData whichData(this); return _values.completedRowCount(whichData.use()); } int GridInstance::completedRowCountPacked() const { WhichData whichData(this); return whichData.grid().nextRow(); } /* Notes from a recent debug session. This is constantly out of date, but * it's a start. 
telnet localhost 8370 command=tcl_command&script=make_prototype v_proto {intraday {{price volume 6 high} {{current("close")} {current("volume")} 6 {current("high")}}} {row_count 20 pack 1 minutes_per_candle 60 first_candle_size 0}} command=tcl_command&script=v_proto create V_DELL command DELL command=tcl_command&script=V_DELL debug_dump command=tcl_command&script=V_DELL fill_to_complete command=tcl_command&script=llength [V_DELL debug_dump] command=tcl_command&script=join [lrange [V_DELL debug_dump] end-10 end] \n command=tcl_command&script=V_DELL get_times 5 command=tcl_command&script=clock format [lindex [V_DELL get_times 5] 0] command=tcl_command&script=V_DELL fill_to_row 6 command=tcl_command&script=V_DELL debug_dump command=tcl_command&script=join [lrange [ANW debug_dump] end-10 end] \n command=debug_tos&symbol=ANW&set=1&valid=1&price=99.99&time=1370352600&updates_last=1 telnet localhost 8369 command=login&username=philip&password=test+QQQ command=debug_tos&symbol=DELL&set=1&valid=1&price=1006&time=1370354340&updates_last=1&new_print=1 command=debug_tos&symbol=DELL&set=1&valid=1&price=1007&time=1370357940&updates_last=1&new_print=1 command=debug_tos&symbol=DELL&set=1&valid=1&price=1008&time=1370361540&updates_last=1&new_print=1 command=debug_tos&symbol=DELL&set=1&valid=1&price=1009&time=1370365140&updates_last=1&new_print=1 command=debug_tos&symbol=DELL&set=1&valid=1&price=1010&time=1370368740&updates_last=1&new_print=1 command=debug_tos&symbol=DELL&set=1&valid=1&price=1011&time=1370372340&updates_last=1&new_print=1 break GridInstance::removeOldData break OneMinuteCandles.C:91 break GridReaderBase.C:511 break GridInstance::deleteStartingAt break ExecutionContext::gridCmd */ /* Notes from a newer debug session. This was pointing at the flex_command * server, not the tiq server. This helped me track down the bug where we * sometimes would update correctly, but other times values would get stuck, * and would not be updated when new prints came in. 
8/15/2013 telnet localhost 9370 command=tcl_command&script=make_prototype v_proto {intraday {{price volume 6 high} {{current("close")} {current("volume")} 6 {current("high")}}} {row_count 20 pack 1 minutes_per_candle 60 first_candle_size 0}} command=tcl_command&script=v_proto create V_DELL command DELL command=tcl_command&script=v_proto create V_DELL.1 command DELL command=tcl_command&script=V_DELL debug_dump command=tcl_command&script=dict get [V_DELL debug_dump] _epoch #if you do this at the end of the day it will tell you when the day started command=tcl_command&script=set times [v_proto get_times 13]; list $times [clock format [lindex $times 0]] [clock format [lindex $times 1]] command=tcl_command&script=set result {}; for {set row 13} {$row < 20} {incr row} {set times [v_proto get_times $row]; lappend result [list $times [clock format [lindex $times 0]] [clock format [lindex $times 1]]]}; set result # This will fill in all data from before today command=tcl_command&script=V_DELL fill_to_row 13 command=tcl_command&script=V_DELL.1 fill_to_row 13 #send 100 shares for 6:30am this morning command=tcl_command&script=dict get [V_DELL debug_dump] _epoch command=tcl_command&script=V_DELL fill_to_complete command=tcl_command&script=dict get [V_DELL debug_dump] _epoch #view V_DELL in client. Expect to see one new candle 100 shares #send 100 shares for 7:00am this morning, then 100 shares for 6:30am, then 100 shares for 7:00am command=tcl_command&script=dict get [V_DELL debug_dump] _epoch command=tcl_command&script=dict get [V_DELL.1 debug_dump] _epoch command=tcl_command&script=V_DELL fill_to_complete command=tcl_command&script=V_DELL.1 fill_to_complete # View both in client. V_DELL.1 is expected to have the correct answer, two new candles with 200 shares each. # V_DELL is expected to be frozen, the same as the first time we looked. Which of course is incorrect. 
command=tcl_command&script=dict get [V_DELL debug_dump] _epoch command=tcl_command&script=dict get [V_DELL.1 debug_dump] _epoch command=tcl_command&script=set times {};for {set i 0} {$i < 20} {incr i} {set t [lindex [v_proto get_times $i] 0]; lappend times [list $i $t [clock format $t]]};set times ./tikiller-live -i database_candles=its-the-cops -i init_script=init.tcl -i database_RW=roberto -i debug_database_cutoff=1376486999 >&output.txt