#include "DataNodes.h"
#include "../../shared/ThreadMonitor.h"
#include "../../shared/SimpleLogFile.h"

#include "Timers.h"

////////////////////////////////////////////////////////////////////
// TimerThread
////////////////////////////////////////////////////////////////////

// Queues a quit request for the worker thread and blocks until that thread
// has finished.  The worker releases the remaining timers when it sees the
// quit request (see threadFunction()).
TimerThread::~TimerThread()
{
  Request *r = new Request(NULL);
  r->callbackId = mtQuit;
  _incoming.newRequest(r);
  waitForThread();
}

// Remembers the manager that broadcasts are sent through and immediately
// starts the worker thread.
TimerThread::TimerThread(DataNodeManager *manager) :
  ThreadClass("TimerThread"),
  _incoming(getName()),
  _manager(manager)
{
  startThread();
}

// Removes the timer registered for the given channel, if there is one, from
// both indexes.  When andDelete is true the event object is also freed;
// pass false when the caller keeps using the event.
// Called only from the worker thread in this file.
void TimerThread::cancelTimerRequest(std::string const &channel,
                                     bool andDelete)
{
  TimerEvent *event = getPropertyDefault(_byChannel, channel);
  if (event)
    {
      // channel may be a reference to event->channel, so the event must
      // stay alive until both erase calls are done.
      _byChannel.erase(event->channel);
      _byTime.erase(std::make_pair(TimeVal(event->nextTime), event));
      if (andDelete)
        delete event;
    }
}

// Registers a timer, replacing any timer already registered on the same
// channel.  The event is stored by pointer in both indexes.
void TimerThread::addTimerRequest(TimerEvent *toAdd)
{
  // Uses the default for andDelete, which is declared in the header.
  cancelTimerRequest(toAdd->channel);
  _byTime.insert(std::make_pair(TimeVal(toAdd->nextTime), toAdd));
  _byChannel[toAdd->channel] = toAdd;
}

// Broadcasts a message on the event's channel.  A periodic event is then
// rescheduled one period after its previous due time; a one-shot event is
// deleted.  The caller must already have removed the event from the indexes.
void TimerThread::fireEvent(TimerEvent *event)
{
  static const std::string s_periodic = "periodic";
  static const std::string s_single = "single";
  // NOTE(review): message is never deleted here; presumably send() takes
  // ownership -- confirm against BroadcastMessage.
  BroadcastMessage *message = new BroadcastMessage(event->channel);
  // Disabled debug logging.  Part of this line was lost; reconstructed.
  //sendToLogFile(TclList()<<event->channel
  //              <<(event->periodic()?s_periodic:s_single));
  message->send(_manager);
  if (event->periodic())
    {
      ThreadMonitor::find().increment(s_periodic);
      // Advance from the scheduled time, not from the current time.
      event->nextTime.addMilliseconds(event->periodMS);
      addTimerRequest(event);
    }
  else
    {
      ThreadMonitor::find().increment(s_single);
      delete event;
    }
}

// Asks the worker thread to broadcast on channel every periodMS
// milliseconds.  If firstTime is set, the first broadcast is scheduled for
// that time; otherwise the first due time is "now".  periodMS must be
// positive (checked by assert only).
void TimerThread::requestPeriodicBroadcast(std::string const &channel,
                                           int periodMS,
                                           TimeVal firstTime)
{
  assert(periodMS > 0);
  TimerEvent *event = new TimerEvent;
  event->channel = channel;
  if (firstTime)
    {
      event->nextTime = firstTime;
      event->absoluteTime = true;
    }
  else
    {
      event->nextTime = true;  // Now
      event->absoluteTime = false;
    }
  event->periodMS = periodMS;
  // The event is handed to the queue here and must not be touched afterwards.
  _incoming.newRequest(event);
  ThreadMonitor::find().increment("requestPeriodicBroadcast."
                                  + ntoa(periodMS));
}

// Asks the worker thread for a single broadcast on channel, delayMS
// milliseconds from now.  delayMS must be positive (checked by assert only).
void TimerThread::requestBroadcastAfter(std::string const &channel,
                                        int delayMS)
{
  assert(delayMS > 0);
  TimerEvent *event = new TimerEvent;
  event->channel = channel;
  event->nextTime = true;  // Now
  event->nextTime.addMilliseconds(delayMS);
  event->periodMS = 0;
  event->absoluteTime = false;
  _incoming.newRequest(event);
  static const std::string s_requestBroadcastAfter = "requestBroadcastAfter";
  ThreadMonitor::find().increment(s_requestBroadcastAfter);
}

// Asks the worker thread for a single broadcast on channel at the given
// time.  firstTime must be set (checked by assert only).
void TimerThread::requestBroadcastAt(std::string const &channel,
                                     TimeVal firstTime)
{
  assert(firstTime);
  TimerEvent *event = new TimerEvent;
  event->channel = channel;
  event->nextTime = firstTime;
  event->periodMS = 0;
  event->absoluteTime = true;
  _incoming.newRequest(event);
  static const std::string s_requestBroadcastAt = "requestBroadcastAt";
  ThreadMonitor::find().increment(s_requestBroadcastAt);
}

// Asks the worker thread to drop the timer registered on channel.
void TimerThread::cancel(std::string const &channel)
{
  CancelTimerRequest *r = new CancelTimerRequest;
  r->channel = channel;
  _incoming.newRequest(r);
}

// Body of the worker thread.  Each pass:
//   1. drains the request queue (add timer / cancel timer / quit),
//   2. fires every timer whose due time is not after "now",
//   3. waits for a new request, with the time until the next timer as the
//      wait argument (NULL when no timers are pending).
void TimerThread::threadFunction()
{
  while (true)
    {
      _incoming.resetWaitHandle();
      while (Request *current = _incoming.getRequest())
        {
          switch (current->callbackId)
            {
            case mtCancelTimer:
              {
                CancelTimerRequest *request =
                  dynamic_cast< CancelTimerRequest * >(current);
                // dynamic_cast yields NULL if the id and the dynamic type
                // disagree; then just drop the request.
                if (request)
                  cancelTimerRequest(request->channel);
                break;
              }
            case mtTimerEvent:
              {
                TimerEvent *request =
                  dynamic_cast< TimerEvent * >(current);
                if (request)
                  {
                    addTimerRequest(request);
                    // The indexes now hold the event; skip the delete below.
                    current = NULL;
                  }
                break;
              }
            case mtQuit:
              {
                // Release every pending timer before the thread exits.
                while (!_byTime.empty())
                  cancelTimerRequest(_byTime.begin()->second->channel);
                delete current;
                return;
              }
            }
          delete current;
        }
      // "now" is sampled once per pass, so an event rescheduled into the
      // future by fireEvent() ends the loop below.
      TimeVal now(true);
      timeval waitTime;
      timeval *until = NULL;
      while (true)
        {
          std::set< std::pair< TimeVal, TimerEvent * > >::iterator it =
            _byTime.begin();
          if (it == _byTime.end())
            // No timers pending.
            break;
          TimeVal eventTime = it->first;
          if (eventTime > now)
            {
              // The earliest timer is still in the future.
              waitTime = eventTime.waitTime();
              until = &waitTime;
              break;
            }
          TimerEvent *event = it->second;
          // Unregister without deleting; fireEvent() reschedules or frees it.
          cancelTimerRequest(event->channel, false);
          fireEvent(event);
        }
      _incoming.waitForRequest(until);
    }
}

/* This comes from the Delphi version of the code.  This was called any time
   we saw the clock going backwards.  I think it's safe to ignore this code.
   For one thing, when you change the time in Unix, it's normally done in a
   smooth way.  In particular, unix tries to avoid jumping backwards.  It
   seems especially unlikely that the time would change much while we are
   trying to process events.  I don't remember what was causing this problem
   in the first place.  Leaving the software running when daylight savings
   time changed?  That's not an issue for unix.  A clock synchronizer which
   was not as gentle?

// Assume the clock was just reset.  Execute each event in the queue, in
// order.  For repeating events, tell them that this was the time when they
// were supposed to go off.  This is far from a perfect solution, but it will
// prevent us from locking up as often.  MISSING:  What happens if the time
// changes after we create the event, but before we add it to the queue?
// There is a small window that this does not help.
Procedure TTimerThread.DumpAllEvents;
Var
  Events : Array Of IUnknown;
  I : Integer;
  Event : TTimerEvent;
Begin
  CriticalSection.Enter;
  Try
    SetLength(Events, EventList.ItemCount);
    For I := 0 To Pred(Length(Events)) Do
      Events[I] := EventList.Pop;
  Finally
    CriticalSection.Leave
  End;
  For I := 0 To Pred(Length(Events)) Do
    Begin
      Event := (Events[I] As ITimerEvent).Implementor As TTimerEvent;
      If Event.AbsoluteTime Then
        Begin
          // I don't want my hourly bars to be created 14.2 minutes after
          // the hour.  Just put this back and pretend we never took it out.
          AddEvent(Event);
          Dec(EventsIn)
        End
      Else
        Begin
          Event.NextTime := Now;
          Inc(EventsOut);
          Event.DoCallback
        End
    End
End;
*/