#ifndef __TalkWithServer_h_
#define __TalkWithServer_h_

/* TalkWithServer is the generic part of the client side of our standard
 * client-server communication.
 *
 * TalkWithServer is based on the C# version, roughly, but it is organized
 * differently.  To fit in with the rest of the C++ server code, TalkWithServer
 * never creates its own threads.  Someone else is responsible for threads
 * and for the call to poll().  These objects tell us what arguments to pass
 * to poll(), and we call these objects to tell them when to wake up.
 *
 * There are very few callbacks in this code.  That is to support the nature
 * of the C++ program.  For the most part you tell an object to wake up.  It
 * updates its internal state, then you ask the object what its state is.
 *
 * A response from the server will generate a callback.  But even that is
 * controlled.  All potential callbacks are queued up until someone calls
 * TalkWithServer::doResponses().  This makes the code much clearer.
 *
 * As in the other versions of TalkWithServer, this code should be owned
 * by an object which knows more about the specific server we are connecting
 * to.  It will be responsible for things like logging in, pinging, and
 * creating new TalkWithServer objects as needed.  Each TalkWithServer can
 * only support one connection.  Once the connection is broken, from either
 * side, you cannot reuse a TalkWithServer. */

// NOTE(review): the targets of the #include directives were lost when this
// file was extracted (seven bare "#include" tokens remained).  The list below
// is reconstructed from the names this header actually uses; one original
// include could not be recovered -- confirm against version control.
#include <stdint.h>   // uint64_t, int32_t
#include <string>
#include <map>
#include <set>
#include <vector>
#include <zlib.h>     // z_stream_s, Z_BEST_SPEED

// The high level object will own a socket object, probably indirectly.  The
// socket object does not have direct access to the poll() call, only the high
// level object does.  This gives us a clean and convenient way to pass the
// necessary information between the high level object and the socket object,
// possibly through several layers.
class IPollElement { public: virtual bool wantsRead() =0; virtual bool wantsWrite() =0; virtual int getHandle() =0; virtual void wakeUp() =0; virtual ~IPollElement() { } }; // This is a wrapper around the zlib library that makes it easy to send // streaming data. We only create one input stream and one output stream // for each socket. After sending a message (or a group of messages) we // flush() the zlib output buffer and send it down the wire. class ZLibOutputStream { private: z_stream_s _zlibBuffer; std::string _last; bool _valid; void compress(std::string const &source, bool flush); public: std::string output; ZLibOutputStream(int level = Z_BEST_SPEED); ~ZLibOutputStream(); void add(void const *bytes, int count); void add(std::string bytes); void flush(); bool valid() const { return _valid; } }; // This is a wrapper around the zlib library that makes it easy to receive // streaming data. We only create one input stream and one output stream // for each socket. There is no explicit flush call here. If the other // side sends a flush(), we will get the data from zlib. We might also get // the data sooner; zlib does not preserve boundaries. class ZLibInputStream { private: z_stream_s _zlibBuffer; bool _valid; public: std::string output; ZLibInputStream(); ~ZLibInputStream(); void add(void const *bytes, int count); void add(std::string const &bytes); bool valid() const { return _valid; } }; class TcpIpConnection : public IPollElement { private: const std::string _name; int _handle; enum Status { NotYetOpen, Open, Closed }; Status _status; bool _writeSucceeded; // Set the file handle to non-blocking. Returns true on success and false // on failure. On failure changes the status to Closed and the handle to // -1. On failure sends info the the log. Failure seems unlikely in this // function. bool setNonBlocking(); public: TcpIpConnection(std::string const &name); ~TcpIpConnection(); // Only call this once. 
If asyncConnect is false, we know the result of // connect() before this function returns. If asyncConnect is true, this // function returns more quickly and you won't know if you connected or // not until after the call to select(). This was not always an option. // In previous revisions asyncConnect was always false. void connect(std::string address, std::string port, bool asyncConnect); bool closed() const; void close(); bool readyToSend(); // At least once write succeeded. If you connect asynchronously you might // want to know why the socket is closed. More precisely, did the socket // ever connect. If the connection is closed and this is false, that means // that we never connected. If the connection is closed and this is true, // we were connected at one time. This trick only works because we always // try to write right away. (We immediately try to log in.) For alternative // ways to check, see the following: http://cr.yp.to/docs/connect.html . bool writeSucceeded() const { return _writeSucceeded; } std::string toSend; std::string received; virtual bool wantsRead(); virtual bool wantsWrite(); virtual int getHandle(); virtual void wakeUp(); }; // Consider using TalkWithServer64 for new code. 
class TalkWithServer : public IPollElement { public: typedef uint64_t CancelId; class IMessageListener { public: virtual void onMessage(std::string bytes, int clientId, CancelId cancelId ) =0; virtual void onAbort(int clientId, CancelId cancelId) =0; virtual ~IMessageListener() { } }; typedef std::map< std::string, std::string > Message; private: const std::string _name; TcpIpConnection _tcpIpConnection; ZLibOutputStream _zLibOutputStream; ZLibInputStream _zLibInputStream; std::vector< Message > _outgoing; std::set< CancelId > _validResponses; static CancelId _lastCancelId; CancelId getNextCancelId(); enum Status { NotYetOpen, Open, Closed }; Status _status; struct ServerWrapper { IMessageListener *listener; CancelId cancelId; bool streaming; int clientId; }; std::map< int, ServerWrapper > _atServer; struct LocalWrapper { IMessageListener *listener; CancelId cancelId; std::string body; bool success; bool streaming; int clientId; }; std::vector< LocalWrapper > _localQueue; void checkForClosed(); void addStringWithSize(std::string toCompress); void addInt32(int32_t toCompress); public: TalkWithServer(std::string const &name); virtual ~TalkWithServer(); void connect(std::string address, std::string port); void disconnect(); bool disconnected(); void cancel(CancelId id); void cancelAll(); int pendingResponseCount(); void doResponses(); void sendMessage(Message const &message); CancelId sendMessage(Message message, IMessageListener *listener, int clientId, bool streaming = false); //static create message from array // connectiondown() -- make all of the callbacks to tell the listeners that we are down // connectionclosed() -- when we hear this from the connection, call connection down. virtual bool wantsRead(); virtual bool wantsWrite(); virtual int getHandle(); virtual void wakeUp(); }; #endif