#include #include #include #include #include #include #include #include //#include "../shared/SimpleLogFile.h" #include "../FieldLists.h" #include "HistoryFileWriter.h" // See test/history_file_writer/ and test/history_file_reader/ for the test // files for this unit. // How many bytes would it take to store a pointer into this data? In the file // we always store the pointers using the fewest possible bytes. int getBytesPerPointer(int64_t endOfData) { return (int)std::ceil(std::log2(endOfData)/8); } // This will try to read a record in the format used by HistoryFileWriter. // We assume that source is pointing to the beginning of the record. On // success, success will be set to true, record will be set to the record // we just read, and source will point to the byte immediately after this // record. On failure, success will be set to false and the state of record // and source are undefined. We do no explicitly manipulate errno. You can // treat this call just like most operations on streams. If you set errno to // 0 before this call, and there's an error, then errno is probably set to // something interesting. (I.e. we preserve the poorly defined but somewhat // useful semantics of the C++ stream libraries on which this function is // built.) 
static void readRecord(std::istream &source, bool &success, Record::Ref &record) { success = false; if (source.get() != 'P') return; if (source.get() != 'S') return; int recordSize = 0; char *p = (char *)&recordSize; p[0] = source.get(); p[1] = source.get(); p[2] = source.get(); if (!source) return; std::string raw(recordSize, (char)0); source.read(&raw[0], recordSize); if (!source) return; int copyOfSize = 0; p = (char *)©OfSize; p[0] = source.get(); p[1] = source.get(); p[2] = source.get(); if (!source) return; if (copyOfSize != recordSize) return; record = Record::create(raw); success = record; return; } struct HistoryFileMetaData { int64_t version; int64_t minId; time_t minTime; int idByteCount; int timeByteCount; int fileByteCount; // This is not used! TODO get rid of it. int recordCount; int indexCount; static const int64_t CURRENT_VERSION = 1; }; ///////////////////////////////////////////////////////////////////// // HistoryFileWriter ///////////////////////////////////////////////////////////////////// void HistoryFileWriter::reportError(std::string const &message, bool useErrNo) { if (inErrorState()) return; _state = s_error; _errorMessage = message; if (useErrNo && errno) { _errorMessage += " ("; _errorMessage += errorString(); _errorMessage += ')'; } } HistoryFileWriter::HistoryFileWriter(std::string const &fileName) : _recordCount(0), _minId(std::numeric_limits< int64_t >::max()), _maxId(std::numeric_limits< int64_t >::min()), _minTime(std::numeric_limits< time_t >::max()), _maxTime(std::numeric_limits< time_t >::min()), _state(s_writingRecords), _lastId(0), _lastTime(0) { // Create a new file. Delete any old ones. errno = 0; _output.open(fileName.c_str(), std::ios::out | std::ios::in | std::ios::binary | std::ios::trunc); if (!_output) { reportError("Unable to open file " + fileName, true); return; } // Reserve space for a pointer to the header / index. Initially we fill the // file with records. 
These might be streaming, so we have no idea how big // the file will be before it's done. We could never reserve space for the // index. Instead, we put the index at the end of the file. The file starts // with an 8 byte integer pointing to the header / index. Initially that's // 0 saying that we haven't filled in that data yet. errno = 0; for (int i = 0; i < 8; i++) _output.put(0); if (!_output) { reportError("Error writing initial header to file " + fileName, true); return; } } void HistoryFileWriter::addRecord(Record::Ref const &record) { if (_state != s_writingRecords) { reportError("Invalid state."); return; } bool success; int64_t id; record->lookUpValue(MainFields::id).getInt(success, id); if (!success) // Some type of warning? Generally any invalid record being added here // will be silently ignored. return; time_t time; record->lookUpValue(MainFields::timestamp).getInt(success, time); if (!success) return; if ((time < _lastTime) || ((time == _lastTime) && (id <= _lastId))) { // Out of order. /* std::cerr<<"Out of order, skipping. id="<>8) <<(char)(encodedLength>>16); _output.write(encodedStart, encodedLength); _output<<(char)encodedLength <<(char)(encodedLength>>8) <<(char)(encodedLength>>16); if (!_output) { reportError("Error writing record #" + ntoa(_recordCount) + " to file ", true); return; } } void HistoryFileWriter::buildIndex() { if (_state != s_writingRecords) { reportError("Invalid state."); return; } // The documentation on tellp() is a bit vague and/or confusing. It's not // clear if tellp() works when the file size is bigger than an int. I did // some tests, and this works as it should! :) const int64_t indexStart = _output.tellp(); HistoryFileMetaData metadata; metadata.version = HistoryFileMetaData::CURRENT_VERSION; metadata.recordCount = _recordCount; if (_recordCount == 0) { // We'd like this to work, just in case. Maybe it's a holiday. 
metadata.minId = 0; metadata.minTime = 0; metadata.indexCount = 0; } else { metadata.minId = _minId; metadata.minTime = _minTime; metadata.indexCount = (_recordCount + 198) / 100; } if (_minId >= _maxId) // The default formulas might make these 0 or NaN. metadata.idByteCount = 1; else metadata.idByteCount = (int)std::ceil(std::log2(_maxId - _minId)/8); if (_minTime >= _maxTime) metadata.timeByteCount = 1; else metadata.timeByteCount = (int)std::ceil(std::log2(_maxTime - _minTime)/8); const int64_t bytesPerKey = metadata.idByteCount + metadata.timeByteCount; if (bytesPerKey > 8) { reportError("Range too big. Min id = " + ntoa(_minId) + ", Max id = " + ntoa(_maxId) + ", Min time = " + ctimeString(_minTime) + " (" + ntoa(_minTime) + "), Max time = " + ctimeString(_maxTime) + " (" + ntoa(_maxTime) + "), Id byte count = " + ntoa(metadata.idByteCount) + ", Time byte count = " + ntoa(metadata.timeByteCount)); return; } _output.write((char *)&metadata, sizeof(metadata)); const int64_t bytesPerPointer = getBytesPerPointer(indexStart); const int64_t keysStart = _output.tellp(); const int64_t pointersStart = keysStart + metadata.indexCount * bytesPerKey; const int keyShift = metadata.idByteCount * 8; _output.seekg(8); int keyNumber = 0; for (int recordNumber = 0; recordNumber < _recordCount; recordNumber++) { const int64_t recordPosition = _output.tellg(); bool success; Record::Ref record; errno = 0; readRecord(_output, success, record); if (!success) { // This seems unlikely, outside of development. Something went wrong // with our logic inside this module. Or someone was messing with the // file between when we wrote it and when we read it back in. reportError("Unable to read record while building index", true); return; } if (((recordNumber % 100) == 0) || (recordNumber == _recordCount - 1)) { // The reader is very flexible. It only knows that the first record and // the last record are in the index. 
            // (That's why we explicitly store the
            // number of index entries, rather than let the reader recompute
            // it.)  We record every 100th entry, starting with the first.  We
            // also record the last one.
            //
            // Remember where this record ends; the index loop resumes reading
            // from here after we seek around to write the index entry.
            const int64_t nextRecord = _output.tellg();
            bool success;
            int64_t id;
            record->lookUpValue(MainFields::id).getInt(success, id);
            if (!success)
            {
                // This seems like an internal error, unless someone's been
                // changing the file as we write it.  This should have been
                // caught as we were writing the records.
                reportError("Invalid record, no id");
                return;
            }
            // The key stores id - _minId, so an id outside [_minId, _maxId]
            // could not be encoded.
            if ((id < _minId) || (id > _maxId))
            {
                // This seems like an internal error, unless someone's been
                // changing the file as we write it.
                reportError("Id out of range: id=" + ntoa(id) + ", min="
                            + ntoa(_minId) + ", max=" + ntoa(_maxId));
                return;
            }
            time_t time;
            record->lookUpValue(MainFields::timestamp).getInt(success, time);
            if (!success)
            {
                // This seems like an internal error, unless someone's been
                // changing the file as we write it.  This should have been
                // caught as we were writing the records.
                reportError("Invalid record, no timestamp");
                return;
            }
            if ((time < _minTime) || (time > _maxTime))
            {
                // This seems like an internal error, unless someone's been
                // changing the file as we write it.
reportError("Time out of range: time=" + ctimeString(time) + ", min=" + ctimeString(_minTime) + ", max=" + ctimeString(_maxTime)); return; } const int64_t idOffset = id - _minId; const int64_t timeOffset = time - _minTime; const int64_t key = (timeOffset<(&key), bytesPerKey); _output.seekp(pointersStart + keyNumber * bytesPerPointer); _output.write(reinterpret_cast< char const * >(&recordPosition), bytesPerPointer); _output.seekg(nextRecord); if (!_output) { reportError("Error writing index", true); return; } keyNumber++; } } if (metadata.indexCount != keyNumber) { reportError("Internal error, expected key count = " + ntoa(metadata.indexCount) + ", actual key count = " + ntoa(keyNumber)); return; } errno = 0; _output.flush(); _output.seekp(0); _output.write(reinterpret_cast< char const * >(&indexStart), 8); _output.close(); if (!_output) { reportError("Unable to finish writing index", true); return; } _state = s_closed; } ///////////////////////////////////////////////////////////////////// // HistoryFileReader ///////////////////////////////////////////////////////////////////// void HistoryFileReader::dump(XmlNode &parent) const { XmlNode &node = parent[-1]; node.name = "HistoryFileReader"; node.properties["_memoryMapSize"] = ntoa(_memoryMapSize); node.properties["_firstKey"] = ntoa(_firstKey); node.properties["_firstPointer"] = ntoa(_firstPointer); node.properties["_bytesPerKey"] = ntoa(_bytesPerKey); node.properties["_bytesPerPointer"] = ntoa(_bytesPerPointer); node.properties["_minId"] = ntoa(_minId); node.properties["_minTime"] = ntoa(_minTime); node.properties["_minTimeHuman"] = ctimeString(_minTime); node.properties["_dataEnd"] = ntoa(_dataEnd); node.properties["_maxIdCount"] = ntoa(_maxIdCount); node.properties["_maxTimeCount"] = ntoa(_maxTimeCount); node.properties["_indexCount"] = ntoa(_indexCount); node.properties["_idBits"] = ntoa(_idBits); node.properties["_fileHandle"] = ntoa(_fileHandle); node.properties["_errorMessage"] = _errorMessage; } bool 
HistoryFileReader::read(void *dest, int64_t size, int64_t offset) const
{
    // Copy size bytes at file offset offset into dest.  Returns false on a
    // short read or when the request runs past the end of the mapped data.
    errno = 0;
    if (_base == MAP_FAILED)
    {
        // Use pread to bootstrap the process.  (The constructor has to read
        // the trailer before it knows how big the memory map should be.)
        int64_t result = pread(_fileHandle, dest, size, offset);
        return result == size;
    }
    else
    {
        // The memory map is faster.  No penalty for crossing the kernel
        // boundary / making a system call.
        if (size + offset > _dataEnd)
            return false;
        memcpy(dest, _base + offset, size);
        return true;
    }
}

// Record the first error and release all resources.
void HistoryFileReader::recordError(std::string const &message, bool useErrno)
{
    if (!_errorMessage.empty())
        // Always keep the first error message.  Errors can cascade.  We want
        // the original cause.
        return;
    _errorMessage = message;
    if (useErrno && errno)
    {
        _errorMessage += " (";
        _errorMessage += errorString();
        _errorMessage += ')';
    }
    cleanUp();
}

// Release the file handle and the memory map.  Safe to call repeatedly.
void HistoryFileReader::cleanUp()
{
    if (_fileHandle >= 0)
    {
        close(_fileHandle);
        _fileHandle = -1;
    }
    if (_base != MAP_FAILED)
    {
        munmap(_base, _memoryMapSize);
        _base = (char *)MAP_FAILED;
    }
}

HistoryFileReader::HistoryFileReader(std::string const &fileName) :
    _base((char *)MAP_FAILED),
    _fileHandle(-1)
{
    _fileHandle = open(fileName.c_str(), O_RDONLY);
    if (_fileHandle < 0)
    {
        recordError("Unable to open " + fileName, true);
        return;
    }
    // The first 8 bytes of the file point at the metadata / index trailer.
    // (This read() is presumably a templated overload declared in the header
    // that reads sizeof(_dataEnd) bytes — confirm against the header.)
    if (!read(_dataEnd, 0))
    {
        recordError("Unable to read header location", true);
        return;
    }
    if (!_dataEnd)
    {
        // The writer explicitly writes a 0 here when it first starts, and fills
        // in the real value at the very end.
        recordError("File is incomplete");
        return;
    }
    HistoryFileMetaData metadata;
    if (!read(metadata, _dataEnd))
    {
        recordError("Unable to read metadata " + fileName, true);
        return;
    }
    if (metadata.version != HistoryFileMetaData::CURRENT_VERSION)
    {
        recordError("Invalid version: Expecting "
                    + ntoa(HistoryFileMetaData::CURRENT_VERSION) + ", found "
                    + ntoa(metadata.version));
        return;
    }
    // Cache the pieces of the metadata needed to decode keys and pointers.
    _minId = metadata.minId;
    _minTime = metadata.minTime;
    _indexCount = metadata.indexCount;
    _bytesPerKey = metadata.idByteCount + metadata.timeByteCount;
    // Must match the writer, which used the same function with the same
    // input (the data end is where the writer's index started).
    _bytesPerPointer = getBytesPerPointer(_dataEnd);
    _idBits = metadata.idByteCount * 8;
    // Largest id / time offsets that fit in a key; find() clamps its inputs
    // to these.
    _maxIdCount = (((int64_t)1) << _idBits) - 1;
    _maxTimeCount = (((int64_t)1) << (metadata.timeByteCount*8)) - 1;
    //const int64_t pageSize = sysconf(_SC_PAGE_SIZE);
    _firstKey = _dataEnd + sizeof(HistoryFileMetaData);
    // We currently map from the start of the file; the commented out code
    // would instead start the map at the page containing the first key.
    const int64_t startOfMap = 0;// (_firstKey / pageSize) * pageSize;
    _firstKey -= startOfMap;
    //assert((_firstKey >= 0) && (_firstKey < pageSize));
    _firstPointer = _firstKey + metadata.indexCount * _bytesPerKey;
    _memoryMapSize = _firstPointer + metadata.indexCount * _bytesPerPointer;
    if (_indexCount)
    {
        // It is an error to call mmap with a size of 0.  Don't try.
        errno = 0;
        _base = (char *)mmap(NULL, _memoryMapSize, PROT_READ, MAP_SHARED,
                             _fileHandle, startOfMap);
        if (_base == MAP_FAILED)
        {
            recordError("Unable to map memory", true);
            return;
        }
    }
}

HistoryFileReader::~HistoryFileReader()
{
    cleanUp();
}

// Binary search the index for (time, id).  On return lowerBound / upperBound
// are file positions of index entries bracketing the key (see the comments
// at the end of the function); either may be INVALID at the edges.
void HistoryFileReader::find(time_t time, int64_t id,
                             int64_t &lowerBound, int64_t &upperBound) const
{
    if (!isGood())
    {
        lowerBound = INVALID;
        upperBound = INVALID;
        return;
    }
    // Convert to offsets, clamp into the representable range, and pack into
    // a key exactly the way the writer did.
    time -= _minTime;
    time = std::max(0L, std::min(_maxTimeCount, time));
    id -= _minId;
    id = std::max(0L, std::min(_maxIdCount, id));
    const int64_t key = (time << _idBits) | id;
    // Standard STL rules.  lowerIndex is the lowest possible value.  upperIndex
    // is one above the greatest possible value.
int lowerIndex = 0; int upperIndex = _indexCount; while (true) { if (upperIndex <= lowerIndex) // Not found. Return the nearest items. break; const int middleIndex = (upperIndex + lowerIndex) / 2; int64_t middleKey = 0; memcpy(&middleKey, _base + _firstKey + _bytesPerKey * middleIndex, _bytesPerKey); if (middleKey == key) { // Found it! Set the range to include exactly this one entry. lowerIndex = middleKey; upperIndex = middleIndex + 1; break; } if (middleKey < key) // Go forward in the index. lowerIndex = middleIndex + 1; else // Go backward in the index. upperIndex = middleIndex; } // Note that lowerIndex and upperIndex have swapped positions. upperIndex // is now right before the item we want and lower index is right after it. // Or they are the same, so it doesn't matter if we swap them or not. We // could not have exited the loop if that wasn't true. lowerBound = indexToFilePosition(upperIndex - 1); upperBound = indexToFilePosition(lowerIndex); } int64_t HistoryFileReader::indexToFilePosition(int index) const { if ((index < 0) || (index >= _indexCount)) return INVALID; int64_t result = 0; memcpy(&result, _base + _firstPointer + _bytesPerPointer * index, _bytesPerPointer); return result; } int64_t HistoryFileReader::findFirst(time_t time) const { int64_t lowerBound, upperBound; find(time, _minId, lowerBound, upperBound); return upperBound; } int64_t HistoryFileReader::findLast(time_t time) const { int64_t lowerBound, upperBound; find(time, _minId + (((int64_t)1)<<_idBits) - 1, lowerBound, upperBound); return lowerBound; } void HistoryFileReader::readForward(int64_t &position, Record::Ref &record) const { readAndMove(position, record, true); } void HistoryFileReader::readBackward(int64_t &position, Record::Ref &record) const { readAndMove(position, record, false); } void HistoryFileReader::readAndMove(int64_t &position, Record::Ref &record, bool forward) const { const auto readPtr = [this](int64_t size, int64_t offset) -> char const * { if (size + offset > 
_dataEnd) return NULL; return _base + offset; }; record = NULL; if ((position < 8) || (position >= _dataEnd)) { position = INVALID; return; } unsigned char const *header = (unsigned char const *)readPtr(5, position); if (!header) { position = INVALID; return; } if ((header[0] != 'P') || (header[1] != 'S')) { position = INVALID; return; } const int size = header[2] | (((int)header[3])<<8) | (((int)header[4])<<16); char const *raw = readPtr(size, position + 5); if (!raw) { position = INVALID; return; } record = Record::createShare(raw, size); if (!record) { position = INVALID; return; } if (forward) { position += 8 + size; if (position >= _dataEnd) // EOF. We successfully read the record, but there is no next record. position = INVALID; } else { // Going backwards. if (position <= 8) { // EOF. We successfully read the record, but there is no next record position = INVALID; return; } int previousRecordSize = 0; if (!read(&previousRecordSize, 3, position-3)) { // I/O error position = INVALID; return; } position -= previousRecordSize + 8; if (position <= 8) { // Some type of file corruption. It seemed like there was at least one // more record before the one we just read. But when we went backwards, // we went too far back. 
position = INVALID; return; } } } void HistoryFileReader::move(int64_t &position, bool forward) const { Record::Ref record; readAndMove(position, record, forward); } ///////////////////////////////////////////////////////////////////// // HistoryFileIterator ///////////////////////////////////////////////////////////////////// HistoryFileIterator::CompareResult HistoryFileIterator::compare(time_t time, Record::Ref const &record) const { bool valid; time_t recordTime; record->lookUpValue(MainFields::timestamp).getInt(valid, recordTime); if (!valid) return Equal; if (time == recordTime) return Equal; if (recordTime > time) return _forward?RecordComesAfter:RecordComesBefore; else return _forward?RecordComesBefore:RecordComesAfter; } HistoryFileIterator::HistoryFileIterator(HistoryFileReader &reader, bool forward, time_t initialTime) : _reader(reader), _forward(forward), _readFromBuffer(true) { _nextLocation = _reader.findInitial(initialTime, forward); int64_t nextToRead = _nextLocation; while (true) { Record::Ref record; _reader.readAndMove(nextToRead, record, !_forward); if (!record) break; CompareResult compareResult = compare(initialTime, record); if (compareResult == RecordComesBefore) break; _buffered.push(record); } } Record::Ref HistoryFileIterator::getNext() { if (_readFromBuffer) { if (!_buffered.empty()) { Record::Ref result = _buffered.top(); _buffered.pop(); return result; } _readFromBuffer = false; // _nextLocation points to the first value that we pushed into the stack, // the last value we popped. We need to move the pointer one record // past that. _reader.move(_nextLocation, _forward); } Record::Ref result; _reader.readAndMove(_nextLocation, result, _forward); return result; }