#include "../../shared/Marshal.h" #include "RemoteHistoryRequests.h" ///////////////////////////////////////////////////////////////////// // ///////////////////////////////////////////////////////////////////// void unmarshal(std::string const &source, size_t &offset, RecordRef &value) { std::string encoded; unmarshal(source, offset, encoded); if (auto record = Record::create(encoded)) value = record; else throw MarshallingException("Invalid input to Record::create()"); } ///////////////////////////////////////////////////////////////////// // AlertHistoryRequest ///////////////////////////////////////////////////////////////////// bool AlertHistoryRequest::done() const { return (!connected()); // ? } void AlertHistoryRequest::buildAndSendRequest() { body.clear(); marshal(body, time); marshal(body, _restartKey); int requestCount = _rowsRequested - _rowsReturned; if (requestCount < 0) { // TODO Report to log? requestCount = 0; } marshal(body, requestCount); marshal(body, _endTime); marshal(body, _collaborate); RemoteHistoryRequest::submit(_me.lock()); } void AlertHistoryRequest::onEventInternal() { onEvent(_me.lock()); } void AlertHistoryRequest::updateResults() { try { bool closed = false; for (Event const &event : getEvents()) { switch (event.reason) { case EventType::Data: { time_t restartTime; std::string restartKey; std::vector< RecordRef > newRows; size_t offset = 0; unmarshal(*event.data, offset, restartTime); unmarshal(*event.data, offset, restartKey); unmarshal(*event.data, offset, newRows); unmarshalFinishedOrThrow(*event.data, offset); _restartKey = restartKey; time = restartTime; rows.insert(rows.end(), newRows.begin(), newRows.end()); break; } case EventType::RemoteGracefulClose: case EventType::UnexpectedClose: case EventType::LocalAbort: closed = true; break; case EventType::LocalEndOfData: time = 0; _restartKey.clear(); closed = true; break; } } if (closed && time && (_rowsRequested < _rowsReturned)) buildAndSendRequest(); } catch (MarshallingException const &) { abortAndRetry(_me.lock()); } } void AlertHistoryRequest::submit() { // TODO } std::shared_ptr< AlertHistoryRequest > AlertHistoryRequest::create(SocketInfo *socket, time_t startTime, time_t endTime, int maxCount, std::string const &restartKey, std::string const &collaborate) { std::shared_ptr< AlertHistoryRequest > result (new AlertHistoryRequest(socket)); result->_me = result; result->time = startTime; result->_endTime = endTime; result->_rowsRequested = maxCount; result->_rowsReturned = 0; result->_restartKey = restartKey; result->_collaborate = collaborate; return result; } ///////////////////////////////////////////////////////////////////// // TopListHistoryRequest ///////////////////////////////////////////////////////////////////// void TopListHistoryRequest::onEventInternal() { onEvent(_me.lock()); } void TopListHistoryRequest::updateResults() { //std::vector< RecordRef > result; for (Event const &event : getEvents()) { // TODO } } void TopListHistoryRequest::submit() { // TODO } std::shared_ptr< TopListHistoryRequest > TopListHistoryRequest::create(SocketInfo *socket) { std::shared_ptr< TopListHistoryRequest > result (new TopListHistoryRequest(socket)); result->_me = result; return result; } ///////////////////////////////////////////////////////////////////// // OddsMakerHistoryRequest ///////////////////////////////////////////////////////////////////// void OddsMakerHistoryRequest::onEventInternal() { onEvent(_me.lock()); } void OddsMakerHistoryRequest::updateResults() { //std::vector< RecordRef > result; for (Event const &event : getEvents()) { // TODO } } void OddsMakerHistoryRequest::submit() { // TODO } std::shared_ptr< OddsMakerHistoryRequest > OddsMakerHistoryRequest::create(SocketInfo *socket) { std::shared_ptr< OddsMakerHistoryRequest > result (new OddsMakerHistoryRequest(socket)); result->_me = result; return result; }