#include #include #include "../../shared/SimpleLogFile.h" #include "Timers.h" #include "DataNodes.h" ///////////////////////////////////////////////////////////////////// // BroadcastRequest ///////////////////////////////////////////////////////////////////// bool BroadcastRequest::operator ==(BroadcastRequest const &other) const { return (_id == other._id) && (_caller == other._caller) && (_channel == other._channel); } bool BroadcastRequest::operator <(BroadcastRequest const &other) const { // Put them in order first by the channel. The expected use is for a // broadcaster to name the channel, and then we have to find all of the // listeners on that channel. In any other use, we'd have all 3 pieces of // data, so the order wouldn't matter much. int stringCompare = _channel.compare(other._channel); if (stringCompare < 0) { return true; } if (stringCompare > 0) { return false; } if (_caller < other._caller) { return true; } if (_caller > other._caller) { return false; } return _id < other._id; } ///////////////////////////////////////////////////////////////////// // GracefulShutdown ///////////////////////////////////////////////////////////////////// class GracefulShutdown : public BroadcastMessage { public: GracefulShutdown(); ~GracefulShutdown(); }; GracefulShutdown::GracefulShutdown() : BroadcastMessage(DataNodeManager::gracefulShutdownChannel()) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"Graceful shutdown started"; sendToLogFile(msg); } GracefulShutdown::~GracefulShutdown() { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"Graceful shutdown complete"; sendToLogFile(msg); scheduleSyncrhonizedShutdown(); // Lock up this thread until the log file thread shuts us down correctly. while (true) sleep(10000); } ///////////////////////////////////////////////////////////////////// // DataNodeManager ///////////////////////////////////////////////////////////////////// void DataNodeManager::gracefulShutdown() { // This is really aimed at the SpryWare code. Most code is happy if we just // call exit() or die on a signal. (new GracefulShutdown)->send(this); } void DataNodeManager::addToQueue(EventQueueListener *listener) { listener->callbackId = _callbackId; _queue->newRequest(listener); } void DataNodeManager::addToQueue(std::function< void() > const &action, SocketInfo *socket) { class Work : public DataNodeManager::EventQueueListener { private: std::function< void() > _action; public: Work(SocketInfo *socket, std::function< void() > const &action) : DataNodeManager::EventQueueListener(socket), _action(action) { } virtual void onEventQueue() { _action(); } }; addToQueue(new Work(socket, action)); } void DataNodeManager::subscribe(BroadcastRequest request) { // The old Delphi API made a big deal that you couldn't submit a duplicate // request. I'm not sure why not. Presumably that was a detail of the // Delphi implementation. In this implementation we silently ignore any // duplicates. _broadcastRequests.insert(request); } void DataNodeManager::unsubscribe(BroadcastRequest request) { _broadcastRequests.erase(request); } void DataNodeManager::broadcastNow(BroadcastMessage *message) { // We copy the items that we intend to call. Then we check each one before // sending it to make sure that it hasn't been deleted. This is not very // effecient, but it makes sure we do a good job even if the list is changing // during our callbacks. std::set< BroadcastRequest > toSend; for (std::set< BroadcastRequest >::const_iterator it = _broadcastRequests.lower_bound (BroadcastRequest(message->getChannel(), NULL, std::numeric_limits< int >::min())); (it != _broadcastRequests.end()) && (it->_channel == message->getChannel()); it++) { toSend.insert(*it); } for (std::set< BroadcastRequest >::const_iterator it = toSend.begin(); it != toSend.end(); it++) { if (_broadcastRequests.count(*it)) { it->_caller->onBroadcast(*message, it->_id); } } } __thread DataNodeManager *dataNodeMangerInstance = NULL; __thread int64_t nextDataNodeManagerChannelId = 0; DataNodeManager *DataNodeManager::getDefault() { return dataNodeMangerInstance; } DataNodeManager::DataNodeManager(RequestListener *queue, int callbackId) : _queue(queue), _callbackId(callbackId), _timerThread(this) { } void DataNodeManager::setAsDefault() { dataNodeMangerInstance = this; } std::string DataNodeManager::nextOwnerChannel() { return "owner." + ntoa(nextDataNodeManagerChannelId++); } int64_t DataNodeManager::dataNodeCount() { return nextDataNodeManagerChannelId; } void DataNodeManager::registerNode(std::string const &name, DataNode *node) { assert(!_registered.count(name)); assert(!_registeredByNode.count(node)); _registered[name] = node; _registeredByNode[node] = name; } void DataNodeManager::unregisterNode(DataNode *node) { _registered.erase(_registeredByNode[node]); _registeredByNode.erase(node); } DataNode *DataNodeManager::findDataNode(std::string const &name) { return getPropertyDefault(_registered, name); } std::string DataNodeManager::getDebugInfo(DataNode const *node) const { // return getPropertyDefault(_registeredByNode, node); // We should be able to use the line above. But getting the "const" just // right through several layers of temple functions would be a mess. return getPropertyDefault(_registeredByNode, const_cast< DataNode * >(node)); } ///////////////////////////////////////////////////////////////////// // BroadcastMessage ///////////////////////////////////////////////////////////////////// void BroadcastMessage::onEventQueue() { _dataNodeManager->setSubmitTime(_submitTime); _dataNodeManager->broadcastNow(this); // We expect the event loop to delete us after this. } void BroadcastMessage::send(DataNodeManager *dataNodeManager) { _dataNodeManager = dataNodeManager; dataNodeManager->addToQueue(this); } ///////////////////////////////////////////////////////////////////// // DataNode ///////////////////////////////////////////////////////////////////// void DataNode::addLink(DataNodeLink *link) { if (link->_listener) { _listeners.insert(link); } else { _otherOwners++; } } bool DataNode::readyForRelease() { return _listeners.empty() && !_otherOwners; } void DataNode::removeLink(DataNodeLink *link) { if (link->_listener) { _listeners.erase(link); } else { _otherOwners--; } if (readyForRelease()) { delete this; } } void DataNode::notifyListeners() { // This uses the same strategy as DataNodeManager::broadcastNow(). We copy // all the items up front. Then we iterate through our own list. // Immediately before calling each callback, we make sure it is still on // the main list. That way we are safe even if people make arbitrary changes // to the list of callbacks during a callback. std::set< DataNodeLink * > copyOfListeners = _listeners; for (std::set< DataNodeLink * >::iterator it = copyOfListeners.begin(); it != copyOfListeners.end(); it++) { if (_listeners.count(*it)) { (*it)->notifyListener(); } } } void DataNode::registerForBroadcast(std::string channel, int msgId) { BroadcastRequest request(channel, this, msgId); if (_broadcastRequests.insert(request).second) { // New request _manager->subscribe(request); } } void DataNode::unregisterForBroadcast(std::string channel, int msgId) { BroadcastRequest request(channel, this, msgId); _broadcastRequests.erase(request); _manager->unsubscribe(request); } DataNode::DataNode() : _otherOwners(0), _manager(DataNodeManager::getDefault()), _ownerChannel(_manager->nextOwnerChannel()) { } DataNodeLink *DataNode::createLink(DataNodeListener *listener, int msgId) { DataNodeLink *result = new DataNodeLink(this, listener, msgId); addLink(result); return result; } DataNode::~DataNode() { _manager->unregisterNode(this); for (std::set< BroadcastRequest >::const_iterator it = _broadcastRequests.begin(); it != _broadcastRequests.end(); it++) { _manager->unsubscribe(*it); } for (std::vector< DataNodeLink * >::iterator it = _automaticLinks.begin(); it != _automaticLinks.end(); it++) { (*it)->release(); } } std::string DataNode::getDebugInfo() const { return _manager->getDebugInfo(this); } ///////////////////////////////////////////////////////////////////// // DataNodeLink ///////////////////////////////////////////////////////////////////// void DataNodeLink::notifyListener() { if (_listener) { // The Delphi code catches exceptions here and reports messages to the // log. _listener->onWakeup(_msgId); } } void DataNodeLink::release() { _source->removeLink(this); delete this; } ///////////////////////////////////////////////////////////////////// // SocketClosedDataNode ///////////////////////////////////////////////////////////////////// __thread SocketClosedDataNode *SocketClosedDataNode::instance = NULL; SocketClosedDataNode::SocketClosedDataNode(DataNodeArgument const &args) : _socket(NULL) { assert(!instance); instance = this; } SocketClosedDataNode::~SocketClosedDataNode() { assert(instance); instance = NULL; } void SocketClosedDataNode::doNotify(SocketInfo *socket) { if (instance) { instance->_socket = socket; instance->notifyListeners(); instance->_socket = NULL; } } DataNodeLink *SocketClosedDataNode::find(DataNodeListener *listener, int msgId, SocketClosedDataNode *&node) { return findHelper(listener, msgId, node, DataNodeArgument()); } ///////////////////////////////////////////////////////////////////// // DataNodeArgumentBase ///////////////////////////////////////////////////////////////////// std::string DataNodeArgumentBase::quoteString(std::string value) { // Print the number of characters before the list of characters. This way // we know where the end is. return ntoa(value.size()) + ';' + value; } DataNodeArgument DataNodeArgumentBase::replace (std::string const &name, DataNodeArgument const &value) const { // The default is to return NULL to say no change. return DataNodeArgument(); } ///////////////////////////////////////////////////////////////////// // DataNodeArgument ///////////////////////////////////////////////////////////////////// DataNodeArgument DataNodeArgument::replace (std::string const &name, DataNodeArgument const &value) const { // Our implementation of replace would not work right if you tried to fill // in a place holder will NULL. assert(value); DataNodeArgument newValue = _value->replace(name, value); if (newValue) { return newValue; } else { return *this; } } DataNodeArgument::DataNodeArgument(DataNode::Integer value) : _value(new DataNodeArgumentInt(value)) { } DataNodeArgument::DataNodeArgument(int value) : _value(new DataNodeArgumentInt(value)) { } DataNodeArgument::DataNodeArgument(bool value) : _value(new DataNodeArgumentInt(value?1:0)) { } DataNodeArgument::DataNodeArgument(std::string value) : _value(new DataNodeArgumentString(value)) { } DataNodeArgument::DataNodeArgument(char const *value) : _value(new DataNodeArgumentString(value)) { } DataNodeArgument::DataNodeArgument(double value) : _value(new DataNodeArgumentDouble(value)) { } DataNodeArgument::DataNodeArgument(DataNodeArgumentVector const &value) : _value(new DataNodeArgumentList(value)) { } DataNodeArgument::DataNodeArgument() : _value(NULL) { } DataNodeArgument::DataNodeArgument(DataNodeArgumentBase *value) : _value(value) { } std::string DataNodeArgument::asString() const { if (_value) { return _value->asString(); } else { // THE PREFIX N IS RESERVED HERE. return "N"; // N for NULL. } } ///////////////////////////////////////////////////////////////////// // DataNodeArgumentInt ///////////////////////////////////////////////////////////////////// std::string DataNodeArgumentInt::asString() const { // THE PREFIX I IS RESERVED HERE. return 'I' + ntoa(_value) + ';'; } ///////////////////////////////////////////////////////////////////// // DataNodeArgumentString ///////////////////////////////////////////////////////////////////// std::string DataNodeArgumentString::asString() const { // THE PREFIX S IS RESERVED HERE. return 'S' + quoteString(_value); } ///////////////////////////////////////////////////////////////////// // DataNodeArgumentDouble ///////////////////////////////////////////////////////////////////// std::string DataNodeArgumentDouble::asString() const { // Ideally we should use the binary form, to be sure that small differences // are not missed. // THE PREFIX D IS RESERVED HERE. return 'D' + ntoa(_value) + ';'; } ///////////////////////////////////////////////////////////////////// // DataNodeArgumentPlaceHolder ///////////////////////////////////////////////////////////////////// std::string DataNodeArgumentPlaceHolder::asString() const { // Perhaps this should be an error. Do you really want to use a factory // as a key when the factory is not complete? // THE PREFIX P IS RESERVED HERE. return 'P' + quoteString(_name); } DataNodeArgument DataNodeArgumentPlaceHolder::replace(std::string const &name, DataNodeArgument const &value) const { if (name == _name) { // Do replacement. return value; } else { // Keep as is. return DataNodeArgument(); } } ///////////////////////////////////////////////////////////////////// // DataNodeArgumentList ///////////////////////////////////////////////////////////////////// std::string DataNodeArgumentList::asString() const { // We should probably cache this value. In fact, it might make sense to // cache all of the asString values, not just for the list type. It might // also make sense to append to a string rather than returning a string. // THE PREFIX L IS RESERVED HERE. std::string result = 'L' + ntoa(_value.size()) + ';'; for (DataNodeArgumentVector::const_iterator it = _value.begin(); it != _value.end(); it++) { // Notice that that there is no seperator between the string forms of // the items. The assumption is that each item knows where it's own // end it. So not only are the string forms unique, but no valid string // form can be a prefix of another valid string form! result += it->asString(); } return result; } DataNodeArgument DataNodeArgumentList::replace(std::string const &name, DataNodeArgument const &value) const { // Call replace recursively on each of the underlying items. If none of // them reports a change, then we report no change for the entire list. // Otherwise we create a new list merging copies of the unchanged items with // the newly created replacements. DataNodeArgumentVector newItems; newItems.reserve(_value.size()); for (DataNodeArgumentVector::const_iterator it = _value.begin(); it != _value.end(); it++) { newItems.push_back(it->replace(name, value)); } assert(_value.size() == newItems.size()); bool changeFound = false; for (DataNodeArgumentVector::size_type i = 0; i < _value.size(); i++) { if (newItems[i]) { // This is not perfect. According to this scheme, you can't replace // something with NULL. If you try, it will look like no change // was made! I think that's okay. I don't think we used NULL // anywere except in the implementaiton of the DataNodes module. // I don't think any client modules use NULL. changeFound = true; } else { // The item was unchanged, so copy it. We're not yet sure if we // need it. We might abandon the copy. But we're just changing a // smart pointer, so that shouldn't be very expensive. newItems[i] = _value[i]; } } if (changeFound) { return newItems; } else { // None of the parts needed replacing, so the list as a whole does not // need replacing. return DataNodeArgument(); } } /* Here are some generic notes about retrieving a class name at run time. These are aimed at properly creating unique keys for data nodes. Classname G++ allows us to get a classname from the RTTI. It attempts to make this unique. However, it's far from perfect. In general if you have two classes with the same name in the same name space, the class name will be the same. It will usually do a decent job with namespaces and other scopes. A::B will have a different name than C::B. If we pretend that the classes will all be accessible from the same header file, and our names are sufficiently unique for that to work, then the RTTI class names will be unique. If you have static functions in two different files with the same name, and you define a class in each function with the same name, the RTTI class name will be identical even though the classes are not. This case is perfectly legal, so it seems like a bug in g++. A more realistic example works like this. Two different .C files declare classes with the same name. Neither class is exported to a header file. But objects of these types are exported. In this case two objects will have different types, but the rtti class names will be identical. Strictly speaking, I'm not sure if this is legal, but it's easy to do and the compiler will not give you an error or a warning. Looking at the type_info structure or the address of the class name doesn't help. These will fail in exactly the same cases. The vtable actually works in these cases. It's a shame I can't explicitly ask for a pointer to the vtable. I don't know if dynamic_cast works or not in these cases. If it did, that would help us find some of these problems. */