#include <iostream>
#include <fstream>
#include <map>
#include <chrono>
#include <locale>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <ctime>

#include "HistoryFileWriter.h"
#include "../BinaryLogReader.h"
#include "../FieldLists.h"

/* Convert a binary log file into a history file.
 *
 * We constantly spit out the binary log. Typically that's done on the
 * fast_alert_search master server. We could have gotten the same data from
 * the database, but this should be faster and more efficient.
 *
 * The output of this is a history file. It has a similar format, but it
 * also includes an index. The history server will mmap() the history file
 * and use it to generate results. */

/* This example shows what happens when you run this program on a typical
 * data file. Note that the second run was much faster because the file was
 * already in the cache.

[phil@joey-mousepad history_server]$ ./log_to_history history/alerts_binlog.1548416647 __.history
Copied 7,972,517 records in 208,013,322μs.
Wrote 30,000 records in 101,520μs.
Wrote index in 15,707,654μs.
[phil@joey-mousepad history_server]$ rm __.history
[phil@joey-mousepad history_server]$ ./log_to_history history/alerts_binlog.1548416647 __.history
Copied 7,972,517 records in 67,747,727μs.
Wrote 30,000 records in 92,817μs.
Wrote index in 15,837,768μs.
[phil@joey-mousepad history_server]$ ls -l history/alerts_binlog.1548416647 __.history
-rw-r--r-- 1 phil users 7978165948 Feb  2 09:58 __.history
-rw-r--r-- 1 phil users 7977368622 Jan 28 22:43 history/alerts_binlog.1548416647
[phil@joey-mousepad history_server]$ ./describe_history __.history
dump():
getStartTime(): 1548416521 {2019-01-25 03:42:01} {Fri Jan 25 03:42:01 2019}
id 22735630088 symbol IBM timestamp {1548416521 {2019-01-25 03:42:01} {Fri Jan 25 03:42:01 2019}}
id 22735630089 symbol IBM timestamp {1548416521 {2019-01-25 03:42:01} {Fri Jan 25 03:42:01 2019}}
id 22735630090 symbol SVXY timestamp {1548416522 {2019-01-25 03:42:02} {Fri Jan 25 03:42:02 2019}}
...
id 22743602602 symbol AE timestamp {1548464400 {2019-01-25 17:00:00} {Fri Jan 25 17:00:00 2019}}
id 22743602603 symbol ERIC timestamp {1548464400 {2019-01-25 17:00:00} {Fri Jan 25 17:00:00 2019}}
id 22743602604 symbol FPE timestamp {1548464400 {2019-01-25 17:00:00} {Fri Jan 25 17:00:00 2019}}
[phil@joey-mousepad history_server]$
*/

// This is how our history is sorted: first by time, then by id. Any external
// use of the file will be based on time. Ids are used as a key to say where
// one request ends and the next should pick up. For streaming alerts we only
// have the id, but for any form of history the focus is on time. We can use
// a time combined with an id to restart a history request where we left off,
// as the sketch below shows.
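/* A minimal, self-contained sketch of the restart logic described above,
 * using only the standard library. demoResumeCursor() and its sample values
 * are illustrative, not part of the real program; the values come from the
 * describe_history output shown earlier. std::pair is used here because it
 * compares lexicographically, which is exactly the ordering that struct Key
 * implements below. */
#include <cassert>
#include <utility>

static void demoResumeCursor() {
    typedef std::pair<time_t, int64_t> Cursor;
    // Where the previous history request stopped.
    const Cursor lastSeen(1548416521, 22735630089);
    // A record with the same timestamp but a later id still sorts after the
    // cursor, so nothing is skipped and nothing is sent twice.
    assert(lastSeen < Cursor(1548416521, 22735630090));
    // A record with a later timestamp sorts after the cursor regardless of id.
    assert(lastSeen < Cursor(1548416522, 1));
    // The next request resumes with the first key strictly greater than
    // lastSeen.
}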
struct Key {
    time_t time;
    int64_t id;

    bool operator ==(Key const &other) const {
        return (time == other.time) && (id == other.id);
    }

    bool operator <(Key const &other) const {
        return (time < other.time) || ((time == other.time) && (id < other.id));
    }

    Key(time_t time, int64_t id) : time(time), id(id) { }

    Key() { }

    Key(Record::Ref const &record) {
        bool success;
        record->lookUpValue(MainFields::id).getInt(success, id);
        if (!success) {
            std::cerr<<"Row with missing id."<<std::endl;
            exit(1);
        }
        record->lookUpValue(MainFields::timestamp).getInt(success, time);
        if (!success) {
            std::cerr<<"Row with missing timestamp."<<std::endl;
            exit(1);
        }
    }
};

/*
struct RecordInfo {
    Key key;
    int64_t position;

    RecordInfo(int64_t timestamp, int64_t id, int64_t position) :
        key(timestamp, id), position(position) { }

    RecordInfo(RecordInfo &other) : key(other.key), position(other.position) { }

    struct Compare {
        bool operator ()(RecordInfo const &a, RecordInfo const &b) const {
            return a.key < b.key;
        }
    };
};
*/

// Microseconds elapsed since start, for the progress messages below.
// (Assumed helper: the original presumably timed these phases in an
// equivalent way.)
static int64_t microsecondsSince(std::chrono::steady_clock::time_point start) {
    return std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::steady_clock::now() - start).count();
}

int main(int argc, char *argv[]) {
    if (argc != 3) {
        std::cerr<<"Invalid arguments."<<std::endl;
        return 1;
    }

    // Group digits the way the sample output above shows, e.g. "7,972,517".
    // (Assumption: the user's default locale supplies the comma grouping.)
    std::cout.imbue(std::locale(""));

    // Open the input log and the output history file. (Assumption: the input
    // is a plain stream of serialized records, BinaryLogReader.h provides
    // operator>> to decode one Record at a time, and HistoryFileWriter takes
    // the output file name.)
    std::ifstream inFileStream(argv[1], std::ios::binary);
    std::istream *in = &inFileStream;
    HistoryFileWriter out(argv[2]);

    std::map<Key, Record::Ref> buffer;
    int recordCount = 0;

    // We want to keep MAX_BUFFER_SIZE records in memory most of the time.
    // This allows us to reorder the data. The assumption is that the log
    // file is close to ordered, but not quite. By default HistoryFileWriter
    // silently ignores and skips any records that are out of order. Without
    // this buffer, that would throw away a little over 0.1% of the alerts in
    // a typical file.
    //
    // The first file I looked at needed a buffer of about 1,400 records. I
    // arbitrarily set this limit to 15,000. That was not enough for another
    // file, so I doubled it. The problem with 15,000 records happened at
    // exactly 8:30 Pacific time.
    //
    // Initially we read the records and put them right into the buffer. The
    // buffer is a std::map, so it stays sorted and the insert order doesn't
    // matter.
    //
    // Once the buffer is full the strategy changes. Each time we get a new
    // record, first we make sure that it doesn't come before all of the
    // records currently in the buffer. (We really want to know if it comes
    // before any of the records we've written, but this is approximately the
    // same thing.) If the new record comes before the first item in our
    // buffer, we immediately report an error and exit the program.
    //
    // Barring any errors, the new item is added to the buffer. Then we
    // remove an item from the buffer to return the buffer's size to
    // MAX_BUFFER_SIZE. We always grab the record with the smallest Key value
    // and write that record to the HistoryFileWriter.
    //
    // After reading all records from the file we write the remaining records
    // from the buffer to the HistoryFileWriter in order. (A self-contained
    // sketch of this buffering strategy appears at the end of this file.)
    const size_t MAX_BUFFER_SIZE = 30000;

    auto copyStart = std::chrono::steady_clock::now();
    while ((*in) && !out.inErrorState()) {
        Record::Ref record;
        (*in)>>record;
        if (record) {
            recordCount++;
            Key key(record);
            if ((!buffer.empty()) && (key < buffer.begin()->first)
                && (buffer.size() == MAX_BUFFER_SIZE)) {
                Key const &first = buffer.begin()->first;
                std::cerr<<"Too far out of order. New record: time="
                         <<key.time<<", id="<<key.id
                         <<". Buffer starts at: time="<<first.time
                         <<", id="<<first.id<<std::endl;
                return 1;
            }
            buffer[key] = record;
            if (buffer.size() > MAX_BUFFER_SIZE) {
                out.addRecord(buffer.begin()->second);
                buffer.erase(buffer.begin());
            }
        }
    }
    std::cout<<"Copied "<<recordCount<<" records in "
             <<microsecondsSince(copyStart)<<"μs."<<std::endl;
    if (in->bad()) {
        perror("reading file");
        return 1;
    }

    auto flushStart = std::chrono::steady_clock::now();
    for (auto &kvp : buffer) out.addRecord(kvp.second);
    std::cout<<"Wrote "<<buffer.size()<<" records in "
             <<microsecondsSince(flushStart)<<"μs."<<std::endl;

    // The "Wrote index in ...μs." line in the sample output presumably comes
    // from HistoryFileWriter when it finalizes the file. (Assumption: that
    // happens when out is destroyed at the end of main().)
    return 0;
}
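/* A minimal, self-contained sketch of the bounded reorder buffer used in
 * main(), using only the standard library. demoReorder() and
 * DEMO_BUFFER_SIZE are illustrative names, not part of the real program.
 * Keeping a sorted buffer and always emitting its smallest element turns a
 * nearly sorted stream into a fully sorted one, provided no element arrives
 * more than DEMO_BUFFER_SIZE positions earlier than it should. */
#include <set>
#include <vector>

static std::vector<int> demoReorder(std::vector<int> const &nearlySorted) {
    const size_t DEMO_BUFFER_SIZE = 3;
    std::multiset<int> buffer;
    std::vector<int> result;
    for (int value : nearlySorted) {
        // In main() a value that sorts before everything in a full buffer is
        // a fatal error; in this sketch we just let it through out of order.
        buffer.insert(value);
        if (buffer.size() > DEMO_BUFFER_SIZE) {
            // Like out.addRecord(buffer.begin()->second): emit the smallest
            // element we are still holding.
            result.push_back(*buffer.begin());
            buffer.erase(buffer.begin());
        }
    }
    // Like the final loop in main(): flush what's left, in order.
    for (int value : buffer) result.push_back(value);
    return result;
}

// For example, demoReorder({1, 3, 2, 5, 4, 7, 6, 8}) returns
// {1, 2, 3, 4, 5, 6, 7, 8}.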