#include #include "../../shared/GlobalConfigFile.h" #include "../misc_framework/AccumulateInsert.h" #include "../../shared/CommandDispatcher.h" #include "../../shared/ReplyToClient.h" #include "../../fast_alert_search/FieldLists.h" #include "../../fast_alert_search/FeedMaster.h" #include "PushAlerts.h" ///////////////////////////////////////////////////////////////////// // PushAlertsThread ///////////////////////////////////////////////////////////////////// static std::string getExternalName() { // Please see MiscSupport.h. geHostName() and getShortHostName() are now // available as standard library routines. Don't copy the code below. std::string result; char hostName[100]; const bool error = gethostname(hostName, 100); if (error) result = "unknown"; else { // If the host name is too long, according to the man page, it will be // truncated. Whether or not the truncated name is NULL terminated is // explicitly not defined. hostName[99] = 0; // Say "fogell" not "fogell.trade-ideas.com" result = explode(".", hostName)[0]; } const std::string port = getConfigItem("listen_port"); if (!port.empty()) result += ':' + port; return result; } PushAlertsThread::PushAlertsThread(std::string name, std::string debugPrefix) : ThreadClass(name), _incoming(getName()), _fake(getConfigItem("fake_alerts") == "1") { CommandDispatcher::getInstance()-> listenForCommand(debugPrefix + "push_alerts_test", &_incoming, mtTest); startThread(); }; PushAlertsThread::~PushAlertsThread() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incoming.newRequest(r); waitForThread(); }; void PushAlertsThread::threadFunction() { ThreadMonitor &tm = ThreadMonitor::find(); static const std::string s_ADD_ALERT = "ADD_ALERT"; static const std::string s_ADD_TOP_LIST = "ADD_TOP_LIST"; static const std::string s_SERIALIZE = "serialize"; static const std::string s_ADD_DATA = "add_data"; const std::string sWorking = "working"; tm.setState(sWorking); FeedMaster feedMasterAlerts(true); FeedMaster feedMasterTopList(false); while (true) { while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtAlert: { tm.setState(s_ADD_DATA); AlertEvent *request = dynamic_cast< AlertEvent * >(current); if (request->isAlert && _fake) request->recordBuilder.append(MainFields::fake, true); request->recordBuilder.reserveInt(MainFields::id); tm.setState(s_SERIALIZE); const std::string serialized = request->recordBuilder.exportAsString(); tm.setState(s_ADD_DATA); if (request->isAlert) { tm.increment(s_ADD_ALERT); feedMasterAlerts.sendRecord(serialized); } else { tm.increment(s_ADD_TOP_LIST); feedMasterTopList.sendRecord(serialized); } break; } case mtDeadManTimer: { static const std::string externalName = getExternalName(); feedMasterAlerts.updateDeadManTimer(externalName); feedMasterTopList.updateDeadManTimer(externalName); break; } case mtTest: { // Send a fake alert and top list entry. ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); const std::string symbol = request->getProperty("symbol", "TEST"); RecordBuilder recordBuilder; recordBuilder.reserveInt(MainFields::id); recordBuilder.append(MainFields::fake, true); recordBuilder.append(MainFields::symbol, symbol); recordBuilder.append(MainFields::timestamp, time(NULL)); recordBuilder.append(MainFields::rsi_1, 1.08); recordBuilder.append(MainFields::rsi_2, 2.08); recordBuilder.append(MainFields::rsi_5, 5.08); recordBuilder.append(MainFields::rsi_15, 15.08); recordBuilder.append(MainFields::rsi_60, 60.08); recordBuilder.append(MainFields::price, 55.00); recordBuilder.append(MainFields::social_rv, 9.09); recordBuilder.append(MainFields::up_10, 910); recordBuilder.append(MainFields::up_15, 915); recordBuilder.append(MainFields::relvol, 609); recordBuilder.append(MainFields::pm_volume, 98765); feedMasterTopList.sendRecord(recordBuilder.exportAsString()); recordBuilder.append(MainFields::alert_type, "DEMO"); feedMasterAlerts.sendRecord(recordBuilder.exportAsString()); // so the user knows he didn't have a typo addToOutputQueue(request->getSocketInfo(), symbol + "!", request->getResponseMessageId()); break; } case mtQuit: { delete current; return; } } tm.setState(sWorking); delete current; } _incoming.waitForRequest(); } } void PushAlertsThread::onAlert(AlertEvent *event) { event->callbackId = mtAlert; _incoming.newRequest(event); } void PushAlertsThread::updateDeadManTimer() { Request *r = new Request(NULL); r->callbackId = mtDeadManTimer; _incoming.newRequest(r); }