// NOTE(review): the header names of the next two #include directives are
// missing in this copy of the file (they look like they were stripped during
// extraction).  The code below uses std::ofstream, time_t, localtime_r() and
// mktime(), so presumably these were the matching standard headers -- restore
// them from version control rather than guessing.
#include
#include
#include "../shared/GlobalConfigFile.h"
#include "../shared/ThreadMonitor.h"
#include "../shared/SimpleLogFile.h"
#include "../shared/ContainerThread.h"
#include "RecordDispatcher.h"
#include "BinaryLog.h"

/////////////////////////////////////////////////////////////////////
// BinaryLogWriter
//
// Listens for records from an IRecordDispatcher and appends each one to a
// binary log file.  Each record is written as a small header ("HERE:" plus a
// 3 byte length) followed by the record's encoded bytes.
//
// The output file is closed and a new one is started the first time a record
// arrives at or after the next local midnight.  Output is flushed explicitly
// once roughly a second has passed since the last write.
/////////////////////////////////////////////////////////////////////

class BinaryLogWriter : public IContainerThreadUser, private RequestListener
{
private:
  // Callback id handed to listenForRecords() in the constructor and matched
  // again in handleRequestInThread().
  enum { mtLogThis };

  // This object will never go away.
  const Ref _keepMeAlive;

  // The file name is built as
  //   _fileNamePrefix + _baseName + <unix time when the file was opened>
  // optionally followed by "." + _fileNameSuffix.  See ensureOpen().
  const std::string _fileNamePrefix;
  const std::string _baseName;
  const std::string _fileNameSuffix;

  // The current log file.  Opened lazily by ensureOpen().
  std::ofstream _output;

  // When the current file should be closed and a new one started.  Starts at
  // 0, so the first call to ensureOpen() always computes a real value.
  time_t _nextLogTime;

  // When buffered output should be flushed.  0 means no flush is pending.
  TimeVal::Microseconds _nextFlushTime;

  // RequestListener callback.  Passes the request to returnToMe() together
  // with our own reference.
  // NOTE(review): presumably this queues the request so that it is later
  // delivered to handleRequestInThread() in the container thread -- confirm
  // against IContainerThreadUser.
  virtual void newRequest(Request *request)
  {
    returnToMe(_keepMeAlive, request);
  }

  // If the file is open, flush it, clear the pending flush time, and count
  // the flush in the ThreadMonitor.  Does nothing if the file is not open.
  void flushNow()
  {
    if (_output.is_open())
      {
        _output.flush();
        _nextFlushTime = 0;
        ThreadMonitor::find().increment("BinaryLog::flushNow()");
      }
  }

  // Set _nextLogTime to the next midnight, local time.
  void updateNextLogTime()
  {
    // Start from the current time.
    time_t unixTime = time(NULL);
    struct tm parts;
    localtime_r(&unixTime, &parts);
    // Set time to midnight.
    parts.tm_sec = 0;
    parts.tm_min = 0;
    parts.tm_hour = 0;
    // Set date to tomorrow.  The man page says this is explicitly legal.
    // mktime() will automatically convert jan 32 to feb 1, etc.
    parts.tm_mday++;
    // Do not specify daylight saving time.  Let mktime() figure that out on
    // its own.
    parts.tm_isdst = -1;
    // Switch the log files at midnight tomorrow morning.
    _nextLogTime = mktime(&parts);
  }

  // Make sure _output refers to the right file for the current time.
  // If we have reached _nextLogTime, close the current file (if any) and
  // compute the next switch time.  Then, if no file is open, open a new one
  // in binary append mode.  The name includes the current unix time.
  //
  // This is only called from logThis(), so the switch to a new file happens
  // when the first record arrives after midnight, not at midnight itself.
  //
  // NOTE(review): the result of open() is not checked.  If the open fails,
  // the writes in logThis() are made to a stream that is not open, and
  // is_open() stays false so the open is attempted again on the next record.
  void ensureOpen()
  {
    const time_t now = time(NULL);
    if (now >= _nextLogTime)
      {
        if (_output.is_open())
          _output.close();
        updateNextLogTime();
      }
    if (!_output.is_open())
      {
        std::string fileName = _fileNamePrefix + _baseName + itoa(now);
        if (!_fileNameSuffix.empty())
          {
            fileName += "." + _fileNameSuffix;
          }
        _output.open(fileName.c_str(), std::ios::out | std::ios::binary | std::ios::app);
      }
  }

  // Append one record to the log file, then schedule a flush for one second
  // from now.
  void logThis(Record::Ref const &record)
  {
    ensureOpen();
    char const *const encodedStart = record->getEncodedStart();
    const size_t encodedLength = record->getEncodedLength();
    // Start with a short string to help us find a new record, in case of
    // file corruption.  Followed by a 3 byte record size.  Most significant
    // byte first.  The header is not included in the record size.
    // NOTE(review): only the low 24 bits of encodedLength are stored, so a
    // record of 16 MiB or more would be written with a wrong size in its
    // header.  Confirm that records can never be that large.
    _output<<"HERE:"
           <<(char)(encodedLength>>16)
           <<(char)(encodedLength>>8)
           <<(char)encodedLength;
    _output.write(encodedStart, encodedLength);
    // If we don't write anything else for a whole second, explicitly flush.
    // (As long as we keep writing stuff, don't bother flushing.)
    _nextFlushTime = TimeVal(true).asMicroseconds() + 1000000L;
  }

protected:

public:
  // If a flush is pending, ask to be woken when it is due.
  // NOTE(review): if the flush time has already passed, the value given to
  // wakeAfterMs() can be zero or negative.  How wakeAfterMs() treats that is
  // not visible here -- confirm.
  virtual void beforeSleep(IBeforeSleepCallbacks &callbacks)
  {
    if (_nextFlushTime > 0)
      // Round up to the next millisecond.  We don't need to be that precise.
      // If we don't round up, sometimes the system will wake us up a little
      // early and we will go back to sleep several times.
      callbacks.wakeAfterMs((_nextFlushTime - TimeVal(true).asMicroseconds()) / 1000 + 1);
  }

  // Flush if the pending flush time has been reached.  The woken argument is
  // not used.
  // NOTE(review): there is no check for _nextFlushTime == 0 ("nothing
  // pending") here.  Assuming asMicroseconds() is never negative, the test
  // below is always true in that state, so flushNow() runs -- and bumps the
  // ThreadMonitor counter whenever the file is open -- on every call to
  // awake(), not only when a flush was scheduled.  Confirm whether that is
  // intended.
  virtual void awake(std::set< int > const &woken)
  {
    if (TimeVal(true).asMicroseconds() >= _nextFlushTime)
      flushNow();
  }

  // Handle a request in our thread.  mtLogThis requests carry a record to
  // log.  Any other callback id is ignored.
  // NOTE(review): the result of the dynamic_cast is dereferenced without a
  // null check.  That is only safe if every mtLogThis request really is a
  // NewRecord -- confirm against IRecordDispatcher::listenForRecords().
  virtual void handleRequestInThread(Request *original)
  {
    switch (original->callbackId)
      {
      case mtLogThis:
        NewRecord *newRecord = dynamic_cast< NewRecord * >(original);
        logThis(newRecord->record);
        break;
      }
  }

  // Attach a new writer to the given record source.
  // The file name prefix and suffix come from the config items
  // "binary_log_prefix" and "binary_log_suffix".  The base name is the
  // source's base name followed by "_binlog.".  The object adds itself to
  // the primary container thread and asks the source to deliver records to
  // it with callback id mtLogThis.
  BinaryLogWriter(IRecordDispatcher *source) :
    IContainerThreadUser(IContainerThread::primary()),
    _keepMeAlive(this),
    _fileNamePrefix(getConfigItem("binary_log_prefix", "")),
    _baseName(source->getBaseName() + "_binlog."),
    _fileNameSuffix(getConfigItem("binary_log_suffix")),
    _nextLogTime(0),
    _nextFlushTime(0)
  {
    getContainer()->addUser(_keepMeAlive);
    source->listenForRecords(this, mtLogThis);
    TclList msg;
    // NOTE(review): the next statement is damaged in this copy of the file.
    // Text appears to have been lost between "msg<" and "isMaster()".  That
    // text presumably held the rest of the log message, the end of this
    // constructor, and the start of the function that initBinaryLog() calls
    // as BinaryLogWriter::init(), including the declaration of "create".
    // Restore it from version control.  Do not reconstruct it by hand.
    msg<isMaster();
    // What is left of that function:  the config item
    // "<base name>_binary_log" overrides the initial value of "create".
    // "1" forces it on, "0" forces it off, and anything else leaves it
    // alone.
    std::string value = getConfigItem(source->getBaseName() + "_binary_log");
    if (value == "1")
      create = true;
    else if (value == "0")
      create = false;
    // The pointer is deliberately not saved.  The new object holds a
    // reference to itself in _keepMeAlive.
    if (create)
      new BinaryLogWriter(source);
  }
};

// Consider creating a binary log writer for each of the two record sources,
// alerts and top list.
void initBinaryLog()
{
  BinaryLogWriter::init(IRecordDispatcher::getAlerts());
  BinaryLogWriter::init(IRecordDispatcher::getTopList());
}