#include <unistd.h>

#include <iostream>
#include <list>
#include <map>
#include <set>
#include <string>

#include "../shared/ThreadClass.h"
#include "../shared/Messages.h"
#include "../shared/CommandDispatcher.h"
#include "../shared/ReplyToClient.h"
#include "../shared/FixedMalloc.h"
#include "../shared/GlobalConfigFile.h"
#include "../shared/NewConnections.h"
#include "../shared/LogFile.h"

// This program is primarily intended to protect the database from
// overuse and to improve the fairness of queuing.
//
// MySQL provides a fixed number of connections.  By default there
// are 100, but you can change this number.  Sometimes you want
// multiple queries going at once.  Some types of long queries can
// be time-sliced and work together.  Some can even run while others
// are blocked for a variety of possible reasons, so things go
// faster with some multitasking.
//
// The problem comes when a bunch of requests arrive at once.  This
// can happen easily because each Apache thread can create a request,
// and there is no authority keeping these from bunching up.  If one
// query is slow and holds up the others, the remaining queries will
// build up quickly and overload the system.  At that point new
// requests will be rejected, which looks exactly the same to the
// user as if the database were shut down.
//
// There are two issues.  The first is that all requests are treated
// equally.  If 100 people are trying to auto-refresh, they will
// block out any user trying to view a normal web page, and that
// user will see the "database is down" page.  If there are 99 web
// applications trying to refresh, and 99 users on the C++ server
// all sharing a single database connection, the 99 web users will
// have an advantage and will be processed more quickly.
//
// The second issue is that there is no real queuing for requests.
// MySQL will either accept a request or deny it.  There is no way
// to let a request wait for a calm period.
// It is easy to believe that the requests will bunch up, and you'd
// like an organized way to wait for the congestion to end.
// Typically the requests are very fast, so it's just random when
// they bunch up, and the backlog goes away quickly.  When we
// overflow the 100-user count, that's usually because something was
// running slower than we want, but still not that slow in the
// user's sense of time.  So a queue would be more appropriate than
// refusing a connection.
//
// MySQL offers some ability to deal with the first issue.  Some
// connections can be reserved for privileged users.  And the user
// table has a max_connections property for each user.  We are not
// taking advantage of this.
//
// MySQL offers no support for the second issue.  The best we could
// do is reject some requests of one type to allow more requests of
// the other type to go through.  Increasing the maximum number of
// connections is a poor substitute for a real queue.  It helps at
// first, but it doesn't scale well at all.  And there is the
// fairness issue raised previously.  A FIFO provides fairness.
//
// This program solves both issues.  When a program wants to access
// the database, it should come to us first and request a semaphore.
// The program will give us the name of a semaphore.  We use
// semaphores, not critical sections or mutexes, because that allows
// up to N users at once, where N is specified at the beginning.  If
// the semaphore is already at capacity, then the user is put into a
// FIFO.  In either case, the user will be notified when he acquires
// the semaphore.  When the user is done with the database, he
// should release the semaphore so other people can use it.
//
// Users connect to us through a TCP/IP socket.  This is consistent
// with the way they connect to the database, and it allows multiple
// users on multiple computers to share a semaphore.  Users wait on
// a semaphore using a simple URL-like syntax that we use in
// several places.
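// The acquire/release behavior described above -- up to N holders at
// once, overflow into a FIFO -- can be sketched in isolation.  This is
// a toy model for illustration only: the real program tracks socket
// connections, not ints, and the class below is not part of it.

```cpp
#include <cassert>
#include <deque>
#include <set>

// Toy model of one named semaphore: up to `count` holders at once;
// everyone else waits in a first-in, first-out queue.
class ToySemaphore
{
public:
    explicit ToySemaphore(unsigned count) : _count(count) {}

    // Returns true if the client acquired the semaphore immediately,
    // false if it was queued.
    bool wait(int client)
    {
        if (_active.size() < _count)
        {
            _active.insert(client);
            return true;
        }
        _waiting.push_back(client);
        return false;
    }

    // Release a holder.  The oldest waiter, if any, is promoted.
    // Returns the promoted client, or -1 if nobody was waiting.
    int release(int client)
    {
        _active.erase(client);
        if (_waiting.empty())
            return -1;
        int next = _waiting.front();
        _waiting.pop_front();
        _active.insert(next);
        return next;
    }

private:
    const unsigned _count;
    std::set<int> _active;
    std::deque<int> _waiting;
};
```

// The FIFO is what provides fairness: release promotes whichever
// client has been waiting longest, never a newcomer.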
// Users signal the semaphore when they are done by closing the
// TCP/IP socket.  This takes care of any program which exits for
// any reason, including a PHP script which ends by any means.
//
// We provide a timeout on the queue.  This is specified on the
// command line, and each semaphore can have its own timeout.  A
// user will wait until the semaphore is available or until the
// timeout, whichever comes first.  On success we send "OK\n".  On
// timeout or any other error we break the connection without
// sending OK.  There is no way to disable the timeout, but you
// could make it a very large integer.
//
// Between the initial connection and the user's request to wait
// for a semaphore we have the standard timeout implemented by the
// common command interpreter library.  After successfully acquiring
// a semaphore, there is no timeout.  You can hold it forever.
//
// This software can handle an almost unlimited number of semaphores
// and connections.  The limits are based on system memory and
// settings in the O/S.  In practical terms these are unlimited, and
// we can handle far more connections than we would ever expect to
// have workers.  The rough estimate is a peak of 255 workers per
// Apache server.  There could be a few more from other sources, but
// that is a small number compared to even one Apache server.  The
// assumption is that a worker will never request more than one
// semaphore at a time.
//
// The command line is processed by our standard GlobalConfigFile
// library.  timeout_X=10 means that the semaphore named X has a
// timeout of 10 seconds.  count_History=10 means that a semaphore
// named History can handle up to 10 simultaneous users.
// default_timeout and default_count provide the timeout and count
// for any other semaphores.  listen_port is the TCP/IP port where
// we listen for connections.  You could create multiple instances
// of this program on different ports, or you could just get more
// creative with the semaphore names.
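// From the client's side, the protocol above amounts to: connect,
// send a wait command, block until "OK\n" arrives, and hold the
// socket open while using the shared resource.  The sketch below is
// an assumption-laden illustration, not part of this program: the
// exact URL-like wire syntax ("wait?name=X\n" here) is a guess based
// on the command name "wait" and the property "name" used below, and
// the helper names are invented.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cstring>
#include <string>

// Build the request line.  The wire syntax is an assumption.
std::string buildWaitCommand(const std::string &semaphoreName)
{
    return "wait?name=" + semaphoreName + "\n";
}

// Connect, request the named semaphore, and block until the server
// sends "OK\n" or drops the connection (timeout or failure).  On
// success returns the connected socket; closing that socket is what
// releases the semaphore.
int acquireSemaphore(const char *host, int port, const std::string &name)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;
    sockaddr_in address;
    std::memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons(port);
    inet_pton(AF_INET, host, &address.sin_addr);
    if (connect(fd, (sockaddr *)&address, sizeof(address)) < 0)
    {
        close(fd);
        return -1;
    }
    const std::string request = buildWaitCommand(name);
    write(fd, request.data(), request.size());
    char reply[4] = {};
    if (read(fd, reply, 3) == 3 && std::string(reply) == "OK\n")
        return fd;  // Semaphore held until close(fd).
    close(fd);
    return -1;
}
```

// Note that there is no explicit release message: a client that
// crashes mid-query still releases the semaphore, because the O/S
// closes its socket.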
// verbose=1 sends a lot more debug information to the log.
//
// This program was created to handle problems related to the
// database.  However, there is nothing database-specific in the
// implementation.  It could be used to protect other shared
// resources.  For example, you might want to limit the number of
// Unix processes to keep the system from thrashing, while still
// allowing more than one process at a time.

/////////////////////////////////////////////////////////////////////
// Semaphore
/////////////////////////////////////////////////////////////////////

class Semaphore : public FixedMalloc
{
private:
    const std::string _name;
    const unsigned int _count;
    const int _timeout;

    typedef std::list< SocketInfo * > List;
    List _waitingInOrder;

    typedef std::map< SocketInfo *, List::iterator > Map;
    Map _waitingBySocket;

    std::set< SocketInfo * > _active;

    bool roomForMore();
    void activate(SocketInfo *socket);

public:
    Semaphore(std::string name, unsigned int count, int timeout);
    void wait(SocketInfo *socket);
    void remove(SocketInfo *socket);
    bool empty();
    std::string const &getName();
};

inline bool Semaphore::roomForMore()
{
    return _active.size() < _count;
}

inline Semaphore::Semaphore(std::string name, unsigned int count, int timeout) :
    _name(name),
    _count(count),
    _timeout(timeout)
{
}

void Semaphore::activate(SocketInfo *socket)
{
    addToOutputQueue(socket, "OK\n");
    _active.insert(socket);
    CommandDispatcher::getInstance()->getDeadManTimer().grantImmunity(socket);
}

void Semaphore::wait(SocketInfo *socket)
{
    if (roomForMore())
    {
        activate(socket);
    }
    else
    {
        _waitingInOrder.push_front(socket);
        _waitingBySocket[socket] = _waitingInOrder.begin();
        CommandDispatcher::getInstance()->
            getDeadManTimer().touchConnection(socket, _timeout);
    }
}

void Semaphore::remove(SocketInfo *socket)
{
    if (_active.erase(socket))
    {
        // This socket was holding the semaphore.
        if (!_waitingInOrder.empty())
        {
            SocketInfo *next = _waitingInOrder.back();
            _waitingInOrder.pop_back();
            // Keep the map consistent with the list, or we'd hold a
            // stale entry with an invalidated iterator for `next`.
            _waitingBySocket.erase(next);
            activate(next);
        }
    }
    else
    {
        Map::iterator it = _waitingBySocket.find(socket);
        if (it != _waitingBySocket.end())
        {
            // This socket was waiting for the semaphore.
            _waitingInOrder.erase(it->second);
            _waitingBySocket.erase(it);
        }
    }
}

inline bool Semaphore::empty()
{
    // No need to check the waiting queue.  Whenever there is room in
    // here, we immediately move an item from the waiting queue into here.
    return _active.empty();
}

inline std::string const &Semaphore::getName()
{
    return _name;
}

/////////////////////////////////////////////////////////////////////
// SemaphoreThread
/////////////////////////////////////////////////////////////////////

class SemaphoreThread : private ThreadClass
{
private:
    enum { mtWait, mtQuit };

    RequestQueue _incoming;
    std::map< std::string, Semaphore * > _allSemaphores;
    std::map< SocketInfo *, Semaphore * > _active;

protected:
    void threadFunction();

public:
    SemaphoreThread();
    ~SemaphoreThread();
};

SemaphoreThread::SemaphoreThread() :
    ThreadClass("SemaphoreThread"),
    _incoming("SemaphoreThread")
{
    CommandDispatcher::getInstance()->
        listenForCommand("wait", &_incoming, mtWait);
    startThread();
}

SemaphoreThread::~SemaphoreThread()
{
    Request *r = new Request(NULL);
    r->callbackId = mtQuit;
    _incoming.newRequest(r);
    waitForThread();
}

void SemaphoreThread::threadFunction()
{
    long unsigned int defaultCount =
        strtoulDefault(getConfigItem("default_count"), 5);
    long int defaultTimeout =
        strtolDefault(getConfigItem("default_timeout"), 5);
    while (true)
    {
        while (Request *current = _incoming.getRequest())
        {
            switch (current->callbackId)
            {
            case mtWait:
            {
                ExternalRequest *request =
                    dynamic_cast< ExternalRequest * >(current);
                SocketInfo *socket = request->getSocketInfo();
                if (getPropertyDefault(_active, socket))
                {
                    // The client has made more than one request.  This is
                    // illegal, but there isn't much we can do about it.
                    // The important thing is that we can't service the
                    // request a second time without violating some of our
                    // preconditions and invariants.  So we ignore the
                    // request.
                }
                else
                {
                    std::string name = request->getProperty("name");
                    Semaphore *&semaphore = _allSemaphores[name];
                    if (!semaphore)
                    {
                        // Create the semaphore the first time someone
                        // tries to use it.
                        long unsigned int count =
                            strtoulDefault(getConfigItem("count_" + name),
                                           defaultCount);
                        if (count)
                        {
                            // Set the limit to 0 to prevent anyone from
                            // connecting.  In that case we can fail
                            // immediately.
                            long int timeout =
                                strtolDefault(getConfigItem("timeout_" + name),
                                              defaultTimeout);
                            semaphore = new Semaphore(name, count, timeout);
                        }
                    }
                    if (semaphore)
                    {
                        semaphore->wait(socket);
                        _active[socket] = semaphore;
                    }
                    else
                    {
                        // count was 0
                        DeleteSocketThread::deleteSocket(socket);
                        _allSemaphores.erase(name);
                    }
                }
                break;
            }
            case mtQuit:
            {
                delete current;
                return;
            }
            case DeleteSocketThread::callbackId:
            {
                SocketInfo *socket = current->getSocketInfo();
                if (Semaphore *semaphore = getPropertyDefault(_active, socket))
                {
                    semaphore->remove(socket);
                    if (semaphore->empty())
                    {
                        // Don't keep semaphores forever.  Otherwise we
                        // could have a pseudo-memory leak because people
                        // could ask for a different semaphore each time.
                        // In expected use this isn't a problem.
                        _allSemaphores.erase(semaphore->getName());
                        delete semaphore;
                    }
                    _active.erase(socket);
                }
                break;
            }
            }
            delete current;
        }
        _incoming.waitForRequest();
    }
}

/////////////////////////////////////////////////////////////////////
// main
/////////////////////////////////////////////////////////////////////

int main(int argc, char *argv[])
{
    if (!addConfigItemsFromCommandLine(argv + 1))
    {
        return 1;
    }
    configItemsComplete();
    int listenPort = strtolDefault(getConfigItem("listen_port", "8886"), -1);
    if (listenPort == -1)
    {
        std::cerr << "Invalid listen port \""
                  << getConfigItem("listen_port") << "\"" << std::endl;
        return 2;
    }
    bool verbose = getConfigItem("verbose") == "1";
    // Start the worker thread that owns all semaphore bookkeeping.
    SemaphoreThread semaphoreThread;
    CommandDispatcher *commandDispatcher = CommandDispatcher::getInstance();
    commandDispatcher->setShowEOF(verbose);
    InputListener listener(commandDispatcher->getInput(),
                           commandDispatcher->getInputCallbackId());
    NewConnections newConnections(&listener, listenPort, verbose);
    while (true)
        sleep(60);
}
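// Putting the configuration items from the header comment together, a
// launch line might look like the following.  The binary name is
// hypothetical; the keys are the ones processed by GlobalConfigFile
// above.

```shell
# Listen on the default port; semaphore "History" allows 10 holders
# with a 30-second queue timeout; all other semaphores get the
# defaults (5 holders, 5-second timeout).
./semaphore_server listen_port=8886 default_count=5 default_timeout=5 \
    count_History=10 timeout_History=30 verbose=1
```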