#include #include "../shared/CommandDispatcher.h" #include "../shared/ReplyToClient.h" #include "../shared/ThreadMonitor.h" #include "../shared/LogFile.h" #include "../shared/ContainerThread.h" #include "BinaryLogReader.h" #include "TestFromBinaryLog.h" class TestFromBinaryLog : public IContainerThreadUser, private RequestListener { private: const IContainerThreadUser::Ref _keepMeAlive; FeedMaster &_feedMaster; std::ifstream _logFile; TimeVal::Microseconds _timeOffsetFrom; int _rate; // Records / second. int _recordsSent; bool _behind; // We are not pumping data out fast enough. bool paused() const { return (_rate == 0) || !_logFile; } int quota() const { if (paused()) return 0; else return (TimeVal(true).asMicroseconds() - _timeOffsetFrom) * _rate / 1000000L; } // If we are far enough behind, we'll take a break after this many records // to listen for events. static const int MAX_RECORDS = 1000; enum { mtOpenFile, mtSetRate }; virtual void handleRequestInThread(Request *original) { switch (original->callbackId) { case mtOpenFile: { ExternalRequest *externalRequest = dynamic_cast< ExternalRequest * >(original); const std::string fileName = externalRequest->getProperty("file_name"); const bool wasPaused = paused(); if (wasPaused) { // Reset the counters. Otherwise we'll try to catch up for all the // time after we hit EOF in the old file. _timeOffsetFrom = TimeVal(true).asMicroseconds(); _recordsSent = 0; _behind = false; } TclList msg; msg<getSocketInfo()); if (fileName == "<") { _logFile.clear(); if (!_logFile) addToOutputQueue(externalRequest->getSocketInfo(), "Unable to rewind file.", externalRequest->getResponseMessageId()); else _logFile.seekg(0, std::ios_base::beg); } else { _logFile.close(); _logFile.clear(); _logFile.open(fileName.c_str(), std::ifstream::binary); addToOutputQueue(externalRequest->getSocketInfo(), fileName + " " + (_logFile?"success":"FAIL"), externalRequest->getResponseMessageId()); } break; } case mtSetRate: { ExternalRequest *externalRequest = dynamic_cast< ExternalRequest * >(original); _timeOffsetFrom = TimeVal(true).asMicroseconds(); _rate = strtolDefault(externalRequest->getProperty("rate"), 0); _recordsSent = 0; _behind = false; addToOutputQueue(externalRequest->getSocketInfo(), "rate=" + ntoa(_rate) + " records/second", externalRequest->getResponseMessageId()); TclList msg; msg<getSocketInfo()); break; } } } virtual void newRequest(Request *request) { returnToMe(_keepMeAlive, request); } virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) { if (paused()) return; else if (_behind) callbacks.wakeAfterMs(0); else callbacks.wakeAfterMs(100); } virtual void awake(std::set< int > const &woken) { if (paused()) return; ThreadMonitor &tm = ThreadMonitor::find(); int count = quota() - _recordsSent; if (count > MAX_RECORDS) { _behind = true; count = MAX_RECORDS; tm.increment("binary log behind"); } else _behind = false; for (; count > 0; count--) { Record::Ref record; _logFile>>record; if (record) { tm.increment("read from log"); _feedMaster.sendRecord(record->getEncoded()); _recordsSent++; } if (_logFile.bad()) { TclList msg; msg<listenForCommand("blt_open_file", this, mtOpenFile); cd->listenForCommand("blt_set_rate", this, mtSetRate); getContainer()->addUser(_keepMeAlive); } }; void initTestFromBinaryLog(FeedMaster &feedMaster) { new TestFromBinaryLog(feedMaster); }