#include //#include #include "../shared/rapidxml.hpp" #include "../shared/Random.h" #include "../shared/ReplyToClient.h" #include "../shared/SimpleLogFile.h" #include "../shared/CommandDispatcher.h" #include "../shared/IPollSet.h" #include "../shared/ThreadClass.h" #include "../shared/TalkWithServer64.h" #include "../shared/ThreadMonitor.h" //#include "../shared/TclUtil.h" #include "../shared/GlobalConfigFile.h" #include "../shared/XmlSupport.h" #include "../shared/LogFile.h" #include "../shared/XmlReader.h" #include "UserInfo.h" #include "TimerQueue.h" #include "ForwardingThread.h" /* This is the main body of the micro_proxy. This takes requests from the * client and sends them to the server. This takes the responses from the * server and sends them back to the client. * * The client connects to this process in the normal way: NewConnections.C, * InputFramework.C, and ReplyToClient.C. This file is responsible for all of * the connections to the servers. I.e. creating connections, sending and * receiving data, detecting and initiating disconnects. */ /* TODO: We need to get better at segregating the data. Some users might get * some but not other data. For the most part we want to handle that here, * because this seems like a convenient place to put the rules. We can stop * certain commands from being delivered. We can either ignore them completely * or send a canned response. * * The initial case is for one of Brad's customers. These customers should be * able to get Brokerage Plus and the OddsMaker, but not the Price Alerts or * the AI. 
*/

namespace ForwardingThreadNS
{

// Labels handed to ThreadMonitor elsewhere in this file, either as a counter
// name (ThreadMonitor::find().increment(...)) or as a state name
// (ThreadMonitor::SetState).  Each label is spelled exactly once here so the
// call sites cannot drift apart.
// NOTE(review): presumably these are std::string objects, rather than
// literals, so that no temporary string is built on every call -- confirm
// against the ThreadMonitor interface.

// Server connection lifetime and traffic.
static const std::string s_server_disconnect_all = "server disconnect all";
static const std::string s_server_disconnect_one = "server disconnect one";
static const std::string s_sendLoginMessage_full = "sendLoginMessage full";
static const std::string s_sendLoginMessage_limited =
  "sendLoginMessage limited";
static const std::string s_forward_response = "forward response";
static const std::string s_deferredClose = "deferredClose";
static const std::string s_send_ping = "send ping";
static const std::string s_immediateClose = "immediateClose";
static const std::string s_updatePollSet = "updatePollSet";
static const std::string s_onServerClosed = "onServerClosed";
static const std::string s_new_server_connection = "new server connection";
static const std::string s_needConnection = "needConnection()";

// Requests arriving at the forwarding thread.
static const std::string s_mtForward = "mtForward";
static const std::string s_mtDebugDump = "mtDebugDump";
static const std::string s_delete_socket = "delete socket";

// Phases of the thread's main loop.
static const std::string s_threadFunction_timer_callbacks =
  "threadFunction timer callbacks";
static const std::string s_threadFunction_prepare_to_sleep =
  "threadFunction prepare to sleep";
static const std::string s_threadFunction_notify_handles =
  "threadFunction notify handles";
static const std::string s_threadFunction_commands =
  "threadFunction commands";

// Phases of ServerConnection::onPollReturn().
static const std::string s_onPollReturn_wakeUp = "onPollReturn wakeUp";
static const std::string s_onPollReturn_doResponses =
  "onPollReturn doResponses";
static const std::string s_onPollReturn_updatePollSet =
  "onPollReturn updatePollSet";

// If you want to be woken up by the pollset, you need to implement this
// interface.  This is one of the improvements we've made compared to
// existing library routines.  ContainerThread, for example, has one generic
// wakeup call that it sends to everything in the container.  That doesn't
// scale well.  We expect thousands or tens of thousands of listeners.
class PollReturn
{
public:
  // Callback hook; implementers are registered per handle through
  // Resources::setPollListener().
  virtual void onPollReturn() =0;
  virtual ~PollReturn() {}
};

class Server;
class Command;
class ClientConnection;

// Remembers what was sent over one server connection.
//
// add() records the first and last message id and the set of commands, but
// only for messages whose id is not empty and only when history is enabled.
// History is enabled exactly when server->closeClientIfThisCloses() is
// false.
//
// close() has two modes.  With history disabled it asks DeleteSocketThread
// to delete the client socket.  With history enabled it queues one message
// to the client containing the first id, the last id and the client name of
// every recorded command, one item per line.
class ServerHistory
{
private:
  const bool _enabled;
  ExternalRequest::MessageId _firstId;
  ExternalRequest::MessageId _lastId;
  // Not owned.  See Command: commands belong to their Server.
  std::set< Command const * > _commands;
public:
  ServerHistory(Server *server);
  void add(Command const *command, ExternalRequest::MessageId id);
  void close(SocketInfo *clientSocket);
  void debugDump(XmlNode &parent) const;
};

// This is a callback from the server connection to the client connection.
// Each client connection can create multiple server connections.  The
// client connection needs to keep track of which server connections are
// open.  The client connection needs to know if it can reuse a server
// connection or if it has to create a new one.
class ServerClosed
{
public:
  virtual void onServerClosed(Server *server) =0;
  virtual ~ServerClosed() { }
};

// The ForwardingThread object exports a few things to other objects.
// Use this interface to avoid circular references.
class Resources
{
public:
  // getCommand() returns NULL if nothing is found.
  virtual Command const *getCommand(std::string const &clientCommand) =0;
  virtual IPollSet &getPollSet() =0;
  virtual void setPollListener(int handle, PollReturn* listener) =0;
  // Convenience wrapper: registers a NULL listener for this handle.
  void clearPollListener(int handle) { setPollListener(handle, NULL); }
  virtual TimerQueue &getTimerQueue() =0;
  virtual ~Resources() {}
};

// This represents a possible command.  When this thread receives a command
// from the client, it needs to know more information, like what server
// implements this command.
//
// These are owned by the server.  Don't destroy the server until you are
// done with the commands.
class Command
{
  Server *_server;
  std::string _serverName;
  std::string _clientName;
  std::string _subcommand;
public:
  // A simple command.  The command name the client sends us is the same
  // as the command name that we send to the server.  This is the common
  // case.
  //
  // Note that _server is often NULL while processing the config file, but
  // it cannot be null in the final version.  See the version of Command()
  // which changes the server of an existing command.
  // NOTE(review): no such constructor is declared in this class;
  // reparent() appears to be what this refers to.
  Command(Server *server, std::string const &name) :
    _server(server), _serverName(name), _clientName(name) { }

  // This will translate the command name.  For example, most servers all
  // have a "ping" command.  If the client sends "ping" which server does it
  // mean?  This file could implement "alert_ping" which sends "ping" to the
  // alert server, and "candle_ping" which sends "ping" to the candle server.
  //
  // This is used a lot with flex_command in TIQ.  Different TIQ servers all
  // handle different sub commands of flex_command.
  Command(Server *server, std::string const &clientName,
          std::string const &serverName, std::string const &subcommand) :
    _server(server), _serverName(serverName), _clientName(clientName),
    _subcommand(subcommand) { }

  // This version of the constructor is aimed at the factory mechanism.
  //
  // _server is NULL because we don't know yet.  See reparent() to fix that.
  //
  // If clientName is empty(), replace it with serverName.  This is useful
  // for filling in a default value.
  Command(std::string const &clientName, std::string const &serverName,
          std::string const &subcommand) :
    _server(NULL), _serverName(serverName),
    _clientName(clientName.empty()?serverName:clientName),
    _subcommand(subcommand) { }

  // Default constructor.  Required for use in an STL container.
  Command() : _server(NULL) { }

  // Returns true if this appears to have come from the default constructor.
  // I.e. if this was created by calling [] on a map where the key wasn't
  // present.
  bool uninitialized() const { return _clientName.empty(); }
  bool initialized() const { return !uninitialized(); }

  // Point this command at the server that owns it.
  void reparent(Server *server) { _server = server; }

  // Defined out of line.  The result depends on the client's user status:
  // a full user gets _server, a limited user gets _server only when the
  // server allows demo users, and everyone else gets NULL.
  Server *getServer(ClientConnection *clientConnection) const;

  std::string const &getServerName() const { return _serverName; }
  std::string const &getClientName() const { return _clientName; }
  std::string const &getSubcommand() const { return _subcommand; }
  // True when a subcommand was configured for this command.
  bool replaceSubcommand() const { return !_subcommand.empty(); }

  // For logging and debugging.  Only non-empty fields are reported.
  TclList debugDump() const
  {
    TclList result;
    if (!_serverName.empty())
      result<<"_serverName"<<_serverName;
    if (!_clientName.empty())
      result<<"_clientName"<<_clientName;
    if (!_subcommand.empty())
      result<<"_subcommand"<<_subcommand;
    // I'm assuming that we are being called by the server, that's why
    // we don't bother reporting anything about _server.  Avoid a loop
    // and/or a lot of duplication.
    return result;
  }
};

// This represents the final destination of most commands from the client.
//
// This represents one or more identical processes.  This can implement a
// load balancer itself.  In the case of ax_alert_server, this will probably
// point to the existing load balancer.  The existing load balancer can't
// go away, and we don't want the list of servers to live in more than one
// place.
class Server
{
public:
  struct Address { std::string hostname; std::string port; };
private:
  std::string _name;
  // Keyed by name.  The Commands are stored by value, so they live and die
  // with this Server.
  std::map< std::string, Command > _commands;
  std::vector< Address > _addresses;
  std::vector< Address > _demoAddresses;
  bool _closeClientIfThisCloses;
  bool _sendPings;
  bool _allowDemoUsers;
  bool _loginWithPassword;

  // Creates a named server with the default flags: everything is off
  // except _allowDemoUsers.
  Server(std::string const &name) :
    _name(name),
    _closeClientIfThisCloses(false),
    _sendPings(false),
    _allowDemoUsers(true),
    _loginWithPassword(false) { }

  void addCommand(Command const &command); // Cannot be a duplicate!
void addAddress(std::string const &hostname, std::string const &port); void addDemoAddress(std::string const &hostname, std::string const &port); // This server appears to have been created by the default constructor. // I.e. listOfServer[x] created a new server rather than returning an // existing server. bool uninitialized() const { return _name.empty(); } bool initialized() const { return !uninitialized(); } public: // Required to work with an STL container. :( Server() : Server("") { } // Copy a server object. Make sure the internal pointers are correct. // Must be public to work with an STL container. :( Server(Server const &other) : _name(other._name), _addresses(other._addresses), _demoAddresses(other._demoAddresses), _closeClientIfThisCloses(other._closeClientIfThisCloses), _sendPings(other._sendPings), _allowDemoUsers(other._allowDemoUsers), _loginWithPassword(other._loginWithPassword) { for (auto &kvp : other._commands) // kvp is name => command. addCommand(kvp.second); } std::map< std::string, Command > const &getCommands() const { return _commands; } // These lists is always the same. Presumably the caller will pick one at // random. std::vector< Address > const &getAddresses() const { return _addresses; } std::vector< Address > const &getDemoAddresses() const { return _demoAddresses; } // Ideally any server can close and that only affects the conversations // with that server. However, we have to deal with the bulk of the // existing client which expects certain things to all fail at once. This // helps us simulate that. Newer servers should have a different protocol // which makes it easy for the client to restart various pieces of code // without restarting the entire connection. bool closeClientIfThisCloses() const { return _closeClientIfThisCloses; } // This means that we're trying to keep the server alive. It's really // aimed at the legacy ax_alert_server. As we move more things from there // we might get rid of this. 
bool sendPings() const { return _sendPings; } // Should users be allowed to access this sever if they are logged in as // DEMO? This is a recommendation. Command::getServer() makes the // decision. bool allowDemoUsers() const { return _allowDemoUsers; } // True means that we need to provide a username and password when talking // with the remote server. Normally we send a special message saying that // the proxy has already verified this user. This mode is mostly made // for debugging. Set up your own proxy in your own virtual machine. Send // some requests to servers in your virtual machine. Send the rest of the // requests to the normal/live/production proxy in the TI data center. bool loginWithPassword() const { return _loginWithPassword; } // For logging and debugging. std::string const &getName() const { return _name; } TclList debugDump() const { TclList result; result<<"_name"<<_name; { result<<"_commands"; std::map< std::string, std::string > commands; for (auto const &kvp : _commands) commands[kvp.first] = kvp.second.debugDump(); result << commands; } { result << "addresses"; TclList addresses; for (auto const &address : _addresses) addresses << (address.hostname + ":" + address.port); result << addresses; } { result << "demoAddresses"; TclList demoAddresses; for (auto const &demoAddress : _demoAddresses) demoAddresses << (demoAddress.hostname + ":" + demoAddress.port); result << demoAddresses; } result<<"_closeClientIfThisCloses"<<_closeClientIfThisCloses <<"_sendPings"<<_sendPings <<"_allowDemoUsers"<<_allowDemoUsers <<"_loginWithPassword"<<_loginWithPassword <<"initialized()"< _serversByServerName; // Every command in the XML file shows up here. When we receive a // command from the client we look up its name and use that to find // a server. We never allow a command to be shared. When you apply // a patch you can change some details about the command, but not which // server handles it. 
std::map< std::string, std::string > _serversByCommandName; // This is used when testing in a virtual machine. You create a patch // file naming the servers you want to handle. And the patch file // includes instructions for forwarding any commands it doesn't know // about to the next server. We read the "main" file just for a list // of commands. (You can't forward a wildcard, you have to name each // command explicitly.) This says which server we are forwarding to. // Or this is empty to say no such server. std::string _chainServerName; public: static std::string debugDump(Status status) { switch (status) { case Uninitialized: return "Uninitialized"; case Error: return "Error"; case Untested: return "Untested"; case ValidMain: return "ValidMain"; case ValidPatch: return "ValidPatch"; default: return "UNKNOWN(" + ntoa((int)status) + ")"; } } TclList debugDump() const { TclList result; result<<"_status"< serversByServerName; for (const auto &kvp : _serversByServerName) serversByServerName[kvp.first] = kvp.second.debugDump(); result< create() const; }; }; // This represents the connection to a particular server. This is // disposible. When the server is disconnected, delete this object. Create // a new one if you need to reconnect. class ServerConnection : private TalkWithServer64::IAllResponsesListener, private PollReturn, private TimerQueue::Listener { private: // The deadman timer on the server side defaults to 5 minutes. // So lets send a ping every 2 minutes. static const int PING_TIME = 2 * 60 * 1000; static const PropertyList PING_COMMAND; SocketInfo *const _clientSocket; Server *const _server; ServerHistory _history; std::string const _debugName; Resources &_resources; ServerClosed *const _serverClosed; TalkWithServer64 *_talkWithServer; void updatePollSet(); void sendLoginMessage(); void sendMessage(PropertyList const &message); // The list of addresses that we can try. If one fails we remove it from // the list. If one succeeds we clear the list. 
// The idea is that if one
  // fails we quickly and automatically try the next.  The client doesn't see
  // an error unless (a) we did not find any good servers or (b) we were
  // connected but then we disconnected.
  std::vector< Server::Address > _addresses;

  // If there are multiple physical servers for this virtual server,
  // write the name and address to distinguish the physical servers in the
  // log.  If there's only one physical server, use only the name of the
  // virtual server, without the address, to keep the logs shorter and
  // easier to read.
  bool _onlyOneAddress;

  // This is internal.  Our constructor will call this.  We might call this
  // again if we receive an error asynchronously.  Only call this if there
  // is no _talkWithServer or the current _talkWithServer is disconnected.
  //
  // This might cause the ServerConnection to close.  In that case we'll
  // always create a timer event and do the close when the timer goes off.
  // Sometimes we need to add a pause to keep the retries from happening too
  // fast.  But it's always good to use the timer event even if we only
  // sleep for one millisecond.  When you call needConnection() you don't
  // have to worry about the ServerConnection object being deleted before
  // you are ready.
  void needConnection();

  // If there's a failure before this time, wait until at least this time.
  // If we fail immediately, we typically wait one second before trying
  // again.  But we might not see the failure message right away.  If the
  // remote server timed out, we might or might not have already waited
  // long enough.
  TimeVal::Microseconds _minimumRetryTime;

  // Response from the server.  The bytes are queued for the client socket
  // under the same message id.
  virtual void onResponse(std::string bytes,
                          TalkWithServer64::MessageId messageId);
  // The poll set reported activity on the server socket.
  virtual void onPollReturn();
  // Timer callback with two jobs.  If the connection is missing or
  // disconnected, this appears to be where the owner finally hears about it
  // (see needConnection() above).  Otherwise, for servers that want pings,
  // it sends a ping and schedules the next one.
  virtual void onTimeOut();

public:
  ServerConnection(SocketInfo *clientSocket, Server *server,
                   Resources &resources, ServerClosed *serverClosed);
  ~ServerConnection();
  // Record the request in the history, then send its properties to the
  // server.
  void sendMessage(Command const *command, ExternalRequest *request);
  void debugDump(XmlNode &parent) const;
};

// This represents the client side of a connection.  Typically when a client
// opens a TCP/IP connection to this program, one of these objects will be
// allocated.  When that TCP/IP connection is broken, this object is deleted.
//
// This object will own some number of ServerConnection objects.
class ClientConnection : private ServerClosed
{
private:
  SocketInfo *const _clientSocket;
  Resources &_resources;
  const UserInfoExport _userInfo;  // Cached;
  // At most one connection per server.
  // NOTE(review): presumably an entry is removed in onServerClosed() --
  // the definition is not shown here, confirm.
  std::map< Server *, ServerConnection * > _serverConnections;
  ServerConnection *getServerConnection(Server *server);
  virtual void onServerClosed(Server *server);
public:
  ClientConnection(SocketInfo *socket, Resources &resources);
  ~ClientConnection();
  void forward(ExternalRequest *request);
  UserInfoExport const &getUserInfo() const { return _userInfo; }
  void debugDump(XmlNode &parent) const;
};

// I've been using ContainerThread.h to create new threads.  The code seems
// cleaner than inheriting from ThreadClass.  (In particular
// ContainerThread.h gets rid of a lot of boilerplate code.)  The problem is
// that ContainerThread.h was written for threads that each host a small
// number of objects.  ForwardingThread will need to manage thousands of
// separate connections.  Among other differences, ContainerThread.h uses
// PollSet where ForwardingThread will use IPollSet.
// Note that Resources has no access specifier of its own.  Because this is
// a class, that base is inherited privately too.
class ForwardingThread : private ThreadClass, Resources
{
private:
  IPollSet _pollSet;
  // Socket handle => listener.  See setPollListener().
  std::map< int, PollReturn * > _handleToAction;
  std::set< Server const * > _servers;
  // Client command name => command.  See getCommand().
  std::map< std::string, Command const * > _commandsByName;
  TimerQueue _timerQueue;
  virtual TimerQueue &getTimerQueue() { return _timerQueue; }
  // One entry per client socket.
  std::map< SocketInfo *, ClientConnection * > _clientConnections;
  ClientConnection *getClientConnection(SocketInfo *socket);
  // NOTE(review): presumably the kinds of request delivered through
  // _incomingRequests -- confirm against threadFunction().
  enum { mtForward, mtDebugDump, mtQuit };
  SelectableRequestQueue _incomingRequests;
  // Returns NULL if nothing is found.
  virtual Command const *getCommand(std::string const &clientCommand);
  virtual IPollSet &getPollSet() { return _pollSet; }
  virtual void setPollListener(int handle, PollReturn *listener);
  void initializeServers();
protected:
  void threadFunction();
public:
  ForwardingThread();
  ~ForwardingThread();
};

/////////////////////////////////////////////////////////////////////
// ServerHistory
/////////////////////////////////////////////////////////////////////

// History is only kept for servers that do NOT close the whole client
// connection when they close.  See close() for what happens otherwise.
ServerHistory::ServerHistory(Server *server) :
  _enabled(!server->closeClientIfThisCloses())
{
}

// Remember one outgoing request.  _firstId is set by the first recorded
// request only.  _lastId is overwritten by every recorded request.
void ServerHistory::add(Command const *command,
                        ExternalRequest::MessageId id)
{
  if (!_enabled)
    // We don't need any history for this server.
    return;
  if (id.isEmpty())
    // We only record messages that expect responses.
    return;
  if (_firstId.isEmpty())
    _firstId = id;
  _lastId = id;
  _commands.insert(command);
}

void ServerHistory::close(SocketInfo *clientSocket)
{
  if (!_enabled)
  {
    // This is the alternative to history.  This is for older clients (and
    // older client libraries) which don't know how to handle the new server
    // closed message.
ThreadMonitor::find().increment(s_server_disconnect_all); DeleteSocketThread::deleteSocket(clientSocket); } else { ThreadMonitor::find().increment(s_server_disconnect_one); std::string message; message += ntoa(_firstId.getValue()); message += '\n'; message += ntoa(_lastId.getValue()); message += '\n'; for (auto it = _commands.begin(); it != _commands.end(); it++) { message += (*it)->getClientName(); message += '\n'; } addToOutputQueue(clientSocket, message, ExternalRequest::MessageId(-1)); //TclList msg; //msg<getClientName(); } } } ///////////////////////////////////////////////////////////////////// // Command ///////////////////////////////////////////////////////////////////// Server *Command::getServer(ClientConnection *clientConnection) const { switch (clientConnection->getUserInfo().status) { case sFull: return _server; case sLimited: if (_server->allowDemoUsers()) return _server; else return NULL; default: return NULL; } } ///////////////////////////////////////////////////////////////////// // Server::Factory ///////////////////////////////////////////////////////////////////// void Server::Factory::load(RapidXmlElement list) { clear(); if (!list) { // Presumably an error upstream. Couldn't load or parse the file or // something like that. _messages<<" list is NULL"; _status = Error; } else { _status = Untested; for (RapidXmlElement top : list) { std::string const name = top.attribute("NAME"); if (name.empty()) { _messages<<"Creating server, can't find NAME."; _status = Error; } else if (_serversByServerName.count(name)) { _messages<<("Duplicate server name: " + name); _status = Error; } else { Server server(name); // Parsing some boolean properties. Warning: Different libraries // parse Booleans according to slightly different rules. If you use // "0" and "1" for false and true, you should be fine. (That's with // my libraries. C#'s Boolean.TryParse() does not accept this. 
Our // C# XmlHelper explicitly allows integers (just like we're doing // below) or anything that Boolean.TryParse() accepts.) If your file // includes "TRUE" or something like that, this code will silently // ignore it and take the default. server._closeClientIfThisCloses = top.attribute("CLOSE_CLIENT_IF_THIS_CLOSES", server._closeClientIfThisCloses); server._sendPings = top.attribute("SEND_PINGS", server._sendPings); server._allowDemoUsers = top.attribute("ALLOW_DEMO_USERS", server._allowDemoUsers); server._loginWithPassword = top.attribute("LOGIN_WITH_PASSWORD", server._loginWithPassword); if (server._loginWithPassword) { // Perhaps this name should change. We're assuming you can only // use this one way. We're assuming that if you have one of these, // you're planning to chain to another micro_proxy. You handle // the commands that you choose to, then you pass everything else // to this server. One micro_proxy could chain to multiple // micro_proxies, but you'd have to find some nice way to explain // it in the config file. I'm assuming that no one would want to // write that config file themselves because you'd have way too // many entries in the list of commands. If we ever needed that // feature, we'd want a better way to configure it. if (!_chainServerName.empty()) { _messages<<(TclList()<<"Multiple Chain Servers" <<_chainServerName<_addresses; } for (auto &kvp : main._serversByCommandName) { std::string const &commandName = kvp.first; if (!_serversByCommandName.count(commandName)) { // We were not already handling this command, so add it now. std::string const &serverName = kvp.second; Server const *const mainServer = getProperty(main._serversByServerName, serverName); // mainServer must exist if main._serversByServerName and // main._serversByCommandName are in sync. assert(mainServer); Server &finalServer = _serversByServerName[serverName]; if (finalServer.uninitialized()) { // This server was not in the patch file. So copy it from the // main file. 
finalServer = *mainServer; // But don't copy the commands. Some might have already been // assigned to a different server in the patch file. This loop // will copy the appropriate commands to this server, possibly // skipping some. finalServer._commands.clear(); } Command const *const originalCommand = getProperty(mainServer->_commands, commandName); // originalCommand must exist if mainServer is consistent. assert(originalCommand); finalServer.addCommand(*originalCommand); } } } else { // If a command is orphaned, add that command to the chain server. Server &chainServer = _serversByServerName[patch._chainServerName]; assert(chainServer.initialized()); for (auto &kvp : main._serversByCommandName) { std::string const &commandName = kvp.first; if (!_serversByCommandName.count(commandName)) { // We were not already handling this command, so add it now. std::string const &serverName = kvp.second; Server const *const mainServer = getProperty(main._serversByServerName, serverName); // mainServer must exist if main._serversByServerName and // main._serversByCommandName are in sync. assert(mainServer); Command const *const originalCommand = getProperty(mainServer->_commands, commandName); // originalCommand must exist if mainServer is consistent. assert(originalCommand); if (Server *const patchServer = getProperty(_serversByServerName, serverName)) // The patch file created a server with the same name as the main // file. Copy the command to the server with the same name. patchServer->addCommand(*originalCommand); else // The patch file did not create this specific server, so use the // chain server. chainServer.addCommand(*originalCommand); } } } } bool Server::Factory::validateMain() { switch (_status) { case Uninitialized: // Tempting to say assert(false) here. case Error: // Errors are explicitly sticky. Don't retry. // Note: If there are multiple error messages You might not see all // of them because we exit early. 
That's hard to fix, and you'll // always get at least one error message if there is one, so this is // okay. return false; case Untested: // Do the tests listed below. Change the state to Error or ValidMain. break; case ValidMain: // Cached. We explicitly allow this to be called multiple times and // promise that it will be fast. return true; case ValidPatch: // Same as untested. Maybe there's a good reason to call one validate // function then the other. break; } _status = ValidMain; /* Until proven otherwise. (If we hit an error we * change _status, but we keep going to see if we * can find additional errors. */ for (auto const &kvp : _serversByServerName) { std::string const &name = kvp.first; Server const &server = kvp.second; if (server.uninitialized()) { // Probably an internal error. We try to catch this when reading from // the XML file. Maybe someone tried reading from // _serversByServerName[] and accidentally created a new server. This // could probably be an assertion. _status = Error; _messages<<"Unnamed / uninitialized server."; } if (server._addresses.empty()) { _status = Error; _messages<<(TclList()<<"Require at least one address for server" < Server::Factory::create() const { // I originally planned to call validateMain() here but... // a) Then this method couldn't be const. // b) If you skipped the validate step you'd have no way to create // a good error message. An invalid input file would cause an // assertion to fail. 
assert(_status == ValidMain); std::set< Server const * > result; for (auto const &kvp : _serversByServerName) result.insert(new Server(kvp.second)); return result; } void Server::Factory::loadSingleFile(std::string const &fileName) { clear(); RapidXmlDocument document; document.loadFromFile(fileName); if (!document) { _messages< " + server->getName()), _resources(resources), _serverClosed(serverClosed), _talkWithServer(NULL) { ThreadMonitor::SetState tm(s_new_server_connection); tm.increment(s_new_server_connection); if (_server->closeClientIfThisCloses()) // The whole connection to the client will be closed. In this case the // client is responsible for any pauses before the retry. _minimumRetryTime = 0; else // Wait one second. But that's one second from when we started, not // from when we discovered the problem. _minimumRetryTime = TimeVal(true).addSeconds(1).asMicroseconds(); UserInfoExport userInfo = userInfoGetInfo(_clientSocket); if(userInfo.status == sLimited && !server->getDemoAddresses().empty()) _addresses = server->getDemoAddresses(); else _addresses = server->getAddresses(); _onlyOneAddress = _addresses.size() == 1; if (_addresses.empty()) { // This if statement isn't strictly required. This prints a different // message to the log to make the problem more clear. And this has a // longer retry time since the problem won't go away on its own. tm.increment("no servers in config file " + server->getName()); _resources.getTimerQueue().addMSFromNow(this, 15000); // 15 seconds. } else needConnection(); } template < class T > static T pickOne(std::vector< T > &vector) { assert(!vector.empty()); int index = getRandom31() % vector.size(); T result = vector[index]; int lastIndex = vector.size() - 1; if (index != lastIndex) // Delete the item we picked. We don't care about order, so move the // last item into that slot, rather than sliding several items down one // slot each. 
vector[index] = vector[lastIndex]; vector.resize(lastIndex); return result; } void ServerConnection::needConnection() { ThreadMonitor::SetState tm(s_needConnection); tm.increment(s_needConnection); while (true) { const bool previouslyConnected = _talkWithServer && _talkWithServer->writeSucceeded(); if (_talkWithServer) { assert(_talkWithServer->disconnected()); updatePollSet(); delete _talkWithServer; _talkWithServer = NULL; } if (previouslyConnected) // If we were ever connected tell the client about the disconnect and // let the client deal with it. This could be caused by anything and // is not necessarily an error. Some servers will time out if they // have not had a request in a while. The client is smart enough // not to try to connect again until it actually needs something. break; if (_addresses.empty()) { // We have tried all servers. None of them are working. Report it // to the client and let the client deal with it. Mark this in the // log so we know how often this happen. tm.increment("all servers down " + _server->getName()); break; } // The long name includes the client socket id. The long name is good // for logging statements. The short name is better for // ThreadMonitor::increment(), so multiple clients can all be compressed // into one. std::string shortName = _server->getName(); std::string longName = _debugName; Server::Address address = pickOne(_addresses); if (!_onlyOneAddress) { const std::string addressString = ' ' + address.hostname + ':' + address.port; shortName += addressString; longName += addressString; } _talkWithServer = new TalkWithServer64(longName); _talkWithServer->connect(address.hostname, address.port, true); if (_talkWithServer->disconnected()) // Failed immediately. Maybe the name was invalid and could not // be resolved. We might not see the failure here. If the remote // server is down we will probably see the error when we first try to // write. 
tm.increment("failed to connect " + shortName); else { // Appears successful. _talkWithServer->setAllResponsesListener(this); tm.increment("connected " + shortName); updatePollSet(); sendLoginMessage(); _resources.setPollListener(_talkWithServer->getHandle(), this); if (_server->sendPings()) _resources.getTimerQueue().addMSFromNow(this, PING_TIME); return; } } // If we get here there was a serious error. An error that we can't fix // ourselves. We will schedule the destruction of this object. That will // notify the client and the client has the option of retrying. const int64_t now = getMicroTime(); const int64_t timeout = std::max(1LL, (_minimumRetryTime - now + 500) / 1000); _resources.getTimerQueue().addMSFromNow(this, timeout); //LogFile::primary().sendString(TclList()<getName(); //if (_talkWithServer) // msg<<(_talkWithServer->disconnected()?"DISCONNECTED":"connected"); //else // msg<<"NULL"; //LogFile::primary().sendString(msg, _clientSocket); if (_talkWithServer) { _resources.clearPollListener(_talkWithServer->getHandle()); _resources.getPollSet().removeForRead(_talkWithServer->getHandle()); _resources.getPollSet().removeForWrite(_talkWithServer->getHandle()); _talkWithServer->disconnect(); delete _talkWithServer; } //msg.clear(); //msg<loginWithPassword()) { // This is aimed at chaining from one proxy to the next. This is similar // to the old ax_alert_server login, but simpler. The ax_alert_server // required a second command to finish the login process. (If you don't // send the second command the connection will appear frozen for no // obvious reason.) message["command"] = "login"; message["username"] = userInfo.username; message["password"] = userInfo.password; message["vendor_id"] = "chained micro_proxy"; } else { // This is new the preferred method for login. Let the proxy do the work // for you. All other servers trust the proxy. 
message["command"] = "proxy_login"; if (userInfo.userId) message["user_id"] = ntoa(userInfo.userId); if (userInfo.status == sLimited) message["status"] = "limited"; else if (userInfo.status == sFull) message["status"]= "full"; } sendMessage(message); } void ServerConnection::sendMessage(PropertyList const &message) { //TclList items; //for (auto it = message.begin(); it != message.end(); it++) // items<<(TclList()<first<second); //TclList msg; //msg<getName(); //if (_talkWithServer) // msg<<(_talkWithServer->disconnected()?"DISCONNECTED":"connected"); //else // msg<<"NULL"; //msg<disconnected()) _talkWithServer->sendMessage(message); updatePollSet(); } void ServerConnection::sendMessage(Command const *command, ExternalRequest *request) { _history.add(command, request->getResponseMessageId()); sendMessage(request->getProperties()); } void ServerConnection::onResponse(std::string bytes, TalkWithServer64::MessageId messageId) { ThreadMonitor::find().increment(s_forward_response); addToOutputQueue(_clientSocket, bytes, ExternalRequest::MessageId(messageId)); } // Don't request a response. We could. That might make the code more // robust. But it would also make this more complicated. // // Our goal here is to tell the server receiving this message that we // are still alive. If we ever really wanted to check if the remote // server was alive, we'd probably do things differently. One option // would be to set up a special version of ping that doesn't affect the // server's dead man timer, so we could call it on all servers. Another // approach would be to change the tcp_keepalive_time socket option, // so the operating system would automatically send the pings at // appropriate times. const PropertyList ServerConnection::PING_COMMAND = { {"command", "ping"} }; void ServerConnection::onTimeOut() { if ((!_talkWithServer)||_talkWithServer->disconnected()) { // We got here because of _talkWithServer->disconnected(). 
// If it happens in the constructor we don't want to report the problem // immediately. That would cause various problems. Instead we ask the // timer to call us back soon. // // One problem is that the callback would happen before the listener // was ready. The object that owns this object, will create this object // and store it in table. The callback will look for this object in the // table. If we're still in the constructor, then the owner couldn't // have stored us anywhere. // // Another problem is avoiding wasted resources. In case of an error we // don't want to keep retrying over and over. That would use a lot of // CPU and a lot of network. Multiple programs would be involved. If // there's a connection problem, start by sleeping for one second, then // try again. //TclList msg; //msg<onServerClosed(_server); return; } if (_server->sendPings()) { ThreadMonitor::find().increment(s_send_ping); PropertyList message; message["command"] = "ping"; sendMessage(message); _resources.getTimerQueue().addMSFromNow(this, PING_TIME); } } void ServerConnection::onPollReturn() { if (!_talkWithServer) return; ThreadMonitor::SetState state(s_onPollReturn_wakeUp); _talkWithServer->wakeUp(); state.setState(s_onPollReturn_doResponses); _talkWithServer->doResponses(); state.setState(s_onPollReturn_updatePollSet); updatePollSet(); if (_talkWithServer->disconnected()) { // We're treating all errors the same. needConnection() will decide // what to do. It might try another server immediately. Or it might // schedule this ServerConnection to be destroyed so the client has // to deal with it. 
//LogFile::primary().sendString(TclList()<getHandle(); IPollSet &pollSet = _resources.getPollSet(); if (_talkWithServer->wantsRead()) pollSet.addForRead(handle); else pollSet.removeForRead(handle); if (_talkWithServer->wantsWrite()) pollSet.addForWrite(handle); else pollSet.removeForWrite(handle); //TclList msg; //msg<wantsRead() // <<"wants write"<<_talkWithServer->wantsWrite(); //LogFile::primary().sendString(msg, _clientSocket); } void ServerConnection::debugDump(XmlNode &parent) const { XmlNode &result = parent[-1]; result.name = "SERVER_CONNECTION"; result.properties["debug_name"] = _debugName; if (_talkWithServer) { XmlNode &talkWithServer = result["TALK_WITH_SERVER"]; // Disconnected means that the TalkWithServer knows that the connection // is closed. We've had some problems detecting this. So far it appears // that as soon as TalkWithServer knows this, the main program will // handle the case correctly. But for some reason this is sometimes // false when the log for the down steam server says it disconnected. talkWithServer.properties["disconnected"] = _talkWithServer->disconnected()?"true":"false"; talkWithServer.properties["write_succeeded"] = _talkWithServer->writeSucceeded()?"true":"false"; talkWithServer.properties["name"] = _talkWithServer->getName(); // watchingForRead() should match wantsRead() at any given time. Either // watchingForRead() or watchingForWrite() should be sufficient to find // a socket closed by the downstream server. talkWithServer.properties["polling_for_read"] = _resources.getPollSet().watchingForRead(_talkWithServer->getHandle()) ?"true":"false"; // watchingForWrite() should match wantsWrite() at any time. talkWithServer.properties["polling_for_write"] = _resources.getPollSet().watchingForWrite(_talkWithServer->getHandle()) ?"true":"false"; // wantsRead(). This should probably always be true. TalkWithServer // will respond with false if the connection is closed. 
But this // program will remove the TalkWithServer object in that case. talkWithServer.properties["wants_read"] = _talkWithServer->wantsRead()?"true":"false"; // wantsWrite(). This should probably always be false. This will be // true if you write a lot to the downstream server and the OS's buffer // gets full. Our client usually tries not to flood the network. // Furthermore, since the client is far away on the internet, but the // downstream server is on the same LAN as the proxy, it seems unlikely // that the client could overwhelm the downstream server. talkWithServer.properties["wants_write"] = _talkWithServer->wantsWrite()?"true":"false"; } _history.debugDump(result); result.properties["server_name"] = _server->getName(); result.properties["real_servers_remaining"] = ntoa(_addresses.size()); } ///////////////////////////////////////////////////////////////////// // ClientConnection ///////////////////////////////////////////////////////////////////// ClientConnection::ClientConnection(SocketInfo *socket, Resources &resources) : _clientSocket(socket), _resources(resources), _userInfo(userInfoGetInfo(socket)) { } void ClientConnection::onServerClosed(Server *server) { ThreadMonitor::SetState setState(s_onServerClosed); setState.increment(s_onServerClosed); auto it = _serverConnections.find(server); if (it != _serverConnections.end()) { delete it->second; _serverConnections.erase(it); } } void ClientConnection::forward(ExternalRequest *request) { std::string const &clientCommand = request->getCommand(); if (Command const *command = _resources.getCommand(clientCommand)) { if (Server *const server = command->getServer(this)) { ServerConnection *serverConnection = getServerConnection(server); request->setCommand(command->getServerName()); if (command->replaceSubcommand()) request->getProperties()["subcommand"] = command->getSubcommand(); serverConnection->sendMessage(command, request); } } } ServerConnection *ClientConnection::getServerConnection(Server *server) { 
// Looking the server up with [] also creates the table entry on first use.
// The code relies on a new entry starting out as a null pointer (true for
// std::map-like containers -- confirm the type of _serverConnections).
  ServerConnection *&result = _serverConnections[server];
  if (!result)
    {
      result = new ServerConnection(_clientSocket, server, _resources, this);
      // (The commented-out logging below is truncated in this view.)
      //TclList msg;
      //msg<getName();
      //LogFile::primary().sendString(msg, _clientSocket);
    }
  return result;
}

// Deletes every server connection owned by this client and then hands the
// client socket to DeleteSocketThread::deleteSocket().
ClientConnection::~ClientConnection()
{
  for (auto it = _serverConnections.begin();
       it != _serverConnections.end();
       it++)
    delete it->second;
  DeleteSocketThread::deleteSocket(_clientSocket);
}

// Writes a USER_INFO node (user id, status, username) and a
// SERVER_CONNECTIONS node, to which each server connection adds its own
// debug dump.
void ClientConnection::debugDump(XmlNode &parent) const
{
  {
    XmlNode &userInfo = parent["USER_INFO"];
    userInfo.properties["user_id"] = ntoa(_userInfo.userId);
    // Known status values are shown by name; anything else is shown as a
    // number.
    std::string status;
    switch (_userInfo.status)
      {
      case sNone:
        status = "sNone";
        break;
      case sLimited:
        status = "sLimited";
        break;
      case sFull:
        status = "sFull";
        break;
      default:
        status = ntoa(_userInfo.status);
        break;
      }
    userInfo.properties["status"] = status;
    userInfo.properties["username"] = _userInfo.username;
  }
  XmlNode &serverConnections = parent["SERVER_CONNECTIONS"];
  for (auto it = _serverConnections.begin();
       it != _serverConnections.end();
       it++)
    it->second->debugDump(serverConnections);
}

/////////////////////////////////////////////////////////////////////
// ForwardingThread
/////////////////////////////////////////////////////////////////////

// Main loop of the forwarding thread.  Each pass starts by running the timer
// callbacks that are due.
//
// NOTE(review): the middle of the loop is truncated in this view (everything
// between "msg<" and "onPollReturn(); } }").  The ThreadMonitor state names
// suggest it prepares to sleep, waits on the poll set and then calls
// onPollReturn() on the listeners that woke up -- restore it from the
// original file rather than from this description.
void ForwardingThread::threadFunction()
{
  ThreadMonitor &tm = ThreadMonitor::find();
  _pollSet.addForRead(_incomingRequests.getWaitHandle());
  while (true)
    {
      tm.setState(s_threadFunction_timer_callbacks);
      _timerQueue.doAllCallbacks();
      //msg.clear();
      //msg<onPollReturn(); } }
      // Look for commands.
tm.setState(s_threadFunction_commands); _incomingRequests.resetWaitHandle(); while (Request *current = _incomingRequests.getRequest()) { switch (current->callbackId) { case mtQuit: { delete current; return; } case mtForward: { ThreadMonitor::SetState setState(s_mtForward); setState.increment(s_mtForward); ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); getClientConnection(socket)->forward(request); break; } case mtDebugDump: { ThreadMonitor::SetState setState(s_mtDebugDump); setState.increment(s_mtDebugDump); ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); XmlNode reply; reply.properties["socket"] = ntoa(SocketInfo::getSerialNumber(socket)); reply.properties["host_name"] = getShortHostName(); if (ClientConnection *clientConnection = getPropertyDefault(_clientConnections, socket)) clientConnection->debugDump(reply); addToOutputQueue(socket, reply.asString(), request->getResponseMessageId()); break; } case DeleteSocketThread::callbackId: { ThreadMonitor::SetState setState(s_delete_socket); setState.increment(s_delete_socket); SocketInfo *socket = current->getSocketInfo(); auto it = _clientConnections.find(socket); if (it != _clientConnections.end()) { delete it->second; _clientConnections.erase(it); } break; } } delete current; } } } ForwardingThread::ForwardingThread() : ThreadClass("ForwardingThread"), _incomingRequests(getName()) { CommandDispatcher &cd = *CommandDispatcher::getInstance(); cd.listenForCommand("micro_proxy_debug_dump", &_incomingRequests, mtDebugDump, /* lock */ false, /* immuneToLock */ true); initializeServers(); startThread(); } ForwardingThread::~ForwardingThread() { Request *request = new Request(NULL); request->callbackId = mtQuit; _incomingRequests.newRequest(request); waitForThread(); } void ForwardingThread::initializeServers() { CommandDispatcher &cd = *CommandDispatcher::getInstance(); const std::string mainFileName = 
getConfigItem("server_config", "init.xml");
  const std::string patchFileName = getConfigItem("server_patches");
  Server::Factory factory;
  factory.loadSingleFile(mainFileName);
  // NOTE(review): the statement below is truncated in this view.  Everything
  // between "TclList()<" and "getCommands()) {" is missing, including the
  // declarations of the loops that the closing braces further down belong
  // to.  The surviving loop body shows one entry ("kvp") being handled per
  // command.  Restore the statement from the original file.
  sendToLogFile(TclList()<getCommands()) {
      Command const &command = kvp.second;
      std::string const &clientName = command.getClientName();
      // Requests for this command will arrive in our queue as mtForward.
      cd.listenForCommand(clientName, &_incomingRequests, mtForward);
      // Remember, the command is owned by the Server object.  If you
      // destroy the Server object, the pointer to the command will no
      // longer be valid.
      _commandsByName[clientName] = &command;
    }
  }
}

// Returns the ClientConnection for a socket, creating it on first use.  This
// relies on operator[] creating a null entry for a socket that has not been
// seen before (std::map-like semantics -- confirm the type of
// _clientConnections).
ClientConnection *ForwardingThread::getClientConnection(SocketInfo *socket)
{
  ClientConnection *&result = _clientConnections[socket];
  if (!result)
    result = new ClientConnection(socket, *this);
  return result;
}

// Looks up the Command registered under the name the client used.
Command const *ForwardingThread::getCommand(std::string const &clientCommand)
{
  // This is allowed to return NULL, but that seems unlikely.  We used the
  // same list of commands to populate _commandsByName and to initialize
  // the command dispatcher.
  return getPropertyDefault(_commandsByName, clientCommand);
}

// Sets the listener for a handle.  Passing a null listener removes the entry
// for that handle instead of storing a null pointer.
void ForwardingThread::setPollListener(int handle, PollReturn *listener)
{
  if (listener)
    _handleToAction[handle] = listener;
  else
    _handleToAction.erase(handle);
}

}

// Creates the forwarding thread.  The constructor starts the thread, and the
// object is never deleted here.
void initForwardingThread()
{
  new ForwardingThreadNS::ForwardingThread();
}

// Loads and validates a forwarding config file without starting anything.
// Returns "" when the file is valid and an error description otherwise.
std::string testForwardingConfigFile(std::string const &fileName)
{
  ForwardingThreadNS::Server::Factory factory;
  factory.loadSingleFile(fileName);
  if (factory.validateMain())
    return "";
  else if (factory.getMessages().empty())
    // This branch shouldn't be required, but we want to be certain.  Returning
    // "" means no error.
    return "Unable to load config file.";
  else
    return factory.getMessages();
}