#include //#include #include "../shared/rapidxml.hpp" #include "../shared/Random.h" #include "../shared/ReplyToClient.h" #include "../shared/SimpleLogFile.h" #include "../shared/CommandDispatcher.h" #include "../shared/IPollSet.h" #include "../shared/ThreadClass.h" #include "../shared/TalkWithServer64.h" #include "../shared/ThreadMonitor.h" //#include "../shared/TclUtil.h" #include "../shared/GlobalConfigFile.h" #include "../shared/XmlSupport.h" #include "../shared/LogFile.h" #include "../shared/XmlReader.h" #include "UserInfo.h" #include "TimerQueue.h" #include "ForwardingThread.h" /* This is the main body of the micro_proxy. This takes requests from the * client and sends them to the server. This takes the responses from the * server and sends them back to the client. * * The client connects to this process in the normal way: NewConnections.C, * InputFramework.C, and ReplyToClient.C. This file is responsible for all of * the connections to the servers. I.e. creating connections, sending and * receiving data, detecting and initiating disconnects. */ /* TODO: We need to get better at segregating the data. Some users might get * some data but not other data. For the most part we want to handle that here, * because this seems like a convenient place to put the rules. We can stop * certain commands from being delivered. We can either ignore them completely * or send a canned response. * * The initial case is for one of Brad's customers. These customers should be * able to get Brokerage Plus and the OddsMaker, but not the Price Alerts or * the AI.
*/

namespace ForwardingThreadNS
{

// Names reported to ThreadMonitor by code in this namespace (for example
// ThreadMonitor::SetState and ThreadMonitor::find().increment()).  Each value
// is spelled exactly as it should appear in the monitor output.

// Server disconnects and login messages.
static const std::string s_server_disconnect_all{"server disconnect all"};
static const std::string s_server_disconnect_one{"server disconnect one"};
static const std::string s_sendLoginMessage_full{"sendLoginMessage full"};
static const std::string s_sendLoginMessage_limited{"sendLoginMessage limited"};

// Per-connection events.
static const std::string s_forward_response{"forward response"};
static const std::string s_deferredClose{"deferredClose"};
static const std::string s_send_ping{"send ping"};
static const std::string s_immediateClose{"immediateClose"};
static const std::string s_updatePollSet{"updatePollSet"};
static const std::string s_onServerClosed{"onServerClosed"};
static const std::string s_new_server_connection{"new server connection"};
static const std::string s_needConnection{"needConnection()"};

// Request types handled by the thread, and socket cleanup.
static const std::string s_mtForward{"mtForward"};
static const std::string s_mtDebugDump{"mtDebugDump"};
static const std::string s_delete_socket{"delete socket"};

// threadFunction labels.
static const std::string s_threadFunction_timer_callbacks{"threadFunction timer callbacks"};
static const std::string s_threadFunction_prepare_to_sleep{"threadFunction prepare to sleep"};
static const std::string s_threadFunction_notify_handles{"threadFunction notify handles"};
static const std::string s_threadFunction_commands{"threadFunction commands"};

// onPollReturn labels.
static const std::string s_onPollReturn_wakeUp{"onPollReturn wakeUp"};
static const std::string s_onPollReturn_doResponses{"onPollReturn doResponses"};
static const std::string s_onPollReturn_updatePollSet{"onPollReturn updatePollSet"};

// If you want to be woken up by the pollset, you need to implement this
// interface. This is one of the improvements we've made compared to
// existing library routines. ContainerThread, for example, has one generic
// wakeup call that it sends to everything in the container. That doesn't
// scale well. We expect thousands or tens of thousands of listeners.
// Implemented by anything that wants to be told when the pollset wakes up
// for one of its handles.  Objects register themselves with
// Resources::setPollListener().
class PollReturn
{
public:
  virtual void onPollReturn() =0;
  virtual ~PollReturn() {}
};

class Server;
class Command;
class ClientConnection;

// Per-server-connection record of the requests that expect a response.
// When the server connection closes, close() either drops the whole client
// connection (history disabled) or sends the client the first and last
// recorded response ids and the names of the affected commands.
class ServerHistory
{
private:
  // Set from !server->closeClientIfThisCloses() by the constructor.
  const bool _enabled;
  // Ids of the first and the most recent requests recorded by add().
  ExternalRequest::MessageId _firstId;
  ExternalRequest::MessageId _lastId;
  // The distinct commands recorded by add().
  std::set< Command const * > _commands;
public:
  ServerHistory(Server *server);
  // Record one request.  Ignored if history is disabled or id is empty.
  void add(Command const *command, ExternalRequest::MessageId id);
  // Report the loss of this server connection to the client.
  void close(SocketInfo *clientSocket);
  void debugDump(XmlNode &parent) const;
};

// This is a callback from the server connection to the client connection.
// Each client connection can create multiple server connections. The
// client connection needs to keep track of which server connections are
// open. The client connection needs to know if it can reuse a server
// connection or if it has to create a new one.
class ServerClosed
{
public:
  virtual void onServerClosed(Server *server) =0;
  virtual ~ServerClosed() { }
};

// The ForwardingThread object exports a few things to other objects.
// Use this interface to avoid circular references.
class Resources
{
public:
  // getCommand() returns NULL if nothing is found.
  virtual Command const *getCommand(std::string const &clientCommand) =0;
  virtual IPollSet &getPollSet() =0;
  // Associate a pollset handle with the object to notify.  listener may be
  // NULL; see clearPollListener().
  virtual void setPollListener(int handle, PollReturn* listener) =0;
  // Shorthand for setPollListener(handle, NULL).
  void clearPollListener(int handle) { setPollListener(handle, NULL); }
  virtual TimerQueue &getTimerQueue() =0;
  virtual ~Resources() {}
};

// This represents a possible command. When this thread receives a command
// from the client, it needs to know more information, like what server
// implements this command.
//
// These are owned by the server. Don't destroy the server until you are
// done with the commands.
class Command
{
  Server *_server;
  // Name sent to the server.
  std::string _serverName;
  // Name the client uses.  Empty only for a default-constructed Command.
  std::string _clientName;
  // Optional SUBCOMMAND from the config file; see replaceSubcommand().
  std::string _subcommand;
public:
  // A simple command. The command name the client sends us is the same
  // as the command name that we send to the server. This is the common
  // case.
  //
  // Note that _server is often NULL while processing the config file, but
  // it cannot be null in the final version. See reparent(), which changes
  // the server of an existing command.
  Command(Server *server, std::string const &name) :
    _server(server), _serverName(name), _clientName(name) { }

  // This will translate the command name. For example, most servers all
  // have a "ping" command. If the client sends "ping" which server does it
  // mean? This file could implement "alert_ping" which sends "ping" to the
  // alert server, and "candle_ping" which sends "ping" to the candle server.
  //
  // This is used a lot with flex_command in TIQ. Different TIQ servers all
  // handle different sub commands of flex_command.
  Command(Server *server, std::string const &clientName,
          std::string const &serverName, std::string const &subcommand) :
    _server(server), _serverName(serverName), _clientName(clientName),
    _subcommand(subcommand) { }

  // This version of the constructor is aimed at the factory mechanism.
  //
  // _server is NULL because we don't know yet. See reparent() to fix that.
  //
  // If clientName is empty(), replace it with serverName. This is useful
  // for filling in a default value.
  Command(std::string const &clientName, std::string const &serverName,
          std::string const &subcommand) :
    _server(NULL), _serverName(serverName),
    _clientName(clientName.empty()?serverName:clientName),
    _subcommand(subcommand) { }

  // Default constructor. Required for use in an STL container.
  Command() : _server(NULL) { }

  // Returns true if this appears to have come from the default constructor.
  // I.e. if this was created by calling [] on a map where the key wasn't
  // present.
  bool uninitialized() const { return _clientName.empty(); }
  bool initialized() const { return !uninitialized(); }

  // Point this command at the server that handles it.
  void reparent(Server *server) { _server = server; }

  // The server to use for this client.  The out-of-line definition returns
  // nullptr when the client's user status does not permit access.
  Server *getServer(ClientConnection *clientConnection) const;

  std::string const &getServerName() const { return _serverName; }
  std::string const &getClientName() const { return _clientName; }
  std::string const &getSubcommand() const { return _subcommand; }

  // True when a subcommand was configured (_subcommand is not empty).
  bool replaceSubcommand() const { return !_subcommand.empty(); }

  // Non-empty name fields only, as name/value pairs.
  TclList debugDump() const
  {
    TclList result;
    if (!_serverName.empty())
      result<<"_serverName"<<_serverName;
    if (!_clientName.empty())
      result<<"_clientName"<<_clientName;
    if (!_subcommand.empty())
      result<<"_subcommand"<<_subcommand;
    // I'm assuming that we are being called by the server, that's why
    // we don't bother reporting anything about _server. Avoid a loop
    // and/or a lot of duplication.
    return result;
  }
};

// This represents the final destination of most commands from the client.
//
// This represents one or more identical processes. This can implement a
// load balancer itself. In the case of ax_alert_server, this will probably
// point to the existing load balancer. The existing load balancer can't
// go away, and we don't want the list of servers to live in more than one
// place.
class Server
{
public:
  // One way to reach the server.  Factory::load() fills in the TCP fields
  // for TYPE="tcp" (the default) and the Kafka fields for TYPE="kafka".
  // NOTE(review): "type" has no default member initializer, so code that
  // builds an Address must set it explicitly.  load() does so for
  // ADDRESSES.  Confirm that addDemoAddress() does too; its definition is
  // not in the reviewed text.
  struct Address
  {
    enum Type { TCP, KAFKA };
    Type type;
    // TCP fields
    std::string hostname;
    std::string port;
    // Kafka fields
    std::string requestTopic;
    std::string responseTopic;
    std::string brokers;
  };
private:
  std::string _name;
  // Keyed by the client's command name.  The Factory code looks commands
  // up here by client name.
  std::map< std::string, Command > _commands;
  std::vector< Address > _addresses;
  std::vector< Address > _demoAddresses;
  bool _closeClientIfThisCloses;
  bool _sendPings;
  bool _allowDemoUsers;
  bool _loginWithPassword;

  // Private; the nested Factory builds named servers with this.  These
  // defaults are also the defaults Factory::load() passes for the boolean
  // attributes.
  Server(std::string const &name) :
    _name(name), _closeClientIfThisCloses(false), _sendPings(false),
    _allowDemoUsers(true), _loginWithPassword(false) { }
  void addCommand(Command const &command); // Cannot be a duplicate!
  void addAddress(Address const &address);
  void addDemoAddress(std::string const &hostname, std::string const &port);

  // This server appears to have been created by the default constructor.
  // I.e. listOfServer[x] created a new server rather than returning an
  // existing server.
  bool uninitialized() const { return _name.empty(); }
  bool initialized() const { return !uninitialized(); }
public:
  // Required to work with an STL container. :(
  Server() : Server("") { }

  // Copy a server object. Make sure the internal pointers are correct.
  // Must be public to work with an STL container. :(
  // Commands are re-added with addCommand() rather than copied member-wise.
  // NOTE(review): no copy-assignment operator is declared.  The implicitly
  // generated one copies _commands member-wise, so each Command keeps the
  // _server of the source object.  The Factory code assigns Servers (for
  // example "_serversByServerName[name] = server;" with a local source).
  // If addCommand() is what reparents each command, as this constructor's
  // comment implies, consider a matching operator= (Rule of Three).
  Server(Server const &other) :
    _name(other._name), _addresses(other._addresses),
    _demoAddresses(other._demoAddresses),
    _closeClientIfThisCloses(other._closeClientIfThisCloses),
    _sendPings(other._sendPings), _allowDemoUsers(other._allowDemoUsers),
    _loginWithPassword(other._loginWithPassword)
  {
    for (auto &kvp : other._commands)
      // kvp is name => command.
      addCommand(kvp.second);
  }

  std::map< std::string, Command > const &getCommands() const
  { return _commands; }

  // These lists are always the same. Presumably the caller will pick one at
  // random.
  std::vector< Address > const &getAddresses() const { return _addresses; }
  std::vector< Address > const &getDemoAddresses() const
  { return _demoAddresses; }

  // Ideally any server can close and that only affects the conversations
  // with that server. However, we have to deal with the bulk of the
  // existing client which expects certain things to all fail at once. This
  // helps us simulate that. Newer servers should have a different protocol
  // which makes it easy for the client to restart various pieces of code
  // without restarting the entire connection.
  bool closeClientIfThisCloses() const { return _closeClientIfThisCloses; }

  // This means that we're trying to keep the server alive. It's really
  // aimed at the legacy ax_alert_server. As we move more things from there
  // we might get rid of this.
  bool sendPings() const { return _sendPings; }

  // Should users be allowed to access this server if they are logged in as
  // DEMO? This is a recommendation. Command::getServer() makes the
  // decision.
  bool allowDemoUsers() const { return _allowDemoUsers; }

  // True means that we need to provide a username and password when talking
  // with the remote server. Normally we send a special message saying that
  // the proxy has already verified this user. This mode is mostly made
  // for debugging. Set up your own proxy in your own virtual machine. Send
  // some requests to servers in your virtual machine. Send the rest of the
  // requests to the normal/live/production proxy in the TI data center.
  bool loginWithPassword() const { return _loginWithPassword; }

  // For logging and debugging.
  std::string const &getName() const { return _name; }

  TclList debugDump() const
  {
    TclList result;
    result<<"_name"<<_name;
    {
      result<<"_commands";
      std::map< std::string, std::string > commands;
      for (auto const &kvp : _commands)
        commands[kvp.first] = kvp.second.debugDump();
      result << commands;
    }
    {
      // NOTE(review): only hostname:port is reported.  KAFKA entries have
      // empty TCP fields, so they show up as ":".
      result << "addresses";
      TclList addresses;
      for (auto const &address : _addresses)
        addresses << (address.hostname + ":" + address.port);
      result << addresses;
    }
    {
      result << "demoAddresses";
      TclList demoAddresses;
      for (auto const &demoAddress : _demoAddresses)
        demoAddresses << (demoAddress.hostname + ":" + demoAddress.port);
      result << demoAddresses;
    }
    // NOTE(review): SOURCE DAMAGE.  Text appears to have been stripped
    // starting at the '<' after "initialized()" and ending at a later '>'.
    // This looks like text between '<' and '>' was removed as markup.
    // Judging by what follows, the missing span includes:
    //   - the end of this debugDump();
    //   - the rest of the Server class;
    //   - the opening of the nested Factory class, including its Status
    //     enum (Uninitialized, Error, Untested, ValidMain, ValidPatch), its
    //     _status and _messages members, and the declaration of what is
    //     presumably std::map< std::string, Server > _serversByServerName.
    // Restore this span from version control.  The line below is kept
    // exactly as received.
    result<<"_closeClientIfThisCloses"<<_closeClientIfThisCloses
          <<"_sendPings"<<_sendPings
          <<"_allowDemoUsers"<<_allowDemoUsers
          <<"_loginWithPassword"<<_loginWithPassword
          <<"initialized()"< _serversByServerName;

    // Every command in the XML file shows up here. When we receive a
    // command from the client we look up its name and use that to find
    // a server. We never allow a command to be shared. When you apply
    // a patch you can change some details about the command, but not which
    // server handles it.
    // (Client command name -> server name.)
    std::map< std::string, std::string > _serversByCommandName;

    // This is used when testing in a virtual machine. You create a patch
    // file naming the servers you want to handle. And the patch file
    // includes instructions for forwarding any commands it doesn't know
    // about to the next server. We read the "main" file just for a list
    // of commands. (You can't forward a wildcard, you have to name each
    // command explicitly.) This says which server we are forwarding to.
    // Or this is empty to say no such server.
    // load() sets this to the server marked LOGIN_WITH_PASSWORD.
    std::string _chainServerName;
  public:
    static std::string debugDump(Status status)
    {
      switch (status)
      {
      case Uninitialized: return "Uninitialized";
      case Error: return "Error";
      case Untested: return "Untested";
      case ValidMain: return "ValidMain";
      case ValidPatch: return "ValidPatch";
      default: return "UNKNOWN(" + ntoa((int)status) + ")";
      }
    }
    TclList debugDump() const
    {
      TclList result;
      // NOTE(review): SOURCE DAMAGE.  Text was stripped from the '<' after
      // "_status" up to a later '>'.  The surviving text suggests the loss
      // includes the rest of this output chain and the start of a
      // declaration like std::map< std::string, TclList > (or
      // std::string).  Kept exactly as received.
      result<<"_status"< serversByServerName;
      for (const auto &kvp : _serversByServerName)
        serversByServerName[kvp.first] = kvp.second.debugDump();
      // NOTE(review): SOURCE DAMAGE.  A second span is missing here,
      // presumably:
      //   - the end of this function;
      //   - the Factory method declarations (load(), validateMain(), the
      //     merging constructor, loadSingleFile(), isValidMain(), ...);
      //   - the "std::set< Server const *" part of create()'s return type.
      // Kept exactly as received.
      result< create() const;
  };
};

// Abstract connection to one server.  ClientConnection holds these by
// pointer; TcpServerConnection is the implementation declared below.
class IServerConnection
{
public:
  virtual void sendMessage(Command const *command, ExternalRequest *request) = 0;
  virtual void debugDump(XmlNode &parent) const = 0;
  virtual ~IServerConnection() {}
};

// This represents the connection to a particular server. This is
// disposable. When the server is disconnected, delete this object. Create
// a new one if you need to reconnect.
//
// The private bases provide the callbacks onResponse(), onPollReturn() and
// onTimeOut() declared below.
class TcpServerConnection : private TalkWithServer64::IAllResponsesListener,
                            private PollReturn,
                            private TimerQueue::Listener,
                            public IServerConnection
{
private:
  // The deadman timer on the server side defaults to 5 minutes.
  // So let's send a ping every 2 minutes.
  // (Milliseconds; passed to TimerQueue::addMSFromNow().)
  static const int PING_TIME = 2 * 60 * 1000;
  static const PropertyList PING_COMMAND;
  SocketInfo *const _clientSocket;
  Server *const _server;
  // Requests sent on this connection; see ServerHistory.
  ServerHistory _history;
  // Used for log output.  needConnection() appends the address to it when
  // the server has more than one address.
  std::string const _debugName;
  Resources &_resources;
  ServerClosed *const _serverClosed;
  // Owned (deleted in needConnection() and the destructor).  NULL when
  // there is no current TalkWithServer64 object.
  TalkWithServer64 *_talkWithServer;
  void updatePollSet();
  void sendLoginMessage();
  void sendMessage(PropertyList const &message);

  // The list of addresses that we can try. If one fails we remove it from
  // the list. If one succeeds we clear the list. The idea is that if one
  // fails we quickly and automatically try the next. The client doesn't see
  // an error unless (a) we did not find any good servers or (b) we were
  // connected but then we disconnected.
  // NOTE(review): the visible needConnection() does not clear this list on
  // success.  It removes each address as it tries it (pickOne()) and, after
  // a connection that had written successfully drops, it stops without
  // trying the remaining entries.
  std::vector< Server::Address > _addresses;

  // If there are multiple physical servers for this virtual server,
  // write the name and address to distinguish the physical servers in the
  // log. If there's only one physical server, use only the name of the
  // virtual server, without the address, to keep the logs shorter and
  // easier to read.
  bool _onlyOneAddress;

  // This is internal. Our constructor will call this. We might call this
  // again if we receive an error asynchronously. Only call this if there
  // is no _talkWithServer or the current _talkWithServer is disconnected.
  //
  // This might cause the ServerConnection to close. In that case we'll
  // always create a timer event and do the close when the timer goes off.
  // Sometimes we need to add a pause to keep the retries from happening too
  // fast. But it's always good to use the timer event even if we only
  // sleep for one millisecond. When you call needConnection() you don't
  // have to worry about the ServerConnection object being deleted before
  // you are ready.
  void needConnection();

  // If there's a failure before this time, wait for at least that long.
  // If we fail immediately, we typically wait one second before trying
  // again. But we might not see the failure message right away. If the
  // remote server timed out, we might or might not have already waited
  // long enough.
  // (The constructor sets this to 0 or to "now + 1 second" in
  // microseconds; needConnection() compares it with getMicroTime().)
  TimeVal::Microseconds _minimumRetryTime;

  virtual void onResponse(std::string bytes,TalkWithServer64::MessageId messageId);
  virtual void onPollReturn();
  virtual void onTimeOut();
public:
  TcpServerConnection(SocketInfo *clientSocket, Server *server,
                      Resources &resources, ServerClosed *serverClosed);
  ~TcpServerConnection();
  void sendMessage(Command const *command, ExternalRequest *request);
  void debugDump(XmlNode &parent) const;
};

// This represents the client side of a connection. Typically when a client
// opens a TCP/IP connection to this program, one of these objects will be
// allocated. When that TCP/IP connection is broken, this object is deleted.
//
// This object will own some number of ServerConnection objects.
class ClientConnection : private ServerClosed
{
private:
  SocketInfo *const _clientSocket;
  Resources &_resources;
  const UserInfoExport _userInfo; // Cached;
  // Presumably one connection per Server, created by getServerConnection()
  // and removed in onServerClosed() (definitions not in the reviewed text).
  std::map< Server *, IServerConnection * > _serverConnections;
  IServerConnection *getServerConnection(Server *server);
  virtual void onServerClosed(Server *server);
public:
  ClientConnection(SocketInfo *socket, Resources &resources);
  ~ClientConnection();
  void forward(ExternalRequest *request);
  UserInfoExport const &getUserInfo() const { return _userInfo; }
  void debugDump(XmlNode &parent) const;
};

// I've been using ContainerThread.h to create new threads. The code seems
// cleaner than inheriting from ThreadClass. (In particular
// ContainerThread.h gets rid of a lot of boilerplate code.) The problem is
// that ContainerThread.h was written for threads that each host a small
// number of objects. ForwardingThread will need to manage thousands of
// separate connections. Among other differences, ContainerThread.h uses
// PollSet where ForwardingThread will use IPollSet.
// Holds the pollset, the timer queue, the server and command tables and a
// ClientConnection per client socket, and implements Resources for the
// objects it hosts.  Both bases are private; Resources gets the default
// access for a class.
class ForwardingThread : private ThreadClass, Resources
{
private:
  IPollSet _pollSet;
  // Object to notify for each pollset handle; see setPollListener().
  std::map< int, PollReturn * > _handleToAction;
  std::set< Server const * > _servers;
  std::map< std::string, Command const * > _commandsByName;
  TimerQueue _timerQueue;
  virtual TimerQueue &getTimerQueue() { return _timerQueue; }
  std::map< SocketInfo *, ClientConnection * > _clientConnections;
  ClientConnection *getClientConnection(SocketInfo *socket);
  enum { mtForward, mtDebugDump, mtQuit };
  SelectableRequestQueue _incomingRequests;
  // Returns NULL if nothing is found.
  virtual Command const *getCommand(std::string const &clientCommand);
  virtual IPollSet &getPollSet() { return _pollSet; }
  virtual void setPollListener(int handle, PollReturn *listener);
  void initializeServers();
protected:
  void threadFunction();
public:
  ForwardingThread();
  ~ForwardingThread();
};

/////////////////////////////////////////////////////////////////////
// ServerHistory
/////////////////////////////////////////////////////////////////////

// History is kept only for servers that do not take the whole client
// connection down when they close.
ServerHistory::ServerHistory(Server *server) :
  _enabled(!server->closeClientIfThisCloses())
{
}

// Record that command was sent with response id "id".  _firstId is set by
// the first recorded request and _lastId by the most recent one.
void ServerHistory::add(Command const *command, ExternalRequest::MessageId id)
{
  if (!_enabled)
    // We don't need any history for this server.
    return;
  if (id.isEmpty())
    // We only record messages that expect responses.
    return;
  if (_firstId.isEmpty())
    _firstId = id;
  _lastId = id;
  _commands.insert(command);
}

// Called when the server connection goes away.
// - History disabled: count it and delete the whole client socket.
// - History enabled: count it and queue a message for the client (message
//   id -1).  The message is newline-separated: the first id, the last id,
//   then the client name of each recorded command.
void ServerHistory::close(SocketInfo *clientSocket)
{
  if (!_enabled)
  {
    // This is the alternative to history. This is for older clients (and
    // older client libraries) which don't know how to handle the new server
    // closed message.
    ThreadMonitor::find().increment(s_server_disconnect_all);
    DeleteSocketThread::deleteSocket(clientSocket);
  }
  else
  {
    ThreadMonitor::find().increment(s_server_disconnect_one);
    std::string message;
    message += ntoa(_firstId.getValue());
    message += '\n';
    message += ntoa(_lastId.getValue());
    message += '\n';
    for (auto it = _commands.begin(); it != _commands.end(); it++)
    {
      message += (*it)->getClientName();
      message += '\n';
    }
    addToOutputQueue(clientSocket, message, ExternalRequest::MessageId(-1));
    //TclList msg;
    // NOTE(review): SOURCE DAMAGE.  Text was stripped between the '<' in
    // the next comment and a later '>'.  Three closing braces follow the
    // lost span, but the visible code of this function needs only two.
    // The lost text therefore presumably opened another scope, possibly
    // the definition of ServerHistory::debugDump(), which is declared but
    // not otherwise visible.  Restore from version control.
    //msg<getClientName();
}
}
}

/////////////////////////////////////////////////////////////////////
// Command
/////////////////////////////////////////////////////////////////////

// Full users always get _server.  Limited (demo) users get it only if the
// server allows demo users.  Any other status gets nullptr.
Server *Command::getServer(ClientConnection *clientConnection) const
{
  switch (clientConnection->getUserInfo().status)
  {
  case sFull:
    return _server;
  case sLimited:
    if (_server->allowDemoUsers())
      return _server;
    else
      return nullptr;
  default:
    return nullptr;
  }
}

/////////////////////////////////////////////////////////////////////
// Server::Factory
/////////////////////////////////////////////////////////////////////

// Replace this factory's contents with the servers described by the
// children of list.
//
// Each child needs a NAME attribute.  It may have the boolean attributes
// CLOSE_CLIENT_IF_THIS_CLOSES, SEND_PINGS, ALLOW_DEMO_USERS and
// LOGIN_WITH_PASSWORD, and the children ADDRESSES, DEMO_ADDRESSES and
// COMMANDS.
//
// Each problem appends to _messages and sets _status to Error.  Parsing
// continues so several problems can be reported in one pass.  If nothing
// went wrong, _status ends up Untested.
void Server::Factory::load(RapidXmlElement list)
{
  clear();
  if (!list)
  {
    // Presumably an error upstream. Couldn't load or parse the file.
    _messages << " list is NULL";
    _status = Error;
  }
  else
  {
    _status = Untested;
    for (RapidXmlElement top : list)
    {
      std::string const name = top.attribute("NAME");
      if (name.empty())
      {
        _messages << "Creating server, can't find NAME.";
        _status = Error;
      }
      else if (_serversByServerName.count(name))
      {
        _messages << ("Duplicate server name: " + name);
        _status = Error;
      }
      else
      {
        Server server(name);
        // Parsing boolean properties
        server._closeClientIfThisCloses = top.attribute("CLOSE_CLIENT_IF_THIS_CLOSES", server._closeClientIfThisCloses);
        server._sendPings = top.attribute("SEND_PINGS", server._sendPings);
        server._allowDemoUsers = top.attribute("ALLOW_DEMO_USERS", server._allowDemoUsers);
        server._loginWithPassword = top.attribute("LOGIN_WITH_PASSWORD", server._loginWithPassword);
        // The server marked LOGIN_WITH_PASSWORD becomes the chain server.
        // More than one such server is an error.
        if (server._loginWithPassword)
        {
          if (!_chainServerName.empty())
          {
            _messages << (TclList() << "Multiple Chain Servers" << _chainServerName << name);
            _status = Error;
          }
          else
          {
            _chainServerName = name;
          }
        }
        for (RapidXmlElement address : top["ADDRESSES"])
        {
          // NOTE(review): any TYPE other than exactly "kafka" (for example
          // a typo, or "KAFKA") is silently parsed as a TCP address.
          const std::string type = address.attribute("TYPE", "tcp");
          Address addr;
          if (type == "kafka")
          {
            addr.type = Address::KAFKA;
            addr.requestTopic = address.attribute("REQUEST_TOPIC");
            addr.responseTopic = address.attribute("RESPONSE_TOPIC");
            addr.brokers = address.attribute("BROKERS");
            if (addr.requestTopic.empty() || addr.responseTopic.empty() || addr.brokers.empty())
            {
              _status = Error;
              _messages << "Kafka address missing required attributes";
            }
            else
            {
              server.addAddress(addr);
            }
          }
          else
          {
            // Existing TCP parsing code
            addr.type = Address::TCP;
            addr.hostname = address.attribute("HOST");
            addr.port = address.attribute("PORT");
            if (strtolDefault(addr.port, 0) <= 0)
            {
              _status = Error;
              _messages << (TclList() << "Invalid port" << addr.port << "for host" << addr.hostname << "on server" << name);
            }
            else
            {
              server.addAddress(addr);
            }
          }
        }
        // Demo addresses take only HOST and PORT.
        for (RapidXmlElement address : top["DEMO_ADDRESSES"])
        {
          const std::string host = address.attribute("HOST");
          const std::string port = address.attribute("PORT");
          if (strtolDefault(port, 0) <= 0)
          {
            _status = Error;
            _messages << (TclList() << "Invalid port" << port << "for host" << host << "on server" << name);
          }
          else
          {
            server.addDemoAddress(host, port);
          }
        }
        for (RapidXmlElement command : top["COMMANDS"])
        {
          const std::string nameAtClient = command.attribute("CLIENT");
          const std::string nameAtServer = command.attribute("SERVER");
          const std::string subcommand = command.attribute("SUBCOMMAND");
          if (nameAtServer.empty())
          {
            _status = Error;
            TclList msg;
            msg << "Found a command without a SERVER attribute in server" << name;
            _messages << msg;
          }
          else
          {
            // Create the command.
            // NOTE(review): this local shadows the loop variable "command"
            // (an XML element) for the rest of this block.
            Command command(nameAtClient, nameAtServer, subcommand);
            // Command names are unique across all servers in the file.
            if (_serversByCommandName.count(command.getClientName()))
            {
              // Duplicate command name.
              TclList msg;
              msg << "Duplicate command name" << command.getClientName() << "first server" << _serversByCommandName[command.getClientName()] << "another server" << name;
              _messages << msg;
              _status = Error;
            }
            else
            {
              server.addCommand(command);
              _serversByCommandName[command.getClientName()] = name;
            }
          }
        }
        // NOTE(review): Server declares no copy-assignment operator, so
        // this member-wise assignment copies Commands whose _server still
        // refers to the local "server" (if addCommand() reparented them),
        // and that local is destroyed at the end of this block.  create()
        // copies through the copy constructor, which re-adds the commands.
        _serversByServerName[name] = server;
      }
    }
    if ((_status == Untested) && _serversByServerName.empty())
    {
      // Presumably this is a sign of another problem. I guess you could
      // create a valid file with no servers, but that would be unusual.
      _status = Error;
      _messages << "No servers found in file.";
    }
  }
}

// Combine a main factory with a patch factory.
//
// The visible loops below add every command from main that this factory
// does not already list in _serversByCommandName:
// - First loop: the command goes to the server of the same name, copying
//   that server from main (without its commands) if it is missing here.
// - Second loop (the "else" branch, which uses patch._chainServerName):
//   the command goes to the same-named server if one exists here,
//   otherwise to the chain server.
// Part of this function was lost; see the NOTE below.
// NOTE(review): in the visible loops, commands copied from main are added
// to a Server but not to this->_serversByCommandName.  Confirm that is
// intended.
Server::Factory::Factory(Factory const &main, Factory const &patch) :
  _status(Untested)
{
  if (!main.isValidMain())
  {
    _status = Error;
    TclList msg;
    // NOTE(review): SOURCE DAMAGE.  Text was stripped between the '<'
    // after "Invalid main file" and the '>' of a later "->".  The lost code
    // presumably finished this error branch and copied the patch's servers
    // and commands, and the branch condition in front of the first loop
    // below is lost with it.  Kept exactly as received.
    msg<<"Invalid main file"<_addresses;
  }
  for (auto &kvp : main._serversByCommandName)
  {
    std::string const &commandName = kvp.first;
    if (!_serversByCommandName.count(commandName))
    {
      // We were not already handling this command, so add it now.
      std::string const &serverName = kvp.second;
      Server const *const mainServer =
        getProperty(main._serversByServerName, serverName);
      // mainServer must exist if main._serversByServerName and
      // main._serversByCommandName are in sync.
      assert(mainServer);
      Server &finalServer = _serversByServerName[serverName];
      if (finalServer.uninitialized())
      {
        // This server was not in the patch file. So copy it from the
        // main file.
        finalServer = *mainServer;
        // But don't copy the commands. Some might have already been
        // assigned to a different server in the patch file. This loop
        // will copy the appropriate commands to this server, possibly
        // skipping some.
        finalServer._commands.clear();
      }
      Command const *const originalCommand =
        getProperty(mainServer->_commands, commandName);
      // originalCommand must exist if mainServer is consistent.
      assert(originalCommand);
      finalServer.addCommand(*originalCommand);
    }
  }
  }
  else
  {
    // If a command is orphaned, add that command to the chain server.
    // NOTE(review): operator[] inserts a default (uninitialized) Server if
    // the name is missing.  Only the assert below catches that, and it
    // disappears in NDEBUG builds.
    Server &chainServer = _serversByServerName[patch._chainServerName];
    assert(chainServer.initialized());
    for (auto &kvp : main._serversByCommandName)
    {
      std::string const &commandName = kvp.first;
      if (!_serversByCommandName.count(commandName))
      {
        // We were not already handling this command, so add it now.
        std::string const &serverName = kvp.second;
        Server const *const mainServer =
          getProperty(main._serversByServerName, serverName);
        // mainServer must exist if main._serversByServerName and
        // main._serversByCommandName are in sync.
        assert(mainServer);
        Command const *const originalCommand =
          getProperty(mainServer->_commands, commandName);
        // originalCommand must exist if mainServer is consistent.
        assert(originalCommand);
        if (Server *const patchServer =
            getProperty(_serversByServerName, serverName))
          // The patch file created a server with the same name as the main
          // file. Copy the command to the server with the same name.
          patchServer->addCommand(*originalCommand);
        else
          // The patch file did not create this specific server, so use the
          // chain server.
          chainServer.addCommand(*originalCommand);
      }
    }
  }
}

// Check whether this factory can serve as the main configuration.  Error
// is sticky and ValidMain is cached (see the switch).  Otherwise every
// server is checked and _status becomes ValidMain or Error.
bool Server::Factory::validateMain()
{
  switch (_status)
  {
  case Uninitialized:
    // Tempting to say assert(false) here.
    // (Falls through to Error.)
  case Error:
    // Errors are explicitly sticky. Don't retry.
    // Note: If there are multiple error messages You might not see all
    // of them because we exit early. That's hard to fix, and you'll
    // always get at least one error message if there is one, so this is
    // okay.
    return false;
  case Untested:
    // Do the tests listed below. Change the state to Error or ValidMain.
    break;
  case ValidMain:
    // Cached. We explicitly allow this to be called multiple times and
    // promise that it will be fast.
    return true;
  case ValidPatch:
    // Same as untested. Maybe there's a good reason to call one validate
    // function then the other.
    break;
  }
  _status = ValidMain; /* Until proven otherwise. (If we hit an error we
                        * change _status, but we keep going to see if we
                        * can find additional errors. */
  for (auto const &kvp : _serversByServerName)
  {
    std::string const &name = kvp.first;
    Server const &server = kvp.second;
    if (server.uninitialized())
    {
      // Probably an internal error. We try to catch this when reading from
      // the XML file. Maybe someone tried reading from
      // _serversByServerName[] and accidentally created a new server. This
      // could probably be an assertion.
      _status = Error;
      _messages<<"Unnamed / uninitialized server.";
    }
    if (server._addresses.empty())
    {
      _status = Error;
      // NOTE(review): SOURCE DAMAGE.  Text was stripped from the '<' after
      // this string up to the '>' in "std::set< Server const * >".  The
      // lost text includes the rest of validateMain() and the start of
      // create()'s return type.  Kept exactly as received.
      _messages<<(TclList()<<"Require at least one address for server" < Server::Factory::create() const
{
  // I originally planned to call validateMain() here but...
  // a) Then this method couldn't be const.
  // b) If you skipped the validate step you'd have no way to create
  //    a good error message. An invalid input file would cause an
  //    assertion to fail.
  assert(_status == ValidMain);
  // One heap-allocated copy of each server (via the copy constructor).  The
  // factory keeps no pointer to them, so whoever receives the set must
  // delete them.
  std::set< Server const * > result;
  for (auto const &kvp : _serversByServerName)
    result.insert(new Server(kvp.second));
  return result;
}

// Reset the factory and read fileName with RapidXmlDocument.  The rest of
// this function was lost; see the NOTE below.
void Server::Factory::loadSingleFile(std::string const &fileName)
{
  clear();
  RapidXmlDocument document;
  document.loadFromFile(fileName);
  if (!document)
  {
    // NOTE(review): SOURCE DAMAGE.  Text was stripped from the '<' below up
    // to a later '>', presumably the '>' in a " -> " string literal.  The
    // lost text includes:
    //   - the rest of loadSingleFile();
    //   - the header of
    //     TcpServerConnection::TcpServerConnection(SocketInfo *clientSocket,
    //     Server *server, Resources &resources, ServerClosed *serverClosed);
    //   - the first part of its member-initializer list (_clientSocket,
    //     _server, _history and the start of _debugName).
    // Kept exactly as received.
    _messages< " + server->getName()),
  _resources(resources),
  _serverClosed(serverClosed),
  _talkWithServer(NULL)
{
  // Constructor body: choose the retry delay and the address list, then
  // start connecting.
  ThreadMonitor::SetState tm(s_new_server_connection);
  tm.increment(s_new_server_connection);
  if (_server->closeClientIfThisCloses())
    // The whole connection to the client will be closed. In this case the
    // client is responsible for any pauses before the retry.
    _minimumRetryTime = 0;
  else
    // Wait one second. But that's one second from when we started, not
    // from when we discovered the problem.
    _minimumRetryTime = TimeVal(true).addSeconds(1).asMicroseconds();
  // Limited (demo) users use the demo addresses when any are configured.
  UserInfoExport userInfo = userInfoGetInfo(_clientSocket);
  if(userInfo.status == sLimited && !server->getDemoAddresses().empty())
    _addresses = server->getDemoAddresses();
  else
    _addresses = server->getAddresses();
  _onlyOneAddress = _addresses.size() == 1;
  if (_addresses.empty())
  {
    // This if statement isn't strictly required. This prints a different
    // message to the log to make the problem more clear. And this has a
    // longer retry time since the problem won't go away on its own.
    tm.increment("no servers in config file " + server->getName());
    _resources.getTimerQueue().addMSFromNow(this, 15000); // 15 seconds.
  }
  else
    needConnection();
}

// Remove a randomly chosen element from vector and return it.  Order is not
// preserved: the last element is moved into the vacated slot.  The vector
// must not be empty (asserted).
template < class T >
static T pickOne(std::vector< T > &vector)
{
  assert(!vector.empty());
  int index = getRandom31() % vector.size();
  T result = vector[index];
  int lastIndex = vector.size() - 1;
  if (index != lastIndex)
    // Delete the item we picked. We don't care about order, so move the
    // last item into that slot, rather than sliding several items down one
    // slot each.
    vector[index] = vector[lastIndex];
  vector.resize(lastIndex);
  return result;
}

// Try the remaining addresses, one at a time in random order, until one
// does not fail immediately.
// - On success: register for responses and pollset callbacks, send the
//   login message, optionally start the ping timer, and return.
// - Otherwise (a connection that had written successfully dropped, or no
//   addresses are left): schedule a timer at or after _minimumRetryTime
//   (at least 1 ms).  Per the class comments, the close then happens from
//   the timer callback.
void TcpServerConnection::needConnection()
{
  ThreadMonitor::SetState tm(s_needConnection);
  tm.increment(s_needConnection);
  while (true)
  {
    const bool previouslyConnected =
      _talkWithServer && _talkWithServer->writeSucceeded();
    if (_talkWithServer)
    {
      // NOTE(review): the success branch below registers this object with
      // _resources.setPollListener() for the old handle.  This cleanup calls
      // updatePollSet() and deletes _talkWithServer, but unlike the
      // destructor it does not call _resources.clearPollListener().  After
      // _talkWithServer is set to NULL the destructor cannot do it either.
      // Unless updatePollSet() clears the listener (its definition is not
      // in the reviewed text), the handle-to-listener map may keep pointing
      // at this object after the descriptor number is reused.  Confirm.
      assert(_talkWithServer->disconnected());
      updatePollSet();
      delete _talkWithServer;
      _talkWithServer = NULL;
    }
    if (previouslyConnected)
      // If we were ever connected tell the client about the disconnect and
      // let the client deal with it. This could be caused by anything and
      // is not necessarily an error. Some servers will time out if they
      // have not had a request in a while. The client is smart enough
      // not to try to connect again until it actually needs something.
      break;
    if (_addresses.empty())
    {
      // We have tried all servers. None of them are working. Report it
      // to the client and let the client deal with it. Mark this in the
      // log so we know how often this happens.
      tm.increment("all servers down " + _server->getName());
      break;
    }
    // The long name includes the client socket id. The long name is good
    // for logging statements. The short name is better for
    // ThreadMonitor::increment(), so multiple clients can all be compressed
    // into one.
    std::string shortName = _server->getName();
    std::string longName = _debugName;
    // NOTE(review): the address is always dialed as hostname/port,
    // whatever address.type says.  A KAFKA entry (empty TCP fields, see
    // Server::Factory::load()) can only fail here.
    Server::Address address = pickOne(_addresses);
    if (!_onlyOneAddress)
    {
      const std::string addressString =
        ' ' + address.hostname + ':' + address.port;
      shortName += addressString;
      longName += addressString;
    }
    _talkWithServer = new TalkWithServer64(longName);
    _talkWithServer->connect(address.hostname, address.port, true);
    if (_talkWithServer->disconnected())
      // Failed immediately. Maybe the name was invalid and could not
      // be resolved. We might not see the failure here. If the remote
      // server is down we will probably see the error when we first try to
      // write.
      tm.increment("failed to connect " + shortName);
    else
    {
      // Appears successful.
      _talkWithServer->setAllResponsesListener(this);
      tm.increment("connected " + shortName);
      updatePollSet();
      sendLoginMessage();
      _resources.setPollListener(_talkWithServer->getHandle(), this);
      if (_server->sendPings())
        _resources.getTimerQueue().addMSFromNow(this, PING_TIME);
      return;
    }
  }
  // If we get here there was a serious error. An error that we can't fix
  // ourselves. We will schedule the destruction of this object. That will
  // notify the client and the client has the option of retrying.
  // (Microseconds rounded to the nearest millisecond, at least 1 ms.)
  const int64_t now = getMicroTime();
  const int64_t timeout =
    std::max(1LL, (_minimumRetryTime - now + 500) / 1000);
  _resources.getTimerQueue().addMSFromNow(this, timeout);
  // NOTE(review): SOURCE DAMAGE.  Text was stripped from the '<' in the
  // next comment up to a later '>', presumably the one in a "->".  The lost
  // text presumably includes the end of needConnection() and the start of
  // TcpServerConnection::~TcpServerConnection().  The code below, which
  // unregisters the handle and deletes _talkWithServer, reads like that
  // destructor's body.  Kept exactly as received.
  //LogFile::primary().sendString(TclList()<getName();
  //if (_talkWithServer)
  // msg<<(_talkWithServer->disconnected()?"DISCONNECTED":"connected");
  //else
  // msg<<"NULL";
  //LogFile::primary().sendString(msg, _clientSocket);
  if (_talkWithServer)
  {
    _resources.clearPollListener(_talkWithServer->getHandle());
    _resources.getPollSet().removeForRead(_talkWithServer->getHandle());
    _resources.getPollSet().removeForWrite(_talkWithServer->getHandle());
    _talkWithServer->disconnect();
    delete _talkWithServer;
  }
  //msg.clear();
  // NOTE(review): SOURCE DAMAGE.  Text was stripped from the '<' in the
  // next comment up to the '>' of a later "->".  The lost text presumably
  // includes:
  //   - the end of the destructor;
  //   - possibly other definitions (updatePollSet(), onPollReturn() and
  //     debugDump() are declared but not visible);
  //   - the start of sendLoginMessage(), including the declarations of
  //     "message" and "userInfo" and "if (_server->".
  // Kept exactly as received.
  //msg<
  loginWithPassword())
  {
    // This is aimed at chaining from one proxy to the next. This is similar
    // to the old ax_alert_server login, but simpler. The ax_alert_server
    // required a second command to finish the login process. (If you don't
    // send the second command the connection will appear frozen for no
    // obvious reason.)
    message["command"] = "login";
    message["username"] = userInfo.username;
    message["password"] = userInfo.password;
    message["vendor_id"] = "chained micro_proxy";
  }
  else
  {
    // This is now the preferred method for login. Let the proxy do the work
    // for you. All other servers trust the proxy.
    // user_id is sent only when non-zero; status only for limited or full.
    message["command"] = "proxy_login";
    if (userInfo.userId)
      message["user_id"] = ntoa(userInfo.userId);
    if (userInfo.status == sLimited)
      message["status"] = "limited";
    else if (userInfo.status == sFull)
      message["status"]= "full";
  }
  sendMessage(message);
}

// Send one raw message to the server (when the surviving condition allows)
// and refresh the pollset registration.
void TcpServerConnection::sendMessage(PropertyList const &message)
{
  //TclList items;
  //for (auto it = message.begin(); it != message.end(); it++)
  // items<<(TclList()<first<second);
  //TclList msg;
  //msg<getName();
  //if (_talkWithServer)
  // msg<<(_talkWithServer->disconnected()?"DISCONNECTED":"connected");
  //else
  // msg<<"NULL";
  // NOTE(review): SOURCE DAMAGE.  The damage inside the commented-out lines
  // above is harmless.  Here, text was stripped from the '<' in the next
  // comment up to the '>' of a later "->", and that took the start of the
  // condition guarding _talkWithServer->sendMessage().  It presumably
  // checks that _talkWithServer is non-NULL and not disconnected; confirm.
  // Kept exactly as received.
  //msg<
  disconnected())
    _talkWithServer->sendMessage(message);
  updatePollSet();
}

// Record the request in the history (only if it expects a response; see
// ServerHistory::add()), then forward its properties to the server.
void TcpServerConnection::sendMessage(Command const *command, ExternalRequest *request)
{
  _history.add(command, request->getResponseMessageId());
  sendMessage(request->getProperties());
}

// Pass each response from the server to the client unchanged, tagged with
// the server's message id.
void TcpServerConnection::onResponse(std::string bytes, TalkWithServer64::MessageId messageId)
{
  ThreadMonitor::find().increment(s_forward_response);
  addToOutputQueue(_clientSocket, bytes, ExternalRequest::MessageId(messageId));
}

// Don't request a response. We could. That might make the code more
// robust. But it would also make this more complicated.
//
// Our goal here is to tell the server receiving this message that we
// are still alive. If we ever really wanted to check if the remote
// server was alive, we'd probably do things differently. One option
// would be to set up a special version of ping that doesn't affect the
// server's dead man timer, so we could call it on all servers. Another
// approach would be to change the tcp_keepalive_time socket option,
// so the operating system would automatically send the pings at
// appropriate times.
const PropertyList TcpServerConnection::PING_COMMAND = { {"command", "ping"} };

// Timer callback.  This definition continues past the end of the reviewed
// text.
void TcpServerConnection::onTimeOut()
{
  if ((!_talkWithServer)||_talkWithServer->disconnected())
  {
    // We got here because of _talkWithServer->disconnected().
// If it happens in the constructor we don't want to report the problem // immediately. That would cause various problems. Instead we ask the // timer to call us back soon. // // One problem is that the callback would happen before the listener // was ready. The object that owns this object, will create this object // and store it in table. The callback will look for this object in the // table. If we're still in the constructor, then the owner couldn't // have stored us anywhere. // // Another problem is avoiding wasted resources. In case of an error we // don't want to keep retrying over and over. That would use a lot of // CPU and a lot of network. Multiple programs would be involved. If // there's a connection problem, start by sleeping for one second, then // try again. //TclList msg; //msg<onServerClosed(_server); return; } if (_server->sendPings()) { ThreadMonitor::find().increment(s_send_ping); PropertyList message; message["command"] = "ping"; sendMessage(message); _resources.getTimerQueue().addMSFromNow(this, PING_TIME); } } void TcpServerConnection::onPollReturn() { if (!_talkWithServer) return; ThreadMonitor::SetState state(s_onPollReturn_wakeUp); _talkWithServer->wakeUp(); state.setState(s_onPollReturn_doResponses); _talkWithServer->doResponses(); state.setState(s_onPollReturn_updatePollSet); updatePollSet(); if (_talkWithServer->disconnected()) { // We're treating all errors the same. needConnection() will decide // what to do. It might try another server immediately. Or it might // schedule this ServerConnection to be destroyed so the client has // to deal with it. 
//LogFile::primary().sendString(TclList()<getHandle(); IPollSet &pollSet = _resources.getPollSet(); if (_talkWithServer->wantsRead()) pollSet.addForRead(handle); else pollSet.removeForRead(handle); if (_talkWithServer->wantsWrite()) pollSet.addForWrite(handle); else pollSet.removeForWrite(handle); //TclList msg; //msg<wantsRead() // <<"wants write"<<_talkWithServer->wantsWrite(); //LogFile::primary().sendString(msg, _clientSocket); } void TcpServerConnection::debugDump(XmlNode &parent) const { XmlNode &result = parent[-1]; result.name = "SERVER_CONNECTION"; result.properties["debug_name"] = _debugName; if (_talkWithServer) { XmlNode &talkWithServer = result["TALK_WITH_SERVER"]; // Disconnected means that the TalkWithServer knows that the connection // is closed. We've had some problems detecting this. So far it appears // that as soon as TalkWithServer knows this, the main program will // handle the case correctly. But for some reason this is sometimes // false when the log for the down steam server says it disconnected. talkWithServer.properties["disconnected"] = _talkWithServer->disconnected()?"true":"false"; talkWithServer.properties["write_succeeded"] = _talkWithServer->writeSucceeded()?"true":"false"; talkWithServer.properties["name"] = _talkWithServer->getName(); // watchingForRead() should match wantsRead() at any given time. Either // watchingForRead() or watchingForWrite() should be sufficient to find // a socket closed by the downstream server. talkWithServer.properties["polling_for_read"] = _resources.getPollSet().watchingForRead(_talkWithServer->getHandle()) ?"true":"false"; // watchingForWrite() should match wantsWrite() at any time. talkWithServer.properties["polling_for_write"] = _resources.getPollSet().watchingForWrite(_talkWithServer->getHandle()) ?"true":"false"; // wantsRead(). This should probably always be true. TalkWithServer // will respond with false if the connection is closed. 
But this // program will remove the TalkWithServer object in that case. talkWithServer.properties["wants_read"] = _talkWithServer->wantsRead()?"true":"false"; // wantsWrite(). This should probably always be false. This will be // true if you write a lot to the downstream server and the OS's buffer // gets full. Our client usually tries not to flood the network. // Furthermore, since the client is far away on the internet, but the // downstream server is on the same LAN as the proxy, it seems unlikely // that the client could overwhelm the downstream server. talkWithServer.properties["wants_write"] = _talkWithServer->wantsWrite()?"true":"false"; } _history.debugDump(result); result.properties["server_name"] = _server->getName(); result.properties["real_servers_remaining"] = ntoa(_addresses.size()); } ///////////////////////////////////////////////////////////////////// // ClientConnection ///////////////////////////////////////////////////////////////////// ClientConnection::ClientConnection(SocketInfo *socket, Resources &resources) : _clientSocket(socket), _resources(resources), _userInfo(userInfoGetInfo(socket)) { } void ClientConnection::onServerClosed(Server *server) { ThreadMonitor::SetState setState(s_onServerClosed); setState.increment(s_onServerClosed); auto it = _serverConnections.find(server); if (it != _serverConnections.end()) { delete it->second; _serverConnections.erase(it); } } void ClientConnection::forward(ExternalRequest *request) { std::string const &clientCommand = request->getCommand(); if (const Command *command = _resources.getCommand(clientCommand)) { if (Server *const server = command->getServer(this)) { IServerConnection *serverConnection = getServerConnection(server); request->setCommand(command->getServerName()); if (command->replaceSubcommand()) { request->getProperties()["subcommand"] = command->getSubcommand(); } serverConnection->sendMessage(command, request); } } } IServerConnection *ClientConnection::getServerConnection(Server 
*server) { IServerConnection *&result = _serverConnections[server]; if (!result) { if (server->getAddresses()[0].type == Server::Address::KAFKA) { result = KafkaServerConnection::getInstance(_resources); static_cast(result)->addClient(_clientSocket, server); } else { result = new TcpServerConnection(_clientSocket, server, _resources, this); //TclList msg; //msg<getName(); //LogFile::primary().sendString(msg, _clientSocket); } } return result; } ClientConnection::~ClientConnection() { for (auto it = _serverConnections.begin(); it != _serverConnections.end(); it++) delete it->second; DeleteSocketThread::deleteSocket(_clientSocket); } void ClientConnection::debugDump(XmlNode &parent) const { { XmlNode &userInfo = parent["USER_INFO"]; userInfo.properties["user_id"] = ntoa(_userInfo.userId); std::string status; switch (_userInfo.status) { case sNone: status = "sNone"; break; case sLimited: status = "sLimited"; break; case sFull: status = "sFull"; break; default: status = ntoa(_userInfo.status); break; } userInfo.properties["status"] = status; userInfo.properties["username"] = _userInfo.username; } XmlNode &serverConnections = parent["SERVER_CONNECTIONS"]; for (auto it = _serverConnections.begin(); it != _serverConnections.end(); it++) { it->second->debugDump(serverConnections); } } ///////////////////////////////////////////////////////////////////// // ForwardingThread ///////////////////////////////////////////////////////////////////// void ForwardingThread::threadFunction() { ThreadMonitor &tm = ThreadMonitor::find(); _pollSet.addForRead(_incomingRequests.getWaitHandle()); while (true) { tm.setState(s_threadFunction_timer_callbacks); _timerQueue.doAllCallbacks(); //msg.clear(); //msg<onPollReturn(); } } // Look for commands. 
tm.setState(s_threadFunction_commands); _incomingRequests.resetWaitHandle(); while (Request *current = _incomingRequests.getRequest()) { switch (current->callbackId) { case mtQuit: { delete current; return; } case mtForward: { ThreadMonitor::SetState setState(s_mtForward); setState.increment(s_mtForward); ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); getClientConnection(socket)->forward(request); break; } case mtDebugDump: { ThreadMonitor::SetState setState(s_mtDebugDump); setState.increment(s_mtDebugDump); ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); XmlNode reply; reply.properties["socket"] = ntoa(SocketInfo::getSerialNumber(socket)); reply.properties["host_name"] = getShortHostName(); if (ClientConnection *clientConnection = getPropertyDefault(_clientConnections, socket)) clientConnection->debugDump(reply); addToOutputQueue(socket, reply.asString(), request->getResponseMessageId()); break; } case DeleteSocketThread::callbackId: { ThreadMonitor::SetState setState(s_delete_socket); setState.increment(s_delete_socket); SocketInfo *socket = current->getSocketInfo(); auto it = _clientConnections.find(socket); if (it != _clientConnections.end()) { delete it->second; _clientConnections.erase(it); } break; } } delete current; } } } ForwardingThread::ForwardingThread() : ThreadClass("ForwardingThread"), _incomingRequests(getName()) { CommandDispatcher &cd = *CommandDispatcher::getInstance(); cd.listenForCommand("micro_proxy_debug_dump", &_incomingRequests, mtDebugDump, /* lock */ false, /* immuneToLock */ true); initializeServers(); startThread(); } ForwardingThread::~ForwardingThread() { Request *request = new Request(NULL); request->callbackId = mtQuit; _incomingRequests.newRequest(request); waitForThread(); } void ForwardingThread::initializeServers() { CommandDispatcher &cd = *CommandDispatcher::getInstance(); const std::string mainFileName = 
getConfigItem("server_config", "init.xml"); const std::string patchFileName = getConfigItem("server_patches"); Server::Factory factory; factory.loadSingleFile(mainFileName); sendToLogFile(TclList() << FLF << "after factory.loadSingleFile(mainFileName)" << mainFileName << factory.debugDump()); if (!factory.validateMain()) { sendToLogFile(TclList() << FLF << "Error loading config file." << mainFileName << factory.getMessages()); scheduleSyncrhonizedShutdown(); return; } if (!patchFileName.empty()) { Server::Factory patchFactory; patchFactory.loadSingleFile(patchFileName); sendToLogFile(TclList() << FLF << "after patchFactory.loadSingleFile(patchFileName)" << patchFileName << patchFactory.debugDump()); if (!patchFactory.validatePatch()) { sendToLogFile(TclList() << FLF << "Error loading config file." << patchFileName << patchFactory.getMessages()); scheduleSyncrhonizedShutdown(); return; } factory = Server::Factory(factory, patchFactory); sendToLogFile(TclList() << FLF << "after Server::Factory(factory, patchFactory)" << factory.debugDump()); if (!factory.validateMain()) { sendToLogFile(TclList() << FLF << "Error merging config files." << mainFileName << patchFileName << factory.getMessages()); scheduleSyncrhonizedShutdown(); return; } } _servers = factory.create(); for (auto server : _servers) { for (auto &kvp : server->getCommands()) { const Command &command = kvp.second; const std::string &clientName = command.getClientName(); cd.listenForCommand(clientName, &_incomingRequests, mtForward); // Remember, the command is owned by the Server object. // If you destroy the Server object, the pointer to the command // will no longer be valid. 
_commandsByName[clientName] = &command; } } } ClientConnection *ForwardingThread::getClientConnection(SocketInfo *socket) { ClientConnection *&result = _clientConnections[socket]; if (!result) result = new ClientConnection(socket, *this); return result; } Command const *ForwardingThread::getCommand(std::string const &clientCommand) { // This is allowed to return NULL, but that seems unlikely. We used the // same list of commands to populate _commandsByName and to initialize // the command dispatcher. return getPropertyDefault(_commandsByName, clientCommand); } void ForwardingThread::setPollListener(int handle, PollReturn *listener) { if (listener) _handleToAction[handle] = listener; else _handleToAction.erase(handle); } class KafkaServerConnection : public IServerConnection { private: static KafkaServerConnection* _instance; std::map _clients; Resources &_resources; //KafkaProducer* _producer; //KafkaConsumer* _consumer; // Private constructor for singleton KafkaServerConnection(Resources &resources) : _resources(resources) { // Initialize Kafka connections } public: static KafkaServerConnection* getInstance(Resources &resources) { if (!_instance) { _instance = new KafkaServerConnection(resources); } return _instance; } void addClient(SocketInfo* clientSocket, Server* server) { _clients.emplace(clientSocket, ServerHistory(server)); } virtual void debugDump(XmlNode &parent) const override { } virtual void sendMessage(Command const *command, ExternalRequest *request) override { auto& history = _clients[request->getSocketInfo()]; history.add(command, request->getResponseMessageId()); // Send to Kafka } }; KafkaServerConnection* KafkaServerConnection::_instance = nullptr; } void initForwardingThread() { new ForwardingThreadNS::ForwardingThread(); } std::string testForwardingConfigFile(std::string const &fileName) { ForwardingThreadNS::Server::Factory factory; factory.loadSingleFile(fileName); if (factory.validateMain()) return ""; else if 
(factory.getMessages().empty()) // This branch shouldn't be required, but we want to be certain. Returning // "" means no error. return "Unable to load config file."; else return factory.getMessages(); }