#include #include "../shared/GlobalConfigFile.h" #include "../shared/SimpleLogFile.h" #include "../shared/ThreadMonitor.h" #include "../shared/ThreadClass.h" #include "xs-include/XS.h" #include "Subscriptions.h" #include "QuickDeadmanTimer.h" #include "GetData.h" class GetData : public ThreadClass { private: ThreadMonitor *_threadMonitor; QuickDeadmanTimer quickDeadmanTimer; static const int VF_SRC_ID = 555; XS::Token *queryCommand; XS::Token *ctfTokenNum; XS::Token *queryTag; XS::Token *actualBuyForcesVolume; XS::Token *actualSellForcesVolume; XS::Token *expectedVelocity; XS::Token *cspOutputTokens; XS::Token *enumSrcId; XS::Token *symbolTicker; static std::string getStringField(XS::Field const *field) { return std::string(field->getInternalString(), field->getStringLen()); } void processMessage(XS::Msg *message, XS::Connection *conn) { const XS::Field * srcIdField = message->findByTok(enumSrcId); int64_t srcId = 0; if (srcIdField) srcId = srcIdField->getInt(); // record heartbeats if (srcId==XS::TIME_ENUM_SRC_ID) { static const std::string S_HB = "HB"; _threadMonitor->increment(S_HB); quickDeadmanTimer.signal(); return; } // And other types of messages. if (srcId != VF_SRC_ID) { static const std::string S_OTHER = "OTHER"; _threadMonitor->increment(S_OTHER); return; } //message->print(); const XS::Field *field = message->findByTok(symbolTicker); if (!field) { static const std::string S_NO_SYMBOL = "NO SYMBOL"; _threadMonitor->increment(S_NO_SYMBOL); return; } VFMessage *outgoing = new VFMessage(); outgoing->symbol = getStringField(field); field = message->findByTok(actualBuyForcesVolume); if (!field) outgoing->buyVolume = 0; else outgoing->buyVolume = field->getInt(); field = message->findByTok(actualSellForcesVolume); if (!field) outgoing->sellVolume = 0; else outgoing->sellVolume = field->getInt(); field = message->findByTok(expectedVelocity); if (!field) outgoing->expectedVelocity = 0; else outgoing->expectedVelocity = field->getInt(); outgoing->send(); static const std::string S_NORMAL = "NORMAL"; _threadMonitor->increment(S_NORMAL); } void initToken(XS::Connection *connection, XS::Token *&token, std::string name) { token = connection->getTokenByName(name.c_str()); if (!token) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"Could not find token" <setState(S_INIT); // Initialize API XS::TStatus status; if ((status = XS::initialize("vf_proxy", "/tmp/")) != XS::tstat_OK) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"XS::initialize() failed"< servers = explode(",", getConfigItem("XS_host", "192.168.30.163:7022,192.168.30.164:7022")); std::string server = servers[TimeVal(true).tv_usec % servers.size()]; std::string username = getConfigItem("XS_username", "tradeideas1"); std::string password = getConfigItem("XS_password", "tradeideas1"); if ((status = connection->open(server.c_str(), username.c_str(), password.c_str())) != XS::tstat_OK) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"connection::open() failed"<addFieldString(queryCommand, "SelectUserTokens"); reqFields->addFieldInt(ctfTokenNum, actualBuyForcesVolume->getCtfId()); reqFields->addFieldInt(ctfTokenNum, actualSellForcesVolume->getCtfId()); reqFields->addFieldInt(ctfTokenNum, expectedVelocity->getCtfId()); reqFields->addFieldInt(queryTag, 55); //reqFields->print(); // Send request if (connection->send(reqFields) != XS::tstat_OK) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"connection::send() failed"<erase(); reqFields->addFieldString(queryCommand, "SelectOutput"); reqFields->addFieldString(cspOutputTokens, "User"); reqFields->addFieldInt(queryTag, 56); // Print request message before sending (optional, for debugging) //reqFields->print(); // Send request if (connection->send(reqFields) != XS::tstat_OK) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"connection::send() failed"<addFieldString(queryCommand, "QuerySnapAndSubscribe"); //req->addFieldString(symbolTicker, "DELL"); req->addFieldInt(enumSrcId, VF_SRC_ID); req->addFieldInt(queryTag, 3); // Print request message before sending (optional, for debugging) //req->print(); // Send request if (connection->send(req) != XS::tstat_OK) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"connection::send() failed"<setState(S_RECEIVE); status = connection->receive(message); static const std::string S_PROCESS = "PROCESS"; _threadMonitor->setState(S_PROCESS); if (status < 0) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"connection::receive() failed" <close(); delete connection; } protected: virtual void threadFunction() { try { main(); } catch (std::exception &ex) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"Exception" <