#include #include #include #include #include "LogFile.h" #include "ThreadMonitor.h" #include "PollSet.h" PollSet::PollSet() : _timeout(-1) { } void PollSet::setTimeoutNone() { _timeout = -1; } void PollSet::setTimeoutMs(int timeout) { _timeout = std::max(0, timeout); } void PollSet::setTimeout(timeval *timeVal) { if (timeVal) { TimeVal::Microseconds microseconds = ((TimeVal *)timeVal)->asMicroseconds(); // Always round up. Otherwise, if we have less than a millisecond to // go, it is likely that we will immediately return, and the calling // function will call us again because it is not time yet. TimeVal::Microseconds milliseconds = (microseconds + 999) / 1000; // It is possible to request a time that is longer than we can represent // and send to the poll() call. In this case we wait as long as we can, // then it is up to the caller to ask us to wait again if necessary. setTimeoutMs(std::min((TimeVal::Microseconds)std::numeric_limits< int >::max(), milliseconds)); } else { setTimeoutNone(); } } void PollSet::addForRead(int fd) { _toPollRead.insert(fd); } void PollSet::removeForRead(int fd) { _toPollRead.erase(fd); } void PollSet::addForWrite(int fd) { _toPollWrite.insert(fd); } void PollSet::removeForWrite(int fd) { _toPollWrite.erase(fd); } void PollSet::clearFileDescriptors() { _toPollRead.clear(); _toPollWrite.clear(); } void PollSet::poll() { ThreadMonitor &m = ThreadMonitor::find(); std::string prevState = m.getState(); m.setState("poll() prepair"); //m.setState("poll() prepair check"); _woken.clear(); const HandleSet::size_type count = _toPollRead.size() + _toPollWrite.size(); if (count) { //m.setState("poll() prepair get_mem"); pollfd *fds = new pollfd[count]; //m.setState("poll() prepair copy"); HandleSet::size_type i = 0; for (HandleSet::const_iterator it = _toPollRead.begin(); it != _toPollRead.end(); it++, i++) { fds[i].fd = *it; fds[i].events = POLLIN; fds[i].revents = 0; } for (HandleSet::const_iterator it = _toPollWrite.begin(); it != _toPollWrite.end(); it++, i++) { fds[i].fd = *it; fds[i].events = POLLOUT; fds[i].revents = 0; } assert(i == count); m.setState("poll"); int result = ::poll(fds, count, _timeout); m.setState("poll() finish"); if ((result < 0) && (errno != EINTR)) { // Report error and abort. Interrupt is not really an error because // we just report nothing is ready. That's no different than a // timeout. // I have not seen any examples of this in the logs. LogFile::primary().quoteAndSend("PollSet.C", errorString(), 0); LogFile::primary().scheduleShutdown(); } else { for (i = 0; i < count; i++) { if (fds[i].revents & fds[i].events) { _woken.insert(fds[i].fd); } } } delete[] fds; } m.setState(prevState); } PollSet::HandleSet const &PollSet::woken() const { return _woken; }