#include "../shared/CommandDispatcher.h" #include "../shared/ReplyToClient.h" #include "../shared/SimpleLogFile.h" #include "../shared/GlobalConfigFile.h" #include "../shared/ContainerThread.h" #include "RecordDispatcher.h" #include "Strategy.h" #include "AlertsDailyDatabase.h" #include "EmailSms.h" #include "SendEmailThread.h" #include #include #include #include // info describing sms/email strategy struct EmailSmsStrategy { UserId userId; int strategyId; std::string destination; std::string windowName; Parse::AlertStrategy strategy; std::string title; std::string body; time_t last; }; // this thread is responsible for listening to alerts // and sending them to the email thread. this started // as a clone of SimpleAlerts test class EmailSms : public ForeverThreadUser { private: std::list< EmailSmsStrategy> _strategies; time_t _nextRefreshTime; enum { mtAlert }; DatabaseWithRetry _readOnlyDatabase; int _skipCount; bool _showEntireRecord; bool _alertsDailyInitialized; SendEmailThread emailThread; void tildeReplace(std::string &str, const EmailSmsStrategy &strategy, const std::string &symbol, const double &price, const int &volume) { size_t pos = 0; while ((pos = str.find('~', pos)) != std::string::npos) { if (str.size() > pos + 1) { switch (str[pos + 1]) { case 'w': { str.replace(pos, 2, strategy.windowName); pos += strategy.windowName.size(); break; } case 's': { str.replace(pos, 2, symbol); pos += symbol.size(); break; } case 'c': { std::string replacement = "http://www.trade-ideas.com/EmailSmsConfig.html?command=show&strategy_id=" + ntoa(strategy.strategyId); str.replace(pos, 2, replacement); pos += replacement.size(); break; } case 'm': { std::string replacement = "http://mobile.trade-ideas.com/EmailSmsConfig.html?command=show&strategy_id=" + ntoa(strategy.strategyId); str.replace(pos, 2, replacement); pos += replacement.size(); break; } case 'p': { std::string replacement = priceToString(price); str.replace(pos, 2, replacement); pos += replacement.size(); break; } case 'v': { std::string replacement = ntoa(volume); str.replace(pos, 2, replacement); pos += replacement.size(); break; } case '~': { str.replace(pos, 2, "~"); pos += 1; break; } default: { pos += 1; } } } else { return; } } } void checkTimer() { const time_t now = time(NULL); if (_nextRefreshTime > now) // Not ready yet. return; //_strategies.clear(); auto stratIt = _strategies.begin(); MysqlResultRef result = _readOnlyDatabase.tryQueryUntilSuccess( "SELECT user_id, strategy_id, sms_email, settings, title, body, unix_timestamp(last) as last " "FROM email_sms_strategies, users WHERE id=user_id AND authorization_expires > NOW()" "AND enabled = 'Y' ORDER BY user_id ASC, strategy_id ASC"); //for aggregating strategies by user UserId lastUser = 0; std::vector< Parse::AlertStrategy * > currentStrategies; for (; result->rowIsValid(); result->nextRow()) { UserId userId = strtolDefault(result->getStringField("user_id"), 0); if (userId == 0) continue; if (lastUser != userId) { if (!currentStrategies.empty()) { Parse::AlertStrategy::compile(currentStrategies); currentStrategies.clear(); } } lastUser = userId; int strategyId = strtolDefault(result->getStringField("strategy_id"), -1); while (stratIt->userId < userId && stratIt != _strategies.end()) stratIt = _strategies.erase(stratIt); while (stratIt->userId == userId && stratIt->strategyId < strategyId && stratIt != _strategies.end()) stratIt = _strategies.erase(stratIt); const std::string collaborate = result->getStringField("settings"); // found an old strategy, just update it if (stratIt->userId == userId && stratIt->strategyId == strategyId) { if (collaborate.empty()) { _strategies.erase(stratIt); } else { PropertyList properties; parseUrlRequest(properties, collaborate); if (properties.find("WN") != properties.end()) stratIt->windowName = properties["WN"]; else stratIt->windowName = ""; stratIt->strategy.load(collaborate, userId, _readOnlyDatabase, false); stratIt->destination = result->getStringField("sms_email"); stratIt->title = result->getStringField("title"); stratIt->body = result->getStringField("body"); currentStrategies.push_back(&stratIt->strategy); time_t lastFromDb = strtolDefault(result->getStringField("last"), 0); if (lastFromDb > stratIt->last || lastFromDb == 0) stratIt->last = lastFromDb; } } else // insert the new strategy { if (!collaborate.empty()) { EmailSmsStrategy emailSmsStrategy; PropertyList properties; parseUrlRequest(properties, collaborate); if (properties.find("WN") != properties.end()) { emailSmsStrategy.windowName = properties["WN"]; } emailSmsStrategy.strategyId = strategyId; emailSmsStrategy.strategy.load(collaborate, userId, _readOnlyDatabase, false); emailSmsStrategy.userId = userId; emailSmsStrategy.destination = result->getStringField("sms_email"); emailSmsStrategy.title = result->getStringField("title"); emailSmsStrategy.body = result->getStringField("body"); emailSmsStrategy.last = strtolDefault(result->getStringField("last"), 0); stratIt = _strategies.insert(stratIt, emailSmsStrategy); currentStrategies.push_back(&stratIt->strategy); } } if (stratIt != _strategies.end()) { stratIt++; } } while (stratIt != _strategies.end()) stratIt = _strategies.erase(stratIt); if (!currentStrategies.empty()) Parse::AlertStrategy::compile(currentStrategies); _nextRefreshTime = time(NULL) + 30; SendEmailThread::SendAlertRequest * monitorAliveRequest = new SendEmailThread::SendAlertRequest(SendEmailThread::mtMonitorAlive); emailThread.newRequest(monitorAliveRequest); } public: virtual void beforeSleep(IBeforeSleepCallbacks &callbacks) { checkTimer(); callbacks.wakeAfterMs(1000 * (_nextRefreshTime - time(NULL))); } virtual void initializeInThread() { // Load all the strategies ASAP. In particular, load it before the // first call to notifyWhenReady(). checkTimer(); } virtual void handleRequestInThread(Request *original) { switch (original->callbackId) { case mtAlert: { const static long TWENTY_MINUTES = 60*20; NewRecord *request = dynamic_cast (original); UserId lastUser = 0; Execution::RecordInfo recordInfo; recordInfo.setRecord(request->record); for (auto it = _strategies.begin(); it != _strategies.end(); it++) { time_t currentTime = time(NULL); if (it->userId != lastUser) it->strategy.init(recordInfo); lastUser = it->userId; if (((currentTime - TWENTY_MINUTES) > it->last) && (it->strategy.evaluateWhere(recordInfo))) { //static std::regex emailRegex(R"(^[A-Za-z0-9_.\\-\+]+@([A-Za-z0-9_\\-]+\\.)+[A-Za-z0-9_\\-]+$)"); TclList msg; msg << FLF << "send_email"; msg << it->destination; bool valid; std::string symbol; request->record->lookUpValue(MainFields::symbol).getString(valid, symbol); std::string desc; request->record->lookUpValue(MainFields::description).getString(valid, desc); if (valid) { msg << symbol; } double price = 0.0; request->record->lookUpValue(MainFields::price).getDouble(valid, price); if (valid) { msg << price; } int64_t volume = 0; request->record->lookUpValue(MainFields::tvol).getInt(valid, volume); if (valid) { msg << volume; } sendToLogFile(msg); std::string title = it->title; std::string body = it->body; tildeReplace(title, *it, symbol, price, volume); tildeReplace(body, *it, symbol, price, volume); bool isValidEmail = true; int lastDotLocation = -1; int atLocation = -1; for (unsigned int i = 0; i < it->destination.length(); i++) { char c = it->destination[i]; if (c == '@') { if (atLocation != -1 || i < 1) { isValidEmail = false; break; } else atLocation = i; } else if (c == '.') { if ((lastDotLocation > -1) && (lastDotLocation == int(i) - 1)) { isValidEmail = false; break; } lastDotLocation = i; } else if ((i == it->destination.length() - 1) && (c == '-')) { isValidEmail = false; break; } else if (atLocation > 0) { if (!((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || (c == '-'))) { isValidEmail = false; break; } } else { if (!((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || (c == '-') || (c == '#') || (c == '$') || (c == '%') || (c == '&') || (c == '\'') || (c == '*') || (c == '+') || (c == '/') || (c == '=') || (c == '?') || (c == '^') || (c == '_') || (c == '`') || (c == '{') || (c == '|') || (c == '}') || (c == '~'))) { isValidEmail = false; break; } } } if ((atLocation < 1) || (lastDotLocation < (atLocation + 2)) || (lastDotLocation >= (int)(it->destination.length() - 1))) isValidEmail = false; if (it->destination.substr(0, 7) == "iPhone:") { auto pieces = explode(":", it->destination); if (pieces.size() < 2) continue; if (pieces[1].size() == 64) { SendEmailThread::SendAlertRequest * phoneRequest = new SendEmailThread::SendAlertRequest(SendEmailThread::mtSendPhone); phoneRequest->userId = it->userId; phoneRequest->strategyId = it->strategyId; phoneRequest->email = it->destination; phoneRequest->message = "REPLACE INTO iphone_notify_queue" "(user_id, device_token, alert_body, timestamp, strategy_id)" "VALUES(" + ntoa(it->userId) + ", '" + mysqlEscapeString(pieces[1]) + "', '" + mysqlEscapeString(body) + "', NOW(), " + ntoa(it->strategyId) + ")"; emailThread.newRequest(phoneRequest); it->last = time(NULL); } } //else if (std::regex_match(it->destination, emailRegex)) else if (isValidEmail) { SendEmailThread::SendAlertRequest * emailRequest = new SendEmailThread::SendAlertRequest(SendEmailThread::mtSendEmail); emailRequest->subject = title; emailRequest->message = body; emailRequest->email = it->destination; emailRequest->userId = it->userId; emailRequest->strategyId = it->strategyId; emailThread.newRequest(emailRequest); it->last = time(NULL); } } else { _skipCount++; if (_skipCount >= 10000) { TclList msg; msg << FLF << "skipped alerts that didn't match" << _skipCount; ValueBox field = request->record->lookUpValue(MainFields::id); bool valid; int64_t value; field.getInt(valid, value); if (valid) msg << "id" << value; field = request->record->lookUpValue(MainFields::timestamp); field.getInt(valid, value); if (valid) msg << "timestamp" << ctimeString(value); sendToLogFile(msg); _skipCount = 0; } } } break; } } } EmailSms() : ForeverThreadUser(IContainerThread::create("EmailSms")), _nextRefreshTime(0), _readOnlyDatabase(true, "EmailSmsRO"), //_writeDatabase(false, "EmailSmsRW"), _skipCount(0), _alertsDailyInitialized(false) { IRecordDispatcher::getAlerts()->listenForRecords(this, mtAlert); start(); } }; void initEmailSms() { // This thread pushes its output directly to the end user. Exactly one // machine should be configured this way. if (getConfigItem("email_sms") == "1") { setEmailThreadSMTPHost(getConfigItem("smtp_host", "localhost")); new EmailSms(); } }