#include #include "../misc_framework/ImportData.h" #include "VolumeWeightedData.h" //////////////////////////////////////////////////////////////////// // Global //////////////////////////////////////////////////////////////////// std::string volumeBlockToString(VolumeBlock const &volumeBlock) { // This is not the same format as the Delphi format uses. It was storing // time in the standarrd Delphi format. std::string result = ntoa(volumeBlock.high); result += ':'; result += ntoa(volumeBlock.low); result += ':'; result += ntoa(volumeBlock.startTime); result += ':'; result += ntoa(volumeBlock.endTime); return result; } VolumeBlock volumeBlockFromString(std::string s) { const std::vector< std::string > pieces = explode(":", s); VolumeBlock result; switch (pieces.size()) { case 4: // This is the newer format, created by our C++ servers. nfroma(result.high, pieces[0]); nfroma(result.low, pieces[1]); result.startTime = importTime(pieces[2]); result.endTime = importTime(pieces[3]); break; case 7: // This is the older format, created by our Delphi severs. The extra data // was available for some development tools, but not used in any // production work. // highest = pieces[0] nfroma(result.high, pieces[1]); // median = pieces[2] nfroma(result.low, pieces[3]); // lowest = pieces[4] result.startTime = importTime(pieces[5]); result.endTime = importTime(pieces[6]); break; default: throw 0; } if (!(result.startTime && result.endTime)) throw 0; return result; } std::string volumeBlocksToString(VolumeBlocks volumeBlocks, bool reverse) { std::string result; if (reverse) { for (VolumeBlocks::const_reverse_iterator it = volumeBlocks.rbegin(); it != volumeBlocks.rend(); it++) { if (it != volumeBlocks.rbegin()) result += ';'; result += volumeBlockToString(*it); } } else { for (VolumeBlocks::const_iterator it = volumeBlocks.begin(); it != volumeBlocks.end(); it++) { if (it != volumeBlocks.begin()) result += ';'; result += volumeBlockToString(*it); } } return result; } void volumeBlocksFromString(std::string const &s, VolumeBlocks &volumeBlocks) { volumeBlocks.clear(); if (s.empty()) return; const std::vector< std::string > pieces = explode(";", s); volumeBlocks.reserve(pieces.size()); for (std::vector< std::string >::const_iterator it = pieces.begin(); it != pieces.end(); it++) volumeBlocks.push_back(volumeBlockFromString(*it)); return; } //////////////////////////////////////////////////////////////////// // VolumeBlockFactory //////////////////////////////////////////////////////////////////// void VolumeBlockFactory::addToBlock(double price, DataNode::Integer size, time_t time) { _inProcessStart = std::min(_inProcessStart, time); _inProcessEnd = std::max(_inProcessEnd, time); _inProcessList[price] += size; _inProcessShares += size; } void VolumeBlockFactory::initBlock() { _inProcessList.clear(); _inProcessShares = 0; _inProcessStart = std::numeric_limits< time_t >::max(); _inProcessEnd = std::numeric_limits< time_t >::min(); } void VolumeBlockFactory::nextBlock() { VolumeBlock newBlock; newBlock.startTime = _inProcessStart; newBlock.endTime = _inProcessEnd; DataNode::Integer firstCutOff = _groupBy / 4; DataNode::Integer thirdCutOff = _groupBy * 3 / 4; DataNode::Integer sharesSoFar = 0; std::map< double, DataNode::Integer >::const_iterator it = _inProcessList.begin(); while (true) { assert(it != _inProcessList.end()); sharesSoFar += it->second; if (sharesSoFar >= firstCutOff) { newBlock.low = it->first; firstCutOff = std::numeric_limits< DataNode::Integer>::max(); } if (sharesSoFar >= thirdCutOff) { newBlock.high = it->first; break; } it++; } _volumeBlocks.push_back(newBlock); initBlock(); } void VolumeBlockFactory::signalOverflow() { // We should report this to the log. // "Cutting off volume block list due to overflow." // This shouldn't happen too often, I hope! _overflow = true; _volumeBlocks.clear(); initBlock(); } VolumeBlockFactory::VolumeBlockFactory(DataNode::Integer groupBy) : _overflow(false), _groupBy(groupBy) { initBlock(); } void VolumeBlockFactory::initializeWith(VolumeBlocks const &volumeBlocks) { _volumeBlocks = volumeBlocks; initBlock(); } void VolumeBlockFactory::initializeWith(std::string const &encoded) { try { volumeBlocksFromString(encoded, _volumeBlocks); } catch (...) { // It might be nice to log this problem. _volumeBlocks.clear(); } initBlock(); } void VolumeBlockFactory::addPrint(double price, DataNode::Integer size, time_t time) { while (!_overflow) { const DataNode::Integer sizeOfPiece = std::min(size, _groupBy - _inProcessShares); if (sizeOfPiece <= 0) break; addToBlock(price, sizeOfPiece, time); size -= sizeOfPiece; assert(_inProcessShares <= _groupBy); if (_inProcessShares == _groupBy) { if (getBlockCount() + 1 >= MAX_VOLUME_BLOCKS) signalOverflow(); else nextBlock(); } } }