#ifndef __RemoteHistoryConnection_h_ #define __RemoteHistoryConnection_h_ #include #include "../../shared/SocketInfo.h" #include "../../shared/SmarterP.h" #include "../../shared/AtomicAccumulate.h" // If you want to send data to a remote history server, you need to make // a Request object and give it to the RemoteHistoryConnection. Presumably // there will be classes for OddsMakerRequest, AlertHistoryRequest and // TopListHistoryRequest which are all subclasses of Request. class RemoteHistoryRequest { public: typedef std::shared_ptr< RemoteHistoryRequest > Ref; private: bool _connected; protected: static void submit(Ref const &request); enum OnResponse { Normal, DrainAndResend, Abort }; OnResponse _onResponse; enum class EventType : char { // We received streaming data. This will be passed on to the individual // subclasses and it is up to them to interpret it. Data, // The remote side explicitly told the library that it was done. This // could happen for a number of reasons. The individual classes can // react to this in different ways. E.g. a top list request will // probably know it's done after receiving one result via onData(). A // history request might know it's done after receiving N good rows, // where N came from its own request. But history might also send a // special message via onData() telling us that we should restart at a // particular time. RemoteGracefulClose, // The remote server broke the connection without telling us. Possibly // we were never able to connect and we just now gave up. Possibly we // received a message and it was garbled so we canceled the request. // Generally this means to try again, possibly restarting from where the // last request left off. UnexpectedClose, // This means your request didn't match any servers. Like asking for // tomorrow's OddsMaker data. This takes into account the initial time // and direction. If you ask for a date 6 months ago going forward, // we'll start by finding the server with the oldest data. If you ask // for a date 6 months ago going backward, you'll get this, instead. // // This almost certainly means to stop. // // Some protocols will give you additional information via onData when // it's time to stop. When you receive Close::RemoteGraceful or // Close:Unexpected, you have to use that data (and general information // about your class) to decide wether or not to retry. In this case // there was no server to say we were done, much less why. LocalEndOfData, // Someone called RemoteHistoryConnection::abort(). LocalAbort, }; struct Event { EventType reason; SmarterP< std::string > data; Event(EventType reason = EventType::Data) : reason(reason) { } Event(SmarterP< std::string > &&data) : reason(EventType::Data), data(std::move(data)) { } Event(SmarterP< std::string > const &data) : reason(EventType::Data), data(data) { } }; virtual void onEventInternal() =0; std::vector< Event > getEvents(); void abortAndRetry(Ref const &request); private: void retryAfterAbort(); AtomicAccumulate< Event > _events; template < class... Args > void pushEvent(Args&&... args) { bool previouslyEmpty = _events.emplace(std::forward(args)...); if (previouslyEmpty) onEventInternal(); } void sendClose(EventType reason); friend class RemoteHistoryConnection; protected: std::string body; std::string command; time_t time; bool forward; public: SocketInfo *const socket; RemoteHistoryRequest(SocketInfo *socket) : _connected(false), _onResponse(OnResponse::Normal), socket(socket) { } // At this time there is no way to tell the upstream server to cancel a // single request. However, this can be inspected at various times. Like // when there's an incomplete response and we would normally resend the // request. These are thread safe. //bool aborted() const { return _aborted; } //void abort() { _aborted = true; } // When you submit() a request, the request object gets put into the // connected state. When the request is closed, it returns to the // unconnected state. You can only submit a request when it is not in the // connected state. bool connected() const { return _connected; } virtual ~RemoteHistoryRequest() {} }; #endif