#include "NxCoreKafkaConsumer.h" #include #include #include #include #include #include "NxCoreHelper.h" #include "../modern-cpp-kafka/include/kafka/KafkaConsumer.h" #include "../shared/SimpleLogFile.h" #include "../shared/ThreadClass.h" #include "../shared/ThreadSafeRefCount.h" #include "../shared/CommandDispatcher.h" #include "../shared/Messages.h" #include "../shared/GlobalConfigFile.h" #include "data_framework/SynchronizedTimers.h" #include "data_framework/GenericTosData.h" #include "data_framework/GenericL1Data.h" class NxCoreKafkaTosDataNode : public GenericTosDataNode { private: static const int bmData = bmAvailable; static const int bmInvalid = bmData + 1; enum class VerboseDebug { No, Yes, Unknown }; static VerboseDebug _verboseDebug; static std::string getDataChannel(std::string const &symbol) { return "NxCoreKafkaTosDataNode.D." + symbol; } static std::string getInvalidChannel(std::string const &symbol) { return "NxCoreKafkaTosDataNode.I." + symbol; } class Data : public BroadcastMessage { private: time_t _currentTime; TosData _data; public: time_t getCurrentTime() const { return _currentTime; } TosData const &getData() const { return _data; } Data(std::string const &symbol, TosData const &kafkaData); }; // False is the traditional mode, send one request to the market data proxy // for each symbol we care about. True is the new mode, send one request // and the market data proxy will send TOS for all symbols. bool requestAllTos(); NxCoreKafkaTosDataNode(DataNodeArgument const &args); friend class DataNode; protected: void onBroadcast(BroadcastMessage &message, int msgId); public: static DataNodeLink *find(DataNodeListener *listener, int msgId, GenericTosDataNode *&node, std::string const &symbol); static void sendData(DataNodeManager *manager, std::string const &symbol, TosData const &tosData); static void sendInvalid(DataNodeManager *manager, std::string const &symbol); }; ///////////////////////////////////////////////////////////////////// // NxCoreKafkaL1DataNode ///////////////////////////////////////////////////////////////////// class NxCoreKafkaL1DataNode : public GenericL1DataNode { private: static const int bmData = bmAvailable; static const int bmInvalid = bmData + 1; static std::string getDataChannel(std::string const &symbol) { return "NxCoreKafkaL1DataNode.D." + symbol; } static std::string getInvalidChannel(std::string const &symbol) { return "NxCoreKafkaL1DataNode.I." + symbol; } class Data : public BroadcastMessage { private: time_t _currentTime; L1Data _data; public: time_t getCurrentTime() const { return _currentTime; } L1Data const &getData() const { return _data; } Data(std::string const &symbol, time_t const &time, L1Data const &l1Data); }; NxCoreKafkaL1DataNode(DataNodeArgument const &args); friend class DataNode; protected: void onBroadcast(BroadcastMessage &message, int msgId); public: static DataNodeLink *find(DataNodeListener *listener, int msgId, GenericL1DataNode *&node, std::string const &symbol); static void sendData(DataNodeManager *manager, std::string const &symbol, time_t const &time, L1Data const &l1Data); static void sendInvalid(DataNodeManager *manager, std::string const &symbol); }; NxCoreKafkaL1DataNode::Data::Data(std::string const &symbol, time_t const &time, L1Data const &l1Data) : BroadcastMessage(getDataChannel(symbol), 0) { _currentTime = time; _data = l1Data; } void NxCoreKafkaL1DataNode::sendData(DataNodeManager *manager, std::string const &symbol, time_t const &time, L1Data const &l1Data) { Data *message = new Data(symbol, time, l1Data); // TclList msg; // msg<<__FILE__<<__LINE__<<__FUNCTION__ // <<"NxCoreKafkaL1DataNode"<getData().dump(); // sendToLogFile(msg); message->send(manager); } void NxCoreKafkaL1DataNode::sendInvalid(DataNodeManager *manager, std::string const &symbol) { BroadcastMessage *message = new BroadcastMessage(getInvalidChannel(symbol)); message->send(manager); } DataNodeLink *NxCoreKafkaL1DataNode::find(DataNodeListener *listener, int msgId, GenericL1DataNode *&node, std::string const &symbol) { NxCoreKafkaL1DataNode *tempNode; DataNodeLink *result = findHelper(listener, msgId, tempNode, symbol); node = tempNode; return result; } NxCoreKafkaL1DataNode::NxCoreKafkaL1DataNode(DataNodeArgument const &args) : GenericL1DataNode(args) { registerForBroadcast(getDataChannel(symbol()), bmData); registerForBroadcast(getInvalidChannel(symbol()), bmInvalid); TclList msg; msg << __FILE__ << __LINE__ << __FUNCTION__ << "Listening for" << symbol(); sendToLogFile(msg); } void NxCoreKafkaL1DataNode::onBroadcast(BroadcastMessage &message, int msgId) { switch (msgId) { case bmData: { Data const &data = dynamic_cast(message); _valid = true; _current = data.getData(); if (data.getCurrentTime()) SynchronizedTimer::safeSetTime(data.getCurrentTime()); notifyListeners(); break; } case bmInvalid: if (_valid) { _valid = false; notifyListeners(); } break; default: GenericL1DataNode::onBroadcast(message, msgId); } } class KafkaConsumerThread : private ThreadClass { private: DataNodeManager *_manager; void threadFunction() override; protected: public: KafkaConsumerThread(DataNodeManager *manager) : ThreadClass("KafkaConsumerThread"), _manager(manager) { ThreadClass::startThread(); } ~KafkaConsumerThread() {} }; void KafkaConsumerThread::threadFunction() { // are topics static or should they be configurable? std::string topic = getConfigItem("kafka_topic", "md-equities-trades-all-v2"); std::string bootstrap = getConfigItem("kafka_boostrap_servers"); // Create configuration object kafka::Properties props({{"bootstrap.servers", bootstrap}, {"enable.auto.commit", "true"}}); // Create a consumer instance kafka::clients::KafkaConsumer consumer(props); // Subscribe to topics consumer.subscribe({topic}); // Read messages from the topic std::cout << "% Reading messages from topic: " << topic << std::endl; //std::time_t lastPrinted = 0; while (true) { auto records = consumer.poll(std::chrono::milliseconds(5)); for (const auto &record : records) { // Ignore empty message if (record.value().size() == 0) continue; if (!record.error()) // && record.key().toString() == "AAPL") { // std::cout << "% Got a new message..." << std::endl; // std::cout << " Topic : " << record.topic() << std::endl; // std::cout << " Partition: " << record.partition() << std::endl; // std::cout << " Offset : " << record.offset() << std::endl; // std::cout << " Timestamp: " << record.timestamp().toString() << std::endl; // std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl; // std::cout << " Key [" << record.key().toString() << "]" << std::endl; // std::cout << " Value [" << record.value().toString() << "]" << std::endl; KafkaDataV2 kafkaData; memcpy(&kafkaData, record.value().data(), sizeof(KafkaDataV2)); byte bytes[sizeof(KafkaDataV2)]; memcpy(&bytes, record.value().data(), sizeof(KafkaDataV2)); TosData tosData; L1Data l1Data; std::string symbol = kafkaData.symbol; tosData.price = nxCorePriceToDouble(kafkaData.price, kafkaData.tradePriceType); tosData.time = fromDateTimeTicks(kafkaData.time); tosData.size = kafkaData.size; tosData.exchange = kafkaData.exchange; tosData.formT = (kafkaData.tradeCondition == 1); tosData.updatesLast = ((kafkaData.conditionFlags & NxTCF_NOLAST) != NxTCF_NOLAST); tosData.newPrint = true; tosData.todaysClose = nxCorePriceToDouble(kafkaData.last, kafkaData.tradePriceType); tosData.volume = kafkaData.totalVolume; tosData.open = nxCorePriceToDouble(kafkaData.open, kafkaData.tradePriceType); tosData.high = nxCorePriceToDouble(kafkaData.high, kafkaData.tradePriceType); tosData.low = nxCorePriceToDouble(kafkaData.low, kafkaData.tradePriceType); l1Data.askExchange = regionalExgFromNxCore((NxEXCH) kafkaData.askExchange); l1Data.askPrice = nxCorePriceToDouble(kafkaData.ask, kafkaData.quotePriceType); l1Data.askSize = kafkaData.askSize; l1Data.bidExchange = regionalExgFromNxCore((NxEXCH) kafkaData.bidExchange);; l1Data.bidPrice = nxCorePriceToDouble(kafkaData.bid, kafkaData.quotePriceType); l1Data.bidSize = kafkaData.bidSize; NxCoreKafkaL1DataNode::sendData(_manager, symbol, tosData.time, l1Data); NxCoreKafkaTosDataNode::sendData(_manager, symbol, tosData); // show a print every 5 sec for debug purposes /* std::time_t now = std::time(0); if (now > lastPrinted + 5) { lastPrinted = now; std::cout << "symbol - " << record.key().toString() << std::endl; std::cout << "price - " << tosData.price << std::endl; std::cout << "time - " << tosData.time << std::endl; std::cout << "size - " << tosData.size << std::endl; std::cout << "exchange - " << tosData.exchange << std::endl; std::cout << "form_t - " << tosData.formT << std::endl; std::cout << "updatesLast - " << tosData.updatesLast << std::endl; std::cout << "newPrint - " << tosData.newPrint << std::endl; std::cout << "todaysClose - " << tosData.todaysClose << std::endl; std::cout << "volume - " << tosData.volume << std::endl; std::cout << "open - " << tosData.open << std::endl; std::cout << "high - " << tosData.high << std::endl; std::cout << "low - " << tosData.low << std::endl; std::cout << "bid - " << l1Data.bidPrice << std::endl; std::cout << "bidSize - " << l1Data.bidSize << std::endl; std::cout << "bidExchange - " << l1Data.bidExchange << std::endl; std::cout << "ask - " << l1Data.askPrice << std::endl; std::cout << "askSize - " << l1Data.askSize << std::endl; std::cout << "askExchange - " << l1Data.askExchange << std::endl; std::cout << "tradeCondition - " << (int)kafkaData.tradeCondition << std::endl; std::cout << "conditionFlags - " << (int)kafkaData.conditionFlags << std::endl; std::cout << std::endl; } */ } else if (record.error()) { std::cerr << FLF << " "<< record.toString() << std::endl; } } } } NxCoreKafkaTosDataNode::VerboseDebug NxCoreKafkaTosDataNode::_verboseDebug = VerboseDebug::Unknown; bool NxCoreKafkaTosDataNode::requestAllTos() { static bool result = getConfigItem("request_all_tos") == "1"; return result; } NxCoreKafkaTosDataNode::Data::Data(std::string const &symbol, TosData const &tosData) : BroadcastMessage(getDataChannel(symbol), 0) { _currentTime = tosData.time; _data.price = tosData.price; _data.time = tosData.time; _data.size = tosData.size; _data.exchange = tosData.exchange; _data.todaysClose = tosData.todaysClose; _data.volume = tosData.volume; _data.open = tosData.open; _data.high = tosData.high; _data.low = tosData.low; _data.formT = tosData.formT; _data.updatesLast = tosData.updatesLast; _data.newPrint = true; } void NxCoreKafkaTosDataNode::sendData(DataNodeManager *manager, std::string const &symbol, TosData const &tosData) { // if (_verboseDebug == VerboseDebug::Yes) //ThreadMonitor::find().increment("TOS_sendData " + symbol); Data *message = new Data(symbol, tosData); // TclList msg; // msg<<__FILE__<<__LINE__<<__FUNCTION__ // <<"NxCoreKafkaTosDataNode"<getData().dump(); // sendToLogFile(msg); message->send(manager); } void NxCoreKafkaTosDataNode::sendInvalid(DataNodeManager *manager, std::string const &symbol) { BroadcastMessage *message = new BroadcastMessage(getInvalidChannel(symbol)); message->send(manager); } DataNodeLink *NxCoreKafkaTosDataNode::find(DataNodeListener *listener, int msgId, GenericTosDataNode *&node, std::string const &symbol) { NxCoreKafkaTosDataNode *tempNode; DataNodeLink *result = findHelper(listener, msgId, tempNode, symbol); node = tempNode; return result; } NxCoreKafkaTosDataNode::NxCoreKafkaTosDataNode(DataNodeArgument const &args) : GenericTosDataNode(args) { if (_verboseDebug == VerboseDebug::Unknown) { if (getConfigItem("kafka_data_verbose_debug") == "1") { _verboseDebug = VerboseDebug::Yes; sendToLogFile(TclList() << FLF << "Enabled _verboseDebug in TOS"); } else _verboseDebug = VerboseDebug::No; } registerForBroadcast(getDataChannel(symbol()), bmData); registerForBroadcast(getInvalidChannel(symbol()), bmInvalid); static std::vector toReport; // Invariant: If there is at least one item in toReport then we have // an outstanding call to doReport() waiting for an idle moment. If // there are NO items in toReport, then we do NOT have a call waiting. const auto doReport = []() { TclList msg; msg << FLF << "NxCoreKafkaTosDataNode()" << "Listening for" << toReport; sendToLogFile(msg); toReport.clear(); }; switch (toReport.size()) { case 0: // At this moment we're about to cross from 0 to 1 items in toReport, // so we need to schedule a call to doReport(). getManager()->addToQueue(doReport); break; case 1000: // There are already a lot of items waiting to be sent to the log. We // were about to add one more, but we don't want the list to get too // long. Immediately display the items that were already waiting, but // but queue up the new item for next time. That way the invariant will // still be true. doReport(); break; } toReport.push_back(symbol()); } void NxCoreKafkaTosDataNode::onBroadcast(BroadcastMessage &message, int msgId) { switch (msgId) { case bmData: { Data const &data = dynamic_cast(message); _valid = true; _last = data.getData(); if (data.getCurrentTime()) SynchronizedTimer::safeSetTime(data.getCurrentTime()); notifyListeners(); break; } case bmInvalid: if (_valid) { _valid = false; notifyListeners(); } break; default: GenericTosDataNode::onBroadcast(message, msgId); } } void initKafkaConsumer(DataNodeManager *manager) { new KafkaConsumerThread(manager); GenericTosDataNode::registerImplementation(NxCoreKafkaTosDataNode::find); GenericL1DataNode::registerImplementation(NxCoreKafkaL1DataNode::find); }