#include #include "../shared/CommandDispatcher.h" #include "../shared/GlobalConfigFile.h" #include "../shared/ReplyToClient.h" #include "../shared/LogFile.h" #include "../shared/XmlSupport.h" #include "../shared/MultiCast.h" #include "UserInfo.h" ///////////////////////////////////////////////////////////////////// // UserInfoThread::LoginInfo ///////////////////////////////////////////////////////////////////// UserInfoThread::LoginInfo::LoginInfo(ExternalRequest *request) { socket = request->getSocketInfo(); username = request->getProperty("username"); password = request->getProperty("password"); sessionId = request->getProperty("session_id"); responseMessageId = request->getResponseMessageId(); } ///////////////////////////////////////////////////////////////////// // UserInfoThread ///////////////////////////////////////////////////////////////////// // We have two separate mutexes. That allows us to use a fast mutex. // Otherwise we'd need a recursive mutex. // // There is only one instance of the object. The first person who needs the // object will create it and all others will reuse it. A mutex prevents two // separate people from both creating the object. We might be able to avoid // this mutex, but it was a lot simpler just to add it. // // The "_data" is protected by a mutex. It is only accessed through our // methods, so we can take care of the mutex, and we know all access will be // quick and we know there will be no recursive calls to the mutex. // // To read from the _data, you must be in this thread, OR you must be holding // the mutex. To modify the data, you must be in this thread AND holding the // mutex. The sames goes for _socketToUser. // Note: _mutex uses a convention that we successfully use in a lot of places. // On the other hand, createMutex is not a good idea and should not be copied. // Instead, create a function called find(). Create a static variable in that // function to store the instance. g++ will automatically add the code to make // sure that static variable is only created once, and that code will be thread // safe. static pthread_mutex_t createMutex = PTHREAD_MUTEX_INITIALIZER; UserInfoThread *UserInfoThread::instance; UserInfoThread::UserInfoThread() : ThreadClass("UserInfoThread"), _incoming(getName()), _database(false, getName()) { int error = pthread_mutex_init(&_mutex, NULL); assert(!error); CommandDispatcher &cd = *CommandDispatcher::getInstance(); cd.listenForCommand("login", &_incoming, mtLogin, true); cd.listenForCommand("proxy_login", &_incoming, mtProxyLogin, true); cd.listenForCommand("reload_user_info", &_incoming, mtReload); cd.listenForCommand("show_all_users", &_incoming, mtShowAllUsers); cd.listenForCommand("disconnect_user", &_incoming, mtDisconnectUser); cd.listenForCommand("disconnect_all", &_incoming, mtDisconnectAll); // Load the CSV file now so no one will be able to access it from another // thread before we load it. loadFile(); startThread(); } UserInfoThread::~UserInfoThread() { // currently no way to unregister from the command handler! assert(false); } void UserInfoThread::loginRequest(ExternalRequest *request) { LoginInfo loginInfo(request); // SMB login was here. tryLogin(loginInfo); } // How the session id works: This is simpler than TI Pro. For one thing, only // one server at a time will be serving the users. For another, we're not // worried about people cheating. We don't queue up alerts, and we can be sure // that only one client at a time is receiving alerts. So at worst, one client // would get half the alerts and the other client would get the other half. // // The session does have an important job. If a client gets temporarily // disconnected (network down), and a second client logs in at that time, then // the first client tries to reconnect, the session id will prevent the first // client from reconnecting. TI Pro has something similar. TI Pro's version // should never fail. In certain unlikely cases this algorithm could fail. // But this should still be pretty good. // // When the client starts fresh, it gives the server the empty string for the // session id. The server generates a session id and sends it back to the // client. If the client is unexpectedly disconnected (i.e. a network error) // the client will reconnect and give the session id back to the server. The // blank session id will always succeed. Otherwise the server checks the // session id to make sure it's current. // // The session id has two parts: the client part and the server part. The // server part is initialized when the server first starts. It could be the // time in nanoseconds when the server starts. We don't save session // information when the server restarts, so if the server parts don't match, // we don't have a lot of details about the session. We might also forget all // info when we reread the user info from disk. In that case, we'd update the // server part at that time. // // The client part will probably be a simple counter that starts at 0 when // the server part resets. We will save this in our records. We will keep // this until the server restarts, or similar. // // To verify the session id: // o If the client does not provide a session id, we accept this client, // and we generate a new session id. We give it to the client and we // store it for later. // o If the session id matches our records, we accept it. It doesn't matter // if the server part is old or new. // o If we don't have any record of a connection for this client, we accept // the connection and store the session id. We assume that this came from // before our last reset. We could verify that, but there would be no // point. We could try to generate an exception, or print something to the // log. The bug might be in the client. // o Otherwise we refuse the connection. We tell the client what the problem // was. // // Note that we only change the session id if the initial one was blank. This // removes the need for the client to verify that he got the new session id. void UserInfoThread::tryLogin(LoginInfo const &loginInfo) { if (loginInfo.username.empty() || loginInfo.password.empty() || (_data.get("password", loginInfo.username) != loginInfo.password)) { addToOutputQueue(loginInfo.socket, "STOP\r\nInvalid username or password.\r\n", loginInfo.responseMessageId); closeWhenOutputQueueIsEmpty(loginInfo.socket); return; } const std::string expectedSessionId = _data.get("session id", loginInfo.username); if (!(loginInfo.sessionId.empty() || expectedSessionId.empty() || (loginInfo.sessionId == expectedSessionId))) { addToOutputQueue(loginInfo.socket, "STOP\r\nNew session.\r\n", loginInfo.responseMessageId); closeWhenOutputQueueIsEmpty(loginInfo.socket); return; } if (SocketInfo *oldSocket = getPropertyDefault(_userToSocket, loginInfo.username)) { addToOutputQueue (oldSocket, "STOP\r\nNew session.\r\n", ExternalRequest::MessageId(_data.get("status channel", loginInfo.username))); closeWhenOutputQueueIsEmpty(oldSocket); pthread_mutex_lock(&_mutex); _userToSocket.erase(loginInfo.username); _socketToUser.erase(oldSocket); pthread_mutex_unlock(&_mutex); } TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__<callbackId = it->callbackId; it->listener->newRequest(login); } // I added these two options for debugging a problem where some chart // requests were being ignored. It was always the first chart request, and // only sometimes, so it sounded like a race condition related to logins. // I added (A) because this was hard to reproduce on a test server, but // happened regularly on a production server. Once I saw the bug on this // test server, using this trick, but I couldn't repeat it. (B) just lets // us trace the timing of the various operations. I never saw the bug when // (B) was in place. Both the client and the log files showed everything // working as expected with (B). //A LogFile::primary().sendString("About to simulate a busy server...", socket); //A sleep(1); //A LogFile::primary().sendString("...done simulating a busy server.", socket); //B LogFile::primary().sendString(TclList()<unlock(socket); } void UserInfoThread::proxyLoginRequest(ExternalRequest *request) { SocketInfo *socket = request->getSocketInfo(); static uint64_t demoCount = 0 ; std::string username = "proxy:" + request->getProperty("user_id"); if (username == "proxy:") { username = "proxy:demo:" + std::to_string(demoCount++); } TclList msg; msg<callbackId) { case mtLogin: { ExternalRequest *request = dynamic_cast(current); loginRequest(request); break; } case mtProxyLogin: { ExternalRequest *request = dynamic_cast(current); proxyLoginRequest(request); break; } case mtReload: { ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); if (hasAdminPermission(socket)) { loadFile(request->getProperty("file_name")); addToOutputQueue(socket, "OK", request->getResponseMessageId()); } else addToOutputQueue(socket, "FAIL", request->getResponseMessageId()); break; } case mtShowAllUsers: { ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); std::string response; if (!hasAdminPermission(socket)) response = "FAIL\r\n"; else { response = "OK\r\n"; for (std::map< std::string, SocketInfo * >::const_iterator it = _userToSocket.begin(); it != _userToSocket.end(); it++) { response += it->first; response += "\r\n"; } } addToOutputQueue(socket, response, request->getResponseMessageId()); break; } case mtDisconnectUser: { ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); const std::string username = request->getProperty("username"); SocketInfo *const toDisconnect = getPropertyDefault(_userToSocket, username); if (socket == toDisconnect) { // It's me! Try to display the message before // disconnectinglogging to help with debugging. addToOutputQueue(socket, "BYE", request->getResponseMessageId()); closeWhenOutputQueueIsEmpty(socket); } else if (!(toDisconnect && hasAdminPermission(socket))) { // can't find user or no permissions. addToOutputQueue(socket, "FAIL", request->getResponseMessageId()); } else { // Disconnect immediately. Do not try to flush output // queue. Presumably there's some reason we want to // disconnect him, so we do it immediately. DeleteSocketThread::deleteSocket(toDisconnect); addToOutputQueue(socket, "OK", request->getResponseMessageId()); } break; } case mtDisconnectAll: { ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); if (!hasAdminPermission(socket)) { // can't find user or no permissions. addToOutputQueue(socket, "FAIL", request->getResponseMessageId()); } else { for (std::map< std::string, SocketInfo * >::iterator it = _userToSocket.begin(); it != _userToSocket.end(); it++) DeleteSocketThread::deleteSocket(it->second); } } case mtAddLoginListener: { AddLoginListener *request = dynamic_cast< AddLoginListener * >(current); _loginListeners.push_back(request->loginListener); break; } case DeleteSocketThread::callbackId: { std::string const * const username = getProperty(_socketToUser, current->getSocketInfo()); if (username) { pthread_mutex_lock(&_mutex); _userToSocket.erase(*username); _socketToUser.erase(current->getSocketInfo()); pthread_mutex_unlock(&_mutex); } break; } case mtMultiCast: { MultiCast::PURequest *request = dynamic_cast< MultiCast::PURequest * >(current); const std::string originalUserId = request->getUserId(); const std::string internalUserId = "proxy:" + originalUserId; const bool loggedIn = _userToSocket.count(internalUserId); if (loggedIn) request->respond(); break; } case mtQuit: { delete current; return; } } delete current; } _incoming.waitForRequest(); } } std::string UserInfoThread::getUsername(SocketInfo *socket) { pthread_mutex_lock(&_mutex); std::string result = getPropertyDefault(_socketToUser, socket); pthread_mutex_unlock(&_mutex); return result; } // The username is a the row heading. If the user is not logged in, then there // is no data. Copy the entire row from the csv file to info. Start by // clearing info. Do not copy any empty strings. void UserInfoThread::getInfo(SocketInfo *socket, PropertyList &info) { info.clear(); pthread_mutex_lock(&_mutex); std::string const * const username = getProperty(_socketToUser, socket); if (username) for (TwoDArray::StringList::const_iterator it = _data.getColHeaders().begin(); it != _data.getColHeaders().end(); it++) { std::string value = _data.get(*it, *username); if (!value.empty()) info[*it] = value; } pthread_mutex_unlock(&_mutex); } // The username is the row heading. name is the column heading. If this // socket is not logged in, or there are any other problems, return the empty // string. std::string UserInfoThread::getField(SocketInfo *socket, std::string const &name) { std::string result; pthread_mutex_lock(&_mutex); std::string const * const username = getProperty(_socketToUser, socket); if (username) result = _data.get(name, *username); pthread_mutex_unlock(&_mutex); return result; } bool UserInfoThread::hasAdminPermission(SocketInfo *socket) { // For simplicity we have just one field for admin functions and sending raw // TCL code. return getField(socket, "tcl") == "1"; } bool UserInfoThread::comesFromProxy(SocketInfo *socket) { return getField(socket, "username").empty(); } bool UserInfoThread::loggedIn(SocketInfo *socket) { pthread_mutex_lock(&_mutex); bool result = getProperty(_socketToUser, socket); pthread_mutex_unlock(&_mutex); return result; } void UserInfoThread::loadFile(std::string fileName) { if (fileName.empty()) fileName = getConfigItem("user_info_file", "user_info.csv"); const std::vector< std::string > pieces = explode(":", fileName); if ((pieces.size() == 2) && (pieces[0] == "table")) { const std::string table = (pieces[1].empty()?"tiq_user_info":pieces[1]); MysqlResultRef result = _database.tryQueryUntilSuccess("SELECT col, row, value from " + table); pthread_mutex_lock(&_mutex); _data.clear(); while (result->rowIsValid()) { _data.add(result->getStringField(0), result->getStringField(1), result->getStringField(2)); result->nextRow(); } pthread_mutex_unlock(&_mutex); } else { pthread_mutex_lock(&_mutex); _data.loadFromCSV(fileName); pthread_mutex_unlock(&_mutex); } resetSessions(); } void UserInfoThread::resetSessions() { _resetCounter = ntoa(TimeVal(true).asMicroseconds()); _sessionCounter = 0; } std::string UserInfoThread::getSessionId() { _sessionCounter++; return _resetCounter + ':' + ntoa(_sessionCounter); } UserInfoThread &UserInfoThread::getInstance() { if (!instance) { pthread_mutex_lock(&createMutex); if (!instance) instance = new UserInfoThread; pthread_mutex_unlock(&createMutex); } return *instance; } void UserInfoThread::addLoginListener(RequestListener *listener, int callbackId) { _incoming.newRequest(new AddLoginListener({listener, callbackId})); }