#include #include #include #include #include #include #include #include "../shared/Random.h" #include "../shared/CommandDispatcher.h" #include "../shared/ThreadMonitor.h" #include "../shared/DatabaseForThread.h" #include "../shared/LogFile.h" #include "../shared/ReplyToClient.h" #include "../shared/XmlSupport.h" #include "../shared/ContainerThread.h" #include "../shared/NewWorkerCluster.h" #include "../shared/GlobalConfigFile.h" #include "../ax_alert_server/CloudSupport.h" #include "../ax_alert_server/FormatTime.h" #include "../oddsmaker/UserInfo.h" #include "NfsHelper.h" #include "MiscNFS.h" /* This handles various requests that deal with the file system and the * database. It could work with any files, but we expect this to work running * on an NFS drive. The NFS requests should be fast most of the time, but they * can fail in a different way. We keep this separated so if we do run into * an NFS problem we don't take down unrelated parts of the system. * * This is initially aimed at the new cloud layout code. We're moving some * of the big data out of the MySql database and onto a normal NFS drive. The * database isn't optimized for this kind of data, and its performance with it * is inconsistent. However, if this works well we might do similar things with * other data. That's why I call this "Misc" NFS. * * I broke this into two classes, one for the master database and one for the * slaves. (UserFiles is a helper owned by the slave class.) We haven't * discussed strict rules for what requests stay in order for the cloud layout * code. As a general rule, write requests should probably all be done in one * thread, to maintain order, and read requests should all be done in a thread * pool for maximum scalability. See the symbol list code for an alternative, * where all read and write requests are guaranteed to be processed in order. */ using namespace NfsHelper; namespace MiscNFS { // Pointer to the most recent error message. Helpers in this namespace (e.g. touch()) overwrite *strerror when they fail. NOTE(review): nothing here synchronizes access to it, and inside MiscNFS this name hides the C library's ::strerror(). std::unique_ptr strerror {new std::string}; // Tries to read the entire contents of the named file.
// On success returns true and sets result to the contents of the file. // Otherwise returns false, sets result to "", and errorString() will contain // more information. // // TODO this was mostly copied from ../shared/XmlReader.h, with another // similar version available in ../stocktwits/MemoryLeakTest.C. This version // below seems to be pretty general purpose. It should probably be moved // to ../shared/MiscSupport.[Ch]. bool loadFromFile(std::string const &name, std::string &result) { // Inspired by // http://insanecoding.blogspot.in/2011/11/how-to-read-in-file-in-c.html result.clear(); std::ifstream in(name, std::ios::in | std::ios::binary); in.seekg(0, std::ios::end); if (in) { // If in is invalid (e.g. file not found) and we didn't check for that, // tellg() would return -1 and resize() would abort the program. (If you // call seekg(), tellg(), read(), etc., on an invalid stream, by default // that's just silently ignored.) result.resize(in.tellg()); in.seekg(0, std::ios::beg); in.read(&result[0], result.size()); } if (in) return true; else { result.clear(); return false; } } // Generally speaking there's an implied contract. If the client sends a // request and expects exactly one response, we send exactly one response. // It would be tempting to just ignore requests when we are not logged in, // but that could lead to a memory leak (or a hanging "Please Wait" window) // in the client. static void sendNullResponse(ExternalRequest const &request) { const ExternalRequest::MessageId messageId = request.getResponseMessageId(); if (messageId.present()) addToOutputQueue(request.getSocketInfo(), XmlNode().asString("API"), messageId); } // Very similar to the shell command of the same name. // Try to open the file. Fail if we don't have read and write access. // If the file doesn't exist, try to create it. Fail if the directories don't exist or we don't have permissions. // Return the empty string on success. 
// On failure return a helpful and user friendly string. On the open() failure path it also overwrites *strerror and echoes the message to std::cout. static std::string touch(std::string const &name) { int handle = open(name.c_str(), O_RDWR | O_CREAT, PERMISSIONS); if (handle < 0){ *strerror = "System failure: unable to open file: " + name; std::cout<< *strerror < names; for (int i = 1; i <= DIVISOR; i++) { const std::string name = pathForFile(i); assert(!name.empty()); names[i] = name; } for (auto const &kvp : names) { const std::string errorMessage = touch(kvp.second); if (!errorMessage.empty()) { TclList msg; msg<(&toSave[0]), &size, reinterpret_cast(&layout[0]), layout.length(), Z_BEST_COMPRESSION); if (compressionResult == Z_OK) { assert(size <= toSave.length()); if (size > 655360) { // Too big. We have no direct way to report an error. But this will // have a reasonable result. When someone tries to read this, the // client will see the 0 length result and will display an error // message. // // Note: 65536 came from MySQL. The file system doesn't have that // limitation. I multiplied that by 10 somewhat arbitrarily. Things // can be bigger now. But we still have a limit. toSave.clear(); originalSize = 0; } else toSave.resize(size); } else { // Problem compressing it. Probably it was too small to begin with so // compression didn't (significantly) help.
toSave = layout; originalSize = 0; } std::vector< std::string > sql; sql.push_back("INSERT INTO cloud_layout " "SET user_id=" + ntoa(userId) + ", " "share_code=MD5(CONCAT(" + ntoa(getRandom31()) + ",NOW()," + ntoa(userId) + ")), " "original_size=" + ntoa(originalSize) + ", " "icon='" + mysqlEscapeString(request.getProperty("icon")) + "', " "fill_screen='" + (request.getProperty("fill_screen", false)?'Y':'N') + "', " "clear_previous_layout='" + (request.getProperty("clear_previous_layout", false) ?'Y':'N') + "', " "short_description='" + mysqlEscapeString(request.getProperty("short_description")) + "', " "long_description='" + mysqlEscapeString(request.getProperty("long_description")) + "', " "window_count='" // This should be a number, but we don't trust it because it // came from the client, so put it in quotes and let MySQL // sanitize it. + mysqlEscapeString(request.getProperty("window_count")) + "', " "creation=NOW()"); sql.push_back("SELECT share_code, strategy_id " "FROM cloud_layout " "WHERE strategy_id=LAST_INSERT_ID()"); /* NOTE(review): element [1] is the result of the SELECT above; its first row is read below without a rowIsValid() check. */ MysqlResultRef result = database().tryAllUntilSuccess(sql.begin(), sql.end(), s_mtSaveToCloud)[1]; const std::string saveError = saveToDisk(result->getIntegerField(1, 0), toSave); if (!saveError.empty()) LogFile::primary().sendString(saveError, request.getSocketInfo()); XmlNode response; response.properties["CODE"] = CLOUD_PREFIX + result->getStringField(0); addToOutputQueue(request.getSocketInfo(), response.asString("API"), request.getResponseMessageId()); } void cloudDelete(ExternalRequest &request, UserId userId) { ThreadMonitor::SetState tm(s_mtCloudDelete); tm.increment(s_mtCloudDelete); std::string strategyId = request.getProperty("id"); std::string fileName = pathForFile(strategyId); if (!fileName.empty()) { std::vector< std::string > sql; const std::string where = "WHERE strategy_id='" + mysqlEscapeString(strategyId) + "' AND user_id=" + ntoa(userId); sql.push_back("SELECT 1 FROM cloud_layout " + where); sql.push_back("DELETE FROM
cloud_layout " + where); const auto results = database().tryAllUntilSuccess(sql.begin(), sql.end(), s_mtCloudDelete); if (results[0]->rowIsValid()) { // We deleted something. That means that the database was aware of // this layout and, more importantly, that the layout belongs to this // user. I.e. this user has permissions to delete this! The file // system doesn't know anything about users, so we have to check // the database before trying to delete something from the file // system. NOTE(review): results[0] holds the rows of the SELECT 1 query, not the DELETE, so strictly this shows that a matching row existed when the SELECT ran. int unlinkError = unlink(fileName.c_str()); if (unlinkError) { TclList msg; msg<(*current); const UserInfoExport userInfo = userInfoGetInfo(request.getSocketInfo()); if (userInfo.status != sFull) // You must be logged in to save or change anything. sendNullResponse(request); else switch (current->callbackId) { case mtSaveToCloud: saveToCloud(request, userInfo.userId); break; case mtCloudDelete: cloudDelete(request, userInfo.userId); break; case mtCloudRevoke: cloudRevoke(request, userInfo.userId); break; default: abort(); } } NfsMasterDbThread() : ForeverThreadUser(IContainerThread::create("NfsMasterDbThread")) { CommandDispatcher *c = CommandDispatcher::getInstance(); c->listenForCommand("save_to_cloud", this, mtSaveToCloud); c->listenForCommand("cloud_delete", this, mtCloudDelete); c->listenForCommand("cloud_revoke", this, mtCloudRevoke); start(); } public: static NfsMasterDbThread *instance() { static NfsMasterDbThread *result = new NfsMasterDbThread(); return result; } }; class UserFiles { private: NewWorkerCluster &_workers; inline IContainerThread *getContainer() { return _workers.getContainer(); } std::string pathForUser(UserId userId) const { if (userId <= 0) return ""; const int DIVISOR = 5; return "user_data/" + ntoa(DIVISOR) + '/' + ntoa(userId % DIVISOR) + '/' + ntoa(userId); } std::string pathForUser(ExternalRequest const &m) const { SocketInfo *const socket = m.getSocketInfo(); const UserId userId = userInfoGetInfo(socket).userId; return pathForUser(userId); } static
std::unordered_set< std::string > const &legalFileNames() { static std::unordered_set< std::string > legal = {"custom_channels.xml", "channel_image_0.png", "channel_image_1.png", "channel_image_2.png", "channel_image_3.png", "channel_image_4.png"}; return legal; } static bool isLegalFileName(std::string const &name) { return legalFileNames().count(name); } // Creates the path if it does not already exist. Might create several // directories. // Returns true if the path appears to be in place, // appropriately permissioned, etc. Otherwise returns false and writes // an error message to the log file. // // We are explicitly not going to sanitize the path. The user gets some // input, but the path is generated by server code and we assume it is // safe. static bool ensurePathExists(std::string const &path) { const std::vector< std::string > elements = explode("/", path); std::string pathSoFar; for (std::string const &element : elements) { if (!pathSoFar.empty()) pathSoFar += '/'; pathSoFar += element; struct stat fileStatus; int statError = stat(pathSoFar.c_str(), &fileStatus); // For simplicity assume no error means that the directory already // exists and we should look at the next one and assume that an error // means that the directory is missing and we should create it now. // Both of those could be wrong. But if we naïvely make these // assumptions and we're wrong, an explain...() call down the road will // explain the real problem. if (statError) { const mode_t MODE = 0755; // rwxr-xr-x int mkdirError = mkdir(pathSoFar.c_str(), MODE); if (mkdirError) { *strerror = "System failure: unknown directory: " + pathSoFar; std::cout<< *strerror <empty()) { // Delete this file int unlinkError = unlink(finalFileName.c_str()); if (!unlinkError) tm.increment("deleted!"); else if (errno == ENOENT) { // A component in pathname does not exist or is a dangling symbolic // link, or pathname is empty.
// // Assume this means that the file we wanted to delete was already // deleted, or never existed. So that's not an error. Like asking // MySQL to delete something. tm.increment("delete not required"); unlinkError = false; } if (unlinkError) { *strerror = "System warning: unknown pathname: " + finalFileName; std::cout<< *strerror <c_str(), body->size()); if (written != body->size()) { *strerror = "System failure: unable to write data in file: " + tempFileName; std::cout<< *strerror <size()<size() < toReport; auto const &possible = legalFileNames(); toReport.reserve(possible.size() + 1); struct stat fileStatus; int statCount = 1; int statError = stat(path.c_str(), &fileStatus); if (statError) { // Can't find root directory for user. NOTE(review): the non-ENOENT branch below reports "unable to create directory" although this code only calls stat() here. if (errno == ENOENT) // Not found. We've never created a directory for this user. Not // an error in our system. tm.increment("userFileList . ENOENT"); else { *strerror = "System failure: unable to create directory: " + path; std::cout<< *strerror <const & callback) { CommandDispatcher::getInstance() ->listenForCommand(name, new ForeverThreadUser::Listener< ExternalRequest > (getContainer(), callback), 0); } // This is an attempt to make listenForCommand() even easier to use. // Now you don't have to use the lambda syntax (which still seems crazy to // me) each time you call listenForCommand(). This one method takes care // of all of the lambdas. // // This still seems lacking, somehow. :( Maybe I could move this from // a method to a global template that could work on any class. Then I'd // have to explicitly include the this pointer. Maybe that's not terrible.
/* NOTE(review): the [=] lambda below captures this implicitly (deprecated since C++20), and the resulting callback is handed to a ForeverThreadUser::Listener registered with the CommandDispatcher, so this UserFiles object must outlive that registration. */ void listenForCommand(std::string const &name, void (UserFiles::*method) (ExternalRequest const &) const) { listenForCommand(name, [=](ExternalRequest const &m) { (this->*method)(m); }); } public: UserFiles(NewWorkerCluster &workers) : _workers(workers) { listenForCommand("user_file_save", &UserFiles::userFileSave); listenForCommand("user_file_load", &UserFiles::userFileLoad); listenForCommand("user_file_list", &UserFiles::userFileList); } }; static const std::string s_mtCloudList = "mtCloudList"; static const std::string s_mtCloudList_null = "mtCloudList null"; static const std::string s_mtCloudLoad = "mtCloudLoad"; static const std::string s_mtCloudLoad_null = "mtCloudLoad null"; static const std::string s_mtCloudImport = "mtCloudImport"; static const std::string s_getLayoutBody = "getLayoutBody"; static const std::string s_getLayoutBody_reading = "getLayoutBody reading"; static const std::string s_getLayoutBody_decompressing = "getLayoutBody decompressing"; static const std::string s_getLayoutBody_notCompressed = "getLayoutBody not compressed"; static const std::string s_getLayoutBody_normal = "getLayoutBody normal"; static const std::string s_getLayoutBody_entryNotFound = "getLayoutBody not found"; static const std::string s_getLayoutBody_fileError = "getLayoutBody file ERROR"; static const std::string s_getLayoutBody_error = "getLayoutBody ERROR"; class NfsSlaveDbThread : public ForeverThreadUser { private: enum { mtCloudList, mtCloudLoad, mtCloudImport }; typedef TSRefCount< ExternalRequest > ExternalRequestRef; /* Declaration order matters: _userFiles is constructed with a reference to _workers, and members are initialized in declaration order. */ NewWorkerCluster _workers; UserFiles _userFiles; static DatabaseWithRetry &database() { return *DatabaseForThread(DatabaseWithRetry::SLAVE); } static void doCloudList(ExternalRequestRef const &original) { ThreadMonitor::SetState tm(s_mtCloudList); tm.increment(s_mtCloudList); SocketInfo *const socket = original->getSocketInfo(); const bool easternTime = userInfoUseEasternTime(socket); const UserId userId = userInfoGetInfo(socket).userId; std::string
sql = "SELECT strategy_id, share_code, icon, clear_previous_layout, " "fill_screen, short_description, long_description, window_count, " "UNIX_TIMESTAMP(creation) as creation " "FROM cloud_layout " "WHERE user_id=" + ntoa(userId); XmlNode response; XmlNode &rows = response["ROWS"]; for (MysqlResultRef result = database().tryQueryUntilSuccess(sql); result->rowIsValid(); result->nextRow()) { XmlNode &row = rows[-1]; row.properties["ID"] = result->getStringField("strategy_id"); row.properties["CODE"] = CLOUD_PREFIX + result->getStringField("share_code"); row.properties["ICON"] = result->getStringField("icon"); row.properties["CLEAR_PREVIOUS_LAYOUT"] = (result->getStringField("clear_previous_layout") == "Y")?"1":"0"; row.properties["FILL_SCREEN"] = (result->getStringField("fill_screen") == "Y")?"1":"0"; row.properties["SHORT_DESCRIPTION"] = result->getStringField("short_description"); row.properties["LONG_DESCRIPTION"] = result->getStringField("long_description"); row.properties["WINDOW_COUNT"] = result->getStringField("window_count"); time_t creation = result->getIntegerField("creation", 0); if (creation) row.properties["CREATION"] = exportTime(creation, easternTime); } addToOutputQueue(socket, response.asString("API"), original->getResponseMessageId()); } /* Reads the stored layout for strategyId from the file system. Returns "" when pathForFile() gives no path or the file cannot be read; later code checks a zlib result and the decompressed size against originalSize. */ static std::string getLayoutBody(int64_t strategyId, int64_t originalSize) { ThreadMonitor::SetState tm(s_getLayoutBody_reading); const std::string fileName = pathForFile(strategyId); if (fileName.empty()) { // Not found. This seems unavoidable. This is different from the data // errors below. Those suggest something more troubling, possibly a // programming error. tm.increment(s_getLayoutBody_entryNotFound); return ""; } std::string body; if (!loadFromFile(fileName, body)) { // Some type of internal error. The database says it should be there // but we can't find it in the file system.
I suppose it could happen // because of a race condition, where another thread or server deleted // the item between when we went to the database and the file system, // but that should be rare. If we see this a lot, that suggests a // problem. TclList msg; msg<(&result[0]), &size, reinterpret_cast(&body[0]), body.length()); if ((zlibResult != Z_OK) || (size != (uint64_t)originalSize)) { // Some sort of error. An inconsistency between the database and the // file system. Return our standard "" as an indication of an error. tm.increment(s_getLayoutBody_error); TclList msg; msg<getStringField("clear_previous_layout") == "Y")?"1":"0"; layout.properties["FILL_SCREEN"] = (result->getStringField("fill_screen") == "Y")?"1":"0"; layout.properties["NAME"] = result->getStringField("short_description"); layout.text = getLayoutBody(result->getIntegerField("strategy_id", 0), result->getIntegerField("original_size", 0)); addToOutputQueue(original->getSocketInfo(), response.asString("API"), original->getResponseMessageId()); } static void doCloudLoad(ExternalRequestRef const &original) { ThreadMonitor::SetState tm(s_mtCloudLoad); tm.increment(s_mtCloudLoad); SocketInfo *const socket = original->getSocketInfo(); const UserId userId = userInfoGetInfo(socket).userId; const int id = strtolDefault(original->getProperty("id"), 0); /* NOTE(review): id is an int, while getLayoutBody() takes an int64_t strategy id; confirm strategy_id values always fit in an int. */ getFromCloud(original, "user_id=" + ntoa(userId) + " AND strategy_id=" + ntoa(id)); } static void doCloudImport(ExternalRequestRef const &original) { ThreadMonitor::SetState tm(s_mtCloudImport); tm.increment(s_mtCloudImport); std::string shortCode = original->getProperty("code"); /* Keep only the last 32 characters. Presumably this strips CLOUD_PREFIX, since share_code is a MySQL MD5() hex digest (32 characters) -- confirm. */ if (shortCode.length() > 32) shortCode.erase(0, shortCode.length() - 32); getFromCloud(original, "share_code='" + mysqlEscapeString(shortCode) + "'"); } virtual void initializeInThread() { // This is the dispatcher thread. If you need a database, use a worker // thread.
assertNoDatabaseThisThread(); const int count = strtolDefault(getConfigItem("misc_ro_nfs_threads"), 4); assert(count > 0); _workers.createWorkersLambda([]() { // These tasks are explicitly aimed at a read only database. // Related tasks for the master database are handled elsewhere. disableDatabaseThisThread(DatabaseWithRetry::MASTER); }, count); } virtual void handleRequestInThread(Request *current) { ThreadMonitor &tm = ThreadMonitor::find(); // Normally the container thread would delete the request. Instead we're // using a reference counted pointer to handle the deletion. ExternalRequestRef const request = dynamic_cast< ExternalRequest * >(current); keepOriginal(); // Notice the implied assertion. If current is not of type // ExternalRequest we'd catch it here. We'd have to change our memory // management if that was not true. NOTE(review): dynamic_cast yields a null pointer on a type mismatch, so the actual check is the dereference of request just below; unless TSRefCount tests for null, that is a crash rather than a clean assertion. SocketInfo *const socket = request->getSocketInfo(); switch(current->callbackId) { case mtCloudList: { if (userInfoGetInfo(socket).userId) { tm.increment(s_mtCloudList); _workers.addJob1([request]() { doCloudList(request); }, socket); } else { tm.increment(s_mtCloudList_null); sendNullResponse(*request); } break; } case mtCloudLoad: { if (userInfoGetInfo(socket).userId) { tm.increment(s_mtCloudLoad); _workers.addJob1([request]() { doCloudLoad(request); }, socket); } else { tm.increment(s_mtCloudLoad_null); sendNullResponse(*request); } break; } case mtCloudImport: { tm.increment(s_mtCloudImport); _workers.addJob1([request]() { doCloudImport(request); }, socket); break; } default: abort(); } } NfsSlaveDbThread() : ForeverThreadUser(IContainerThread::create("NfsSlaveDbThread")), _workers(getContainer(), "NfsSlaveDbThread"), _userFiles(_workers) { CommandDispatcher *c = CommandDispatcher::getInstance(); c->listenForCommand("cloud_list", this, mtCloudList); c->listenForCommand("cloud_load", this, mtCloudLoad); c->listenForCommand("cloud_import", this, mtCloudImport); start(); } public: static
NfsSlaveDbThread *result = new NfsSlaveDbThread(); return result; } }; } /* Module entry point. Returns immediately when the config item skip_nfs is "1". Otherwise it runs MiscNFS::testPaths(), schedules a shutdown through LogFile if that fails, and then creates the master and slave NFS thread singletons. NOTE(review): the threads are still created after a failed testPaths(); presumably the scheduled shutdown stops the program soon afterward -- confirm. */ void initMiscNFS() { if (getConfigItem("skip_nfs") == "1") // This seems essential. The NFS code has some very specific requirements. // If there's a problem, maybe a test setup, maybe the NFS server is down, // we might want a way to skip this stuff and let the rest of the program // run. That is NOT the default. It's hard to report a half working // server. By killing the program in case of trouble, and by testing as // soon as we start, we have the best chance of finding problems ASAP. return; if (!MiscNFS::testPaths()) LogFile::primary().scheduleShutdown(); MiscNFS::NfsMasterDbThread::instance(); MiscNFS::NfsSlaveDbThread::instance(); }