#include #include "../../shared/ThreadClass.h" #include "../../shared/CommandDispatcher.h" #include "../../shared/ReplyToClient.h" #include "DataNodes.h" #include "GenericDataNodes.h" #include "../../shared/TwoDLookup.h" #include "DataNodesTest.h" enum { mtDataNodeEvent, mtRequestSpread, mtSetBid, mtSetAsk, mtMiscTests, mtQuit }; class NewDoubleValue : public BroadcastMessage { private: double _value; bool _valid; public: NewDoubleValue(std::string channel, double value, bool valid); static NewDoubleValue *bid(std::string symbol, double value); static NewDoubleValue *ask(std::string symbol, double value); static NewDoubleValue *bid(std::string symbol); static NewDoubleValue *ask(std::string symbol); double getValue() const { return _value; } bool getValid() const { return _valid; } }; NewDoubleValue::NewDoubleValue(std::string channel, double value, bool valid) : BroadcastMessage(channel), _value(value), _valid(valid) { } NewDoubleValue *NewDoubleValue::bid(std::string symbol, double value) { // This is bad! "." is used in stock symbols. We need to find a better // seperator. return new NewDoubleValue("BidDataNode." + symbol, value, true); } NewDoubleValue *NewDoubleValue::ask(std::string symbol, double value) { return new NewDoubleValue("AskDataNode." + symbol, value, true); } NewDoubleValue *NewDoubleValue::bid(std::string symbol) { return new NewDoubleValue("BidDataNode." + symbol, 0.0, false); } NewDoubleValue *NewDoubleValue::ask(std::string symbol) { return new NewDoubleValue("AskDataNode." + symbol, 0.0, false); } // This is a little silly. I'm trying to have two different classes, but // they are both so simple and similar that I'm using a base case. If the // requirements really looked like this, I'd have one class and the name of the // channel could ome from the arguments in the factory. A more realistic bid // and ask would both be very simple and would listen to the consolidated L1 // data node and would not save anything. There would be no need for a shared // base case. If I was really planning to write a lot of code like this, I'd // change the routines for creating a facotry. In particular, I'd expose the // constructor for GenericDataNode. class SimpleDoubleRepeater : public GenericDataNode { private: double _value; bool _valid; void onBroadcast(BroadcastMessage &message, int msgId); enum { bmNewValue }; protected: SimpleDoubleRepeater(std::string const &channel, DataNodeArgument const &args); public: void getDouble(bool &valid, double &value) const { valid = _valid; value = _value; } ~SimpleDoubleRepeater(); }; SimpleDoubleRepeater::SimpleDoubleRepeater(std::string const &channel, DataNodeArgument const &args) : _value(0.0), _valid(false) { const std::string fullChannel = channel + "." + args.getStringValue(); registerForBroadcast(fullChannel, bmNewValue); std::cout<<"Creating "<(message); _value = m.getValue(); _valid = m.getValid(); notifyListeners(); } //else // GenericDataNode::onBroadcast() // that seems like overkill. if we want to send a message to all generic // data nodes, GenericDataNode can choose to implement a new method. } class BidDataNode : public SimpleDoubleRepeater { private: BidDataNode(DataNodeArgument const &args) : SimpleDoubleRepeater("BidDataNode", args) { } friend class GenericDataNodeFactory; }; class AskDataNode : public SimpleDoubleRepeater { private: AskDataNode(DataNodeArgument const &args) : SimpleDoubleRepeater("AskDataNode", args) { } friend class GenericDataNodeFactory; }; class SpreadDataNode : public GenericDataNode { private: SpreadDataNode(DataNodeArgument const &symbol); friend class GenericDataNodeFactory; BidDataNode *_bid; GenericDataNode *_ask; public: void getDouble(bool &valid, double &value); }; SpreadDataNode::SpreadDataNode(DataNodeArgument const &symbol) { // This is the easiest way to create a generic data node if we don't always // have a factory. addAutoLink(findGeneric(this, 0, _bid, symbol)); // This is an alternate way, which might be a little more flexible. GenericDataNodeFactory *factory = GenericDataNodeFactory::create< AskDataNode >(symbol); addAutoLink(factory->find(this, 0, _ask)); delete factory; } void SpreadDataNode::getDouble(bool &valid, double &value) { double bid, ask; _bid->getDouble(valid, bid); if (!valid) { return; } _ask->getDouble(valid, ask); if (!valid) { return; } value = ask - bid; } class Alert : private DataNodeListener { private: std::string _symbol; SocketInfo *_socket; SpreadDataNode *_spread; DataNodeLink *_link; public: void onWakeup(int msgId); Alert(std::string const &symbol, SocketInfo *socket); ~Alert(); }; void Alert::onWakeup(int msgId) { double value; bool valid; _spread->getDouble(valid, value); std::string msg = "Symbol = \"" + _symbol + "\", spread "; if (valid) { msg += "= " + priceToString(value); } else { msg += "is invalid"; } addToOutputQueue(_socket, msg); } Alert::Alert(std::string const &symbol, SocketInfo *socket) : _symbol(symbol), _socket(socket), _link(findGeneric(this, 0, _spread, symbol)) { } Alert::~Alert() { std::cout<<"Alert::~Alert()\n"; _link->release(); } class DataNodesTestThread : private ThreadClass { private: RequestQueue _incoming; protected: void threadFunction(); public: void miscTests(ExternalRequest *request); DataNodesTestThread(); ~DataNodesTestThread(); }; DataNodesTestThread::DataNodesTestThread() : ThreadClass("DataNodesTestThread"), _incoming("DataNodesTestThread") { std::cout<<"DataNodesTestThread::DataNodesTestThread()\n"; CommandDispatcher::getInstance()->listenForCommand("test_request_spread", &_incoming, mtRequestSpread); CommandDispatcher::getInstance()->listenForCommand("test_set_bid", &_incoming, mtSetBid); CommandDispatcher::getInstance()->listenForCommand("test_set_ask", &_incoming, mtSetAsk); CommandDispatcher::getInstance()->listenForCommand("test_datanode", &_incoming, mtMiscTests); startThread(); } DataNodesTestThread::~DataNodesTestThread() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incoming.newRequest(r); waitForThread(); } void DataNodesTestThread::miscTests(ExternalRequest *request) { SocketInfo *socket = request->getSocketInfo(); DataNodeArgument one = 1; DataNodeArgument abc = "abc"; DataNodeArgumentVector l; l.push_back(1); l.push_back("two"); l.push_back(3.0); DataNodeArgument list = l; DataNodeArgumentVector forward; forward.push_back(new DataNodeArgumentPlaceHolder("one")); forward.push_back(new DataNodeArgumentPlaceHolder("two")); forward.push_back(new DataNodeArgumentPlaceHolder("three")); DataNodeArgumentVector backward; backward.push_back(new DataNodeArgumentPlaceHolder("three")); backward.push_back(new DataNodeArgumentPlaceHolder("two")); backward.push_back(new DataNodeArgumentPlaceHolder("one")); DataNodeArgumentVector all; all.push_back(forward); all.push_back(backward); all.push_back(all); DataNodeArgument a = all; addToOutputQueue(socket, "one: " + one.asString() + "\nabc: " + abc.asString() + "\nlist: " + list.asString() + "\nall: " + a.asString()); DataNodeArgument b = a.replace("one", 1.0); b = b.replace("two", "II"); b = b.replace("three", 3); addToOutputQueue(socket, "a: " + a.asString() + "\nb: " + b.asString()); TwoDArray array; array.add("1", "English", "one"); array.add("2", "English", "two"); array.add("3", "English", "three"); array.add("1", "Bigger", "one hundred one"); array.add("2", "Bigger", "two hundred two"); array.add("1", "comma", "1,001"); array.add("3", "comma", "3,003"); array.add("1", "quotes", "\""); array.add("2", "quotes", "\"\""); array.add("3", "quotes", "\"\"\""); std::string filename = request->getProperty("o_file"); if (filename.empty()) { filename = "output.csv"; } array.writeToCSV(filename); array.loadFromCSV("copy_from.csv"); array.writeToCSV("copy_to.csv"); } void freeAlerts(std::vector< Alert * > &alerts) { for (std::vector< Alert * >::iterator it = alerts.begin(); it != alerts.end(); it++) { delete *it; } } void DataNodesTestThread::threadFunction() { std::cout<<"DataNodesTestThread::threadFunction()\n"; std::map< SocketInfo *, std::vector< Alert * > > allAlerts; // We have to implement this here, rather than in the constructor, so we // will be in the right thread. Otherwise DataNodeManager::getDefault() // would not work right! DataNodeManager dataNodeManager(&_incoming, mtDataNodeEvent); while (true) { while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtDataNodeEvent: { DataNodeManager::EventQueueListener *request = dynamic_cast< DataNodeManager::EventQueueListener * >(current); request->onEventQueue(); break; } case mtRequestSpread: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); SocketInfo *socket = request->getSocketInfo(); Alert *alert = new Alert(request->getProperty("symbol"), socket); allAlerts[socket].push_back(alert); break; } case mtSetBid: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); const std::string symbol = request->getProperty("symbol"); const double value = strtodDefault(request->getProperty("new_value"), -1); BroadcastMessage *outgoing; if (value > 0) { outgoing = NewDoubleValue::bid(symbol, value); } else { outgoing = NewDoubleValue::bid(symbol); } outgoing->send(&dataNodeManager); break; } case mtSetAsk: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); const std::string symbol = request->getProperty("symbol"); const double value = strtodDefault(request->getProperty("new_value"), -1); BroadcastMessage *outgoing; if (value > 0) { outgoing = NewDoubleValue::ask(symbol, value); } else { outgoing = NewDoubleValue::ask(symbol); } outgoing->send(&dataNodeManager); break; } case mtMiscTests: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); miscTests(request); break; } case mtQuit: { delete current; // We should really delete everyting in allAlerts. return; } case DeleteSocketThread::callbackId: { SocketInfo *socket = current->getSocketInfo(); std::vector< Alert * > &v = allAlerts[socket]; std::cout<<"DeleteSocketThread::callbackId "<