#include "../../shared/CommandDispatcher.h"
#include "../../shared/MiscSupport.h"
#include "../../shared/ReplyToClient.h"
#include "../../shared/LogFile.h"
#include "UserInfo.h"
#include "ReportAlertsThread.h"

// Removes value from the set stored under key.  If the set is empty
// afterwards the key itself is removed, so the map never keeps empty sets.
// A key that is not present is left alone (nothing is inserted).
template < class Key, class Value >
void eraseItem(std::map< Key, std::set< Value > > &map, Key key, Value value)
{
  typename std::map< Key, std::set< Value > >::iterator found = map.find(key);
  if (found == map.end())
    return;
  found->second.erase(value);
  if (found->second.empty())
    map.erase(found);
}

ReportAlertsThread *ReportAlertsThread::instance = NULL;

// Drops every subscription held by socket:  the socket is removed from each
// category it was listening to, and then its own entry is removed.
void ReportAlertsThread::deleteAll(SocketInfo *socket)
{
  //dump("before deletAll()");
  BySocket::iterator found = _bySocket.find(socket);
  if (found != _bySocket.end())
  {
    CategorySet const &categories = found->second;
    for (CategorySet::const_iterator it = categories.begin();
         it != categories.end();
         ++it)
      eraseItem(_byCategory, *it, socket);
    _bySocket.erase(found);
  }
  //dump("after deletAll()");
}

// Returns true if value starts with prefix.
// Note that every string is a prefix of itself.
static bool isPrefix(std::string const &prefix, std::string const &value)
{
  return prefix.size() <= value.size()
    && value.compare(0, prefix.size(), prefix) == 0;
}

// Returns true if the user on this socket may subscribe to category.
//
// The user's "categories" field is a ";" separated list of category prefixes.
// The user may see a category if at least one of those prefixes is a prefix
// of the category name.  The parsed list is cached per socket in
// _permissionsBySocket.
bool ReportAlertsThread::isPermissioned(SocketInfo *socket,
                                        std::string const &category)
{
  if (_permissionsBySocket.find(socket) == _permissionsBySocket.end())
  { // Not in cache.  Load it now.
    CategorySet &loaded = _permissionsBySocket[socket];
    const std::vector< std::string > categories =
      explode(";",
              UserInfoThread::getInstance().getField(socket, "categories"));
    for (std::vector< std::string >::const_iterator it = categories.begin();
         it != categories.end();
         ++it)
    {
      // By default an empty field would split up into a single category
      // which is the empty string.  The empty string is a prefix for
      // everything, so you could see all alerts.  If you really intend to
      // give someone permissions for everything, specify "***all***".
      if (*it == "***all***")
        loaded.insert("");
      else if (!it->empty())
        loaded.insert(*it);
    }
    // Make the set prefix-free:  drop every entry which is already covered
    // by a shorter entry.  In sorted order all strings starting with p come
    // immediately after p, so one pass comparing against the last entry we
    // kept is sufficient.  The lookup below is only correct on a prefix-free
    // set.  E.g. with {"a", "ab"} the entry closest to "ac" is "ab", which is
    // not a prefix, even though "a" is.
    CategorySet::iterator kept = loaded.begin();
    if (kept != loaded.end())
    {
      CategorySet::iterator it = kept;
      ++it;
      while (it != loaded.end())
      {
        if (isPrefix(*kept, *it))
          loaded.erase(it++);
        else
          kept = it++;
      }
    }
  }
  CategorySet const &forUser = _permissionsBySocket[socket];
  if (forUser.empty())
    return false;
  // We want the largest value which is not greater than category.  There is
  // no direct way to do that.  So we start by asking for the smallest value
  // which is greater than category, and then step back by one.
  CategorySet::const_iterator possible = forUser.upper_bound(category);
  if (possible == forUser.begin())
    // Every value in the set is greater than category, so none of them can
    // be a prefix of it.  (Decrementing begin() would be undefined.)
    return false;
  --possible;
  // Because the set is prefix-free, if any entry is a prefix of category
  // then the largest entry which is not greater than category is that entry.
  return isPrefix(*possible, category);
  // The brace closing isPermissioned() is the first token on the next
  // source line.
} void ReportAlertsThread::dump(std::string const &message) { for (BySocket::const_iterator sIt = _bySocket.begin(); sIt != _bySocket.end(); sIt++) { CategorySet const &set = sIt->second; for (CategorySet::const_iterator cIt = set.begin(); cIt != set.end(); cIt++) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"_bySocket"<first); } } for (ByCategory::const_iterator cIt = _byCategory.begin(); cIt != _byCategory.end(); cIt++) { SocketSet const &set = cIt->second; for (SocketSet::const_iterator sIt = set.begin(); sIt != set.end(); sIt++) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"_byCategory"<first; LogFile::primary().sendString(msg, *sIt); } } for (PropertyList::const_iterator it = _cache.begin(); it != _cache.end(); it++) { TclList msg; msg<first<second; LogFile::primary().sendString(msg); } } void ReportAlertsThread::threadFunction() { const ExternalRequest::MessageId defaultMessageId(1); while (true) { while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtAddCategory: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); SocketInfo *const socket = request->getSocketInfo(); std::string const &category = request->getProperty("category"); if (isPermissioned(socket, category)) { _bySocket[socket].insert(category); _byCategory[category].insert(socket); addToOutputQueue(socket, "OK", request->getResponseMessageId()); const std::string response = getPropertyDefault(_cache, category); if (!response.empty()) addToOutputQueue(socket, response, getProperty(_responseId, socket, defaultMessageId)); } else addToOutputQueue(socket, "FAIL", request->getResponseMessageId()); break; } case mtDeleteCategory: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); SocketInfo *const socket = request->getSocketInfo(); std::string const &category = request->getProperty("category"); //dump("before deleting: " + category); eraseItem(_bySocket, socket, category); eraseItem(_byCategory, category, 
socket); //dump("after deleting: " + category); break; } case mtDeleteAll: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); SocketInfo *const socket = request->getSocketInfo(); deleteAll(socket); break; } case mtReportAlert: { ReportAlert *request = dynamic_cast< ReportAlert * >(current); SocketSet &sockets = _byCategory[request->category]; const std::string response = request->category + "\r\n" + request->info; for (SocketSet::iterator it = sockets.begin(); it != sockets.end(); it++) addToOutputQueue(*it, response, getProperty(_responseId, *it, defaultMessageId)); if (_cacheable.count(request->category)) _cache[request->category] = response; break; } case mtSetCacheable: { ReportAlert *request = dynamic_cast< ReportAlert * >(current); _cacheable.insert(request->category); break; } case mtAddAlertCategory: { ReportAlert *request = dynamic_cast< ReportAlert * >(current); _allAlertCategories[request->category] = request->info; break; } case mtResponseId: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); SocketInfo *const socket = request->getSocketInfo(); _responseId[socket] = request->getResponseMessageId(); break; } case mtListCategories: { ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); std::string result; for (PropertyList::const_iterator it = _allAlertCategories.begin(); it != _allAlertCategories.end(); it++) if (isPermissioned(socket, it->first)) { result += it->first; result += "\r\n"; result += it->second; result +="\r\n"; } addToOutputQueue(socket, result, request->getResponseMessageId()); break; } case mtFlushPermissions: { // This only flushes the cache. This has limited scope. It // does not immediately prevent anyone from seeing alerts, only // from subscribing to another category. To update // immediately, disconnect that user. In that case you don't // need to call this becuase we cache things on a per socket // basis, not a per user basis. 
ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); if (UserInfoThread::getInstance().hasAdminPermission(socket)) { _permissionsBySocket.clear(); addToOutputQueue(socket, "OK", request->getResponseMessageId()); } else addToOutputQueue(socket, "FAIL", request->getResponseMessageId()); break; } case mtDump: { ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); if (UserInfoThread::getInstance().hasAdminPermission(socket)) { dump(request->getProperty("message", "mtDump")); addToOutputQueue(socket, "OK", request->getResponseMessageId()); } else addToOutputQueue(socket, "FAIL", request->getResponseMessageId()); break; } case DeleteSocketThread::callbackId: { SocketInfo *const socket = current->getSocketInfo(); deleteAll(socket); _responseId.erase(socket); _permissionsBySocket.erase(socket); break; } case mtQuit: { delete current; return; } } delete current; } _incoming.waitForRequest(); } } ReportAlertsThread::ReportAlertsThread() : ThreadClass("ReportAlertsThread"), _incoming(getName()) { instance = this; startThread(); CommandDispatcher &cd = *CommandDispatcher::getInstance(); cd.listenForCommand("add_alert_category", &_incoming, mtAddCategory); cd.listenForCommand("delete_alert_category", &_incoming, mtDeleteCategory); cd.listenForCommand("delete_all_categories", &_incoming, mtDeleteAll); cd.listenForCommand("listen_for_alerts", &_incoming, mtResponseId); cd.listenForCommand("list_all_categories", &_incoming, mtListCategories); cd.listenForCommand("flush_category_permissions", &_incoming, mtFlushPermissions); cd.listenForCommand("report_alerts_dump", &_incoming, mtDump); } ReportAlertsThread::~ReportAlertsThread() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incoming.newRequest(r); waitForThread(); } void ReportAlertsThread::reportAlert(std::string const &category, std::string const &info) { ReportAlert *r = new ReportAlert; r->callbackId = mtReportAlert; 
r->category = category; r->info = info; _incoming.newRequest(r); TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__<callbackId = mtAddAlertCategory; r->category = category; r->info = description; _incoming.newRequest(r); } void ReportAlertsThread::setCacheable(std::string const &category) { ReportAlert *r = new ReportAlert; r->callbackId = mtSetCacheable; r->category = category; _incoming.newRequest(r); LogFile::primary().sendString(TclList()<