#include #include #include #include #include "../for_spryware/SpryWareFields.h" #include "../for_spryware/SpryWareSimple.h" #include "../../shared/NewConnections.h" #include "../../shared/GlobalConfigFile.h" #include "../../shared/CommandDispatcher.h" #include "../../shared/ReplyToClient.h" #include "MyPricePlusTable.h" // It seems that there is a limit to the number of symbols that a price plus // table can handle. After around 10,000 symbols it stops giving me data for // the new data. It does not produce any errors! This limit is not documented // anywhere. I split thing up, somewhere arbitrarily into 4 seperate tables. // I used hashing to spli things up because that was easy. If I knew the // real limit, I'm sure I could do a better job. But this appeared to work. // I got 18403 symbols loaded, and they all seemed to be working. static uint32_t hashCode(std::string const &symbol) { // Keep this simple. There isn't much we can do with a stock symbol. uint32_t accumulator = 0; int bitFrom = 1; int bitTo = 0x123000; for (std::string::const_iterator it = symbol.begin(); it != symbol.end(); it++) { accumulator *= 9; accumulator += *it; if (accumulator && bitFrom) accumulator ^= bitTo; bitFrom<<=1; bitTo>>=1; } accumulator += accumulator<<24; accumulator += (accumulator>>8) & 0xffff; return accumulator; } class MainRequestHandler : public RequestListener { private: enum { mtRequestStockData, mtShowList, mtCountList, mtAddList, mtRemoveList }; static const unsigned int TABLE_COUNT = 4; MisDirectory *_pDirectory; MyPricePlusTable _table[TABLE_COUNT]; std::set< std::string > _currentlySubscribed; bool getData(std::string const &symbol, Database::Record &record); public: MainRequestHandler(); ~MainRequestHandler(); void newRequest(Request *request); }; bool MainRequestHandler::getData(std::string const &symbol, Database::Record &record) { MyPricePlusTable &table = _table[hashCode(symbol) % TABLE_COUNT]; if (!_currentlySubscribed.count(symbol)) { // not in cache _currentlySubscribed.insert(symbol); table.addSymbolNow(symbol); } return table.getSnapshot(symbol, record); } MainRequestHandler::MainRequestHandler() { _pDirectory = CreateMisDirectory(); assert(_pDirectory); std::string configFile; VSS(_pDirectory->GetConfigFile(configFile)); VSS(_pDirectory->CheckDataDirectory()); for (unsigned i = 0; i < TABLE_COUNT; i++) VSS(_pDirectory->Open(_table[i], TABLE_PROCESSED_COMPOSITE, "", "", "", "")); CommandDispatcher::getInstance() ->listenForCommand("get_price", this, mtRequestStockData); CommandDispatcher::getInstance() ->listenForCommand("show_list", this, mtShowList); CommandDispatcher::getInstance() ->listenForCommand("count_list", this, mtCountList); CommandDispatcher::getInstance() ->listenForCommand("add_list", this, mtAddList); CommandDispatcher::getInstance() ->listenForCommand("remove_list", this, mtRemoveList); } MainRequestHandler::~MainRequestHandler() { for (unsigned i = 0; i < TABLE_COUNT; i++) _table[i].Close(); if (_pDirectory) delete _pDirectory; } static void reportValue(std::string &result, std::string const &name, std::string const &value) { if (!result.empty()) result += '&'; result += urlEncode(name); result += '='; result += urlEncode(value); } static void reportPricePrice(std::string &result, std::string const &name, Database::Record &record, unsigned int id) { const double price = getPricePrice(record, id); if (price) reportValue(result, name, priceToString(price)); } static void reportPriceTime(std::string &result, std::string const &name, Database::Record &record, unsigned int id) { const time_t time = getPriceTime(record, id); if (time) reportValue(result, name, ntoa(time)); } static void reportUInt32(std::string &result, std::string const &name, Database::Record &record, unsigned int id) { const int value = getFromUInt32(record, id); if (value) reportValue(result, name, ntoa(value)); } void MainRequestHandler::newRequest(Request *r) { switch (r->callbackId) { case mtRequestStockData: { ExternalRequest *request = dynamic_cast(r); std::string const &symbol = request->getProperty("symbol"); std::string reply = "symbol="; reply += urlEncode(symbol); Database::Record record; const bool success = getData(symbol, record); if (!success) reply += "&error=1"; else { reportPriceTime(reply, "last_time", record, MIS::FID::LAST); reportPricePrice(reply, "last_price", record, MIS::FID::LAST); reportUInt32(reply, "volume", record, MIS::FID::VOLUME); reportPricePrice(reply, "open", record, MIS::FID::OPEN); reportPricePrice(reply, "high", record, MIS::FID::HIGH); reportPricePrice(reply, "low", record, MIS::FID::LOW); reportPricePrice(reply, "close", record, MIS::FID::CLOSE); reportPricePrice(reply, "bid", record, MIS::FID::BID); reportUInt32(reply, "bid_size", record, MIS::FID::BID_SIZE); reportPricePrice(reply, "ask", record, MIS::FID::ASK); reportUInt32(reply, "ask_size", record, MIS::FID::ASK_SIZE); } addToOutputQueue(request->getSocketInfo(), reply, request->getResponseMessageId()); break; } case mtShowList: { ExternalRequest *request = dynamic_cast(r); std::string reply; for (std::set< std::string >::const_iterator it = _currentlySubscribed.begin(); it != _currentlySubscribed.end(); it++) { reply += *it; reply += ' '; } addToOutputQueue(request->getSocketInfo(), reply, request->getResponseMessageId()); break; } case mtCountList: { ExternalRequest *request = dynamic_cast(r); std::string reply = "Count = " + ntoa(_currentlySubscribed.size()); int count[TABLE_COUNT]; for (unsigned int i = 0; i < TABLE_COUNT; i++) count[i] = 0; for (std::set< std::string >::const_iterator it = _currentlySubscribed.begin(); it != _currentlySubscribed.end(); it++) { count[hashCode(*it) % TABLE_COUNT]++; } for (unsigned int i = 0; i < TABLE_COUNT; i++) { reply += ' '; reply += ntoa(count[i]); } addToOutputQueue(request->getSocketInfo(), reply, request->getResponseMessageId()); break; } case mtAddList: { ExternalRequest *request = dynamic_cast(r); const std::vector< std::string > requested = explode(",", request->getProperty("symbols")); int dupCount = 0; int addCount = 0; std::set< std::string > toAdd[TABLE_COUNT]; for (std::vector< std::string >::const_iterator it = requested.begin(); it != requested.end(); it++) if (_currentlySubscribed.count(*it)) dupCount++; else { _currentlySubscribed.insert(*it); toAdd[hashCode(*it)%TABLE_COUNT].insert(*it); addCount++; } for (unsigned i = 0; i < TABLE_COUNT; i++) if (toAdd[i].size()) _table[i].addSymbolList(toAdd[i]); addToOutputQueue(request->getSocketInfo(), "Added: " + ntoa(addCount) + ", Dups: " + ntoa(dupCount), request->getResponseMessageId()); //std::cout<<"Added: "<(r); const std::vector< std::string > requested = explode(",", request->getProperty("symbols")); int ignoredCount = 0; int deleteCount = 0; std::set< std::string > toRemove[TABLE_COUNT]; for (std::vector< std::string >::const_iterator it = requested.begin(); it != requested.end(); it++) if (!_currentlySubscribed.count(*it)) ignoredCount++; else { _currentlySubscribed.erase(*it); toRemove[hashCode(*it)%TABLE_COUNT].insert(*it); deleteCount++; } for (unsigned i = 0; i < TABLE_COUNT; i++) if (toRemove[i].size()) _table[i].deleteSymbolList(toRemove[i]); addToOutputQueue(request->getSocketInfo(), "Removed: " + ntoa(deleteCount) + ", Ignored: " + ntoa(ignoredCount), request->getResponseMessageId()); break; } } delete r; } int main(int argc, char *argv[]) { if (!addConfigItemsFromCommandLine(argv + 1)) { return 1; } configItemsComplete(); int listenPort = strtolDefault(getConfigItem("listen_port", "9229"), -1); if (listenPort == -1) { std::cerr<<"Invalid listen port \"" <getInput(), commandDispatcher->getInputCallbackId()); MainRequestHandler mainRequestHandler; // Start listening to new sockets now, after the other modules have // had time to configure themselves. NewConnections newConnections(&listener, listenPort); if (!newConnections.getSuccess()) { std::cerr<<"Unable to listen for new connections.\n"; return 3; } while (true) sleep(60); }