#include #include #include #include #include "HashHelper.h" #include "../shared/MiscSQL.h" #include "../shared/DatabaseForThread.h" #include "AutoSymbolList.h" /* Design: Someone requests data and it's a new request: client requests a list lock the master search for the item -- it doesn't exist. create the item. insert into into the by name list lock the new item unlock the master load the item. unlock the item. send a message to the symbol list thread to start monitoring this item. use the item. that includes re-locking it and possibly waiting. Someone requests data and it's a duplicate request: client requests a list lock the master search for the item -- found unlock the master use the item. that means locking it. that might mean a very small wait if someone else is using or updating it. that might mean a large wait if the initial update is still in progress. some data will be available before you unlock it. update one: grab the next item from the timer queue if we have the only reference to it delete it. that includes locking the master. else request new data from the database. lock the item. swap the new data with the old data unlock the item. put the item back into the timer queue. data structures: List data Contains a set (hash table, tree, or similar) of symbols protected by a mutex wrapped in a smart pointer Matches one strategy. Might include multiple lists including shared lists. List data description Says which lists from which owners. Always says the real owner, i.e. never say 0 to mean the current owner. Has enough information to go back to the database and refresh the data. Can be compared so we can use it as a key in a map. All Lists: A map from the description of each list to the corresponding data. Allows us to reuse list data. Saves memory Fewer requests to the database May avoid the initial trip to the database sometimes, so a strategy can start more quickly. Protected by a mutex. Timer queue some type of queue / heap / priority queue We can pop the next item when we have time to refresh its data. Does not directly own the data All Lists owns the data Putting an item in this queue does not increment it's reference count. Only accessed by one thread. */ ///////////////////////////////////////////////////////////////////// // Classes ///////////////////////////////////////////////////////////////////// class AutoSymbolListImpl : public AutoSymbolList { private: static pthread_mutex_t _globalMutex; typedef std::unordered_map< Description, Ref > All; static All _all; pthread_mutex_t _mutex; typedef CharStarHashSet Symbols; Symbols _symbols; AutoSymbolListImpl(); static void load(Description const &description, Symbols &destination); void reload(Description const &description); public: ~AutoSymbolListImpl(); virtual bool isPresent(char const *symbol); static Ref find(Description const &description); // Returns true if the item is still present at the end. False if it is not. static bool reloadOrRelease(Description const &description); static void debugUpdateAllNow(); }; ///////////////////////////////////////////////////////////////////// // AutoSymbolList::SingleList ///////////////////////////////////////////////////////////////////// bool AutoSymbolList::SingleList::operator ==(SingleList const &other) const { return (ownerId == other.ownerId) && (listId == other.listId); } bool AutoSymbolList::SingleList::operator !=(SingleList const &other) const { return (ownerId != other.ownerId) || (listId != other.listId); } bool AutoSymbolList::SingleList::operator <(SingleList const &other) const { return (ownerId < other.ownerId) || ((ownerId == other.ownerId) && (listId < other.listId)); } ///////////////////////////////////////////////////////////////////// // AutoSymbolList ///////////////////////////////////////////////////////////////////// void AutoSymbolList::debugUpdateAllNow() { AutoSymbolListImpl::debugUpdateAllNow(); } bool AutoSymbolList::reloadOrRelease(Description const &description) { return AutoSymbolListImpl::reloadOrRelease(description); } AutoSymbolList::Ref AutoSymbolList::find(Description const &description) { return AutoSymbolListImpl::find(description); } bool operator ==(AutoSymbolList::Description const &a, AutoSymbolList::Description const &b) { if (a.size() != b.size()) return false; for (AutoSymbolList::Description::const_iterator itA = a.begin(), itB = b.begin(); itA != a.end(); itA++, itB++) if (*itA != *itB) return false; return true; } inline uint64_t rotl (uint64_t x, int n) { // According to the internet, GCC will recognize this and replace it with // a single machine instruction. return (x<>(64-n)); } namespace std { template <> class hash< AutoSymbolList::Description > { public: uint64_t operator ()(AutoSymbolList::Description const &description) const { uint64_t result = 0; for (AutoSymbolList::Description::const_iterator it = description.begin(); it != description.end(); it++) { rotl(result, 3); uint64_t newValue = it->ownerId; newValue <<= 32; newValue |= it->listId; result ^= newValue; } return result; } }; } ///////////////////////////////////////////////////////////////////// // AutoSymbolListImpl ///////////////////////////////////////////////////////////////////// pthread_mutex_t AutoSymbolListImpl::_globalMutex = PTHREAD_MUTEX_INITIALIZER; AutoSymbolListImpl::All AutoSymbolListImpl::_all; void AutoSymbolListImpl::debugUpdateAllNow() { std::vector< Description > descriptions; pthread_mutex_lock(&_globalMutex); descriptions.reserve(_all.size()); for (All::const_iterator it = _all.begin(); it != _all.end(); it++) descriptions.push_back(it->first); pthread_mutex_unlock(&_globalMutex); for (auto it = descriptions.cbegin(); it != descriptions.cend(); it++) reloadOrRelease(*it); } bool AutoSymbolListImpl::reloadOrRelease(Description const &description) { AutoSymbolList::Ref toReload; pthread_mutex_lock(&_globalMutex); All::iterator it = _all.find(description); if (it != _all.end()) { if (it->second.isUnique()) // No one is using the list. Delete it. _all.erase(it); else // Need to update the data in the list. toReload = it->second; } pthread_mutex_unlock(&_globalMutex); if (toReload) dynamic_cast< AutoSymbolListImpl & >(*toReload).reload(description); return toReload; } static std::string s_load = "AutoSymbolListImpl::load"; void AutoSymbolListImpl::load(Description const &description, Symbols &destination) { // This structure comes from SymbolLists::inListsSql(). That was set // up specifically to match the indexes in symbols_in_lists. ThreadMonitor::SetState tm(s_load); tm.increment(s_load); destination.clear(); std::map< UserId, std::set< SymbolListId > > lists; for (Description::const_iterator it = description.begin(); it != description.end(); it++) lists[it->ownerId].insert(it->listId); std::vector< std::string > queries; for (std::map< UserId, std::set< SymbolListId > >::const_iterator it = lists.begin(); it != lists.end(); it++) { // For each user ... const UserId ownerId = it->first; std::set< SymbolListId > const &listsAsNumbers = it->second; std::vector< std::string > expressions; expressions.reserve(2); expressions.push_back("user_id=" + ntoa(ownerId)); std::set< std::string > listsAsStrings; for (std::set< SymbolListId >::const_iterator listIdIt = listsAsNumbers.begin(); listIdIt != listsAsNumbers.end(); listIdIt++) listsAsStrings.insert(ntoa(*listIdIt)); assert(!listsAsStrings.empty()); expressions.push_back(sqlIn("list_id", listsAsStrings)); const std::string query = "SELECT symbol FROM symbols_in_lists WHERE " + sqlAnd(expressions); queries.push_back(query); } const std::string sql = sqlUnion(queries); std::set< std::string > result; MysqlResultRef r = DatabaseForThread(DatabaseWithRetry::SLAVE) ->tryQueryUntilSuccess(sql, s_load); destination.reserve(r->numRows()); for (; r->rowIsValid(); r->nextRow()) addString(destination, r->getStringField(0)); } AutoSymbolListImpl::AutoSymbolListImpl() { int result = pthread_mutex_init(&_mutex, NULL); assert(!result); } AutoSymbolListImpl::~AutoSymbolListImpl() { int result = pthread_mutex_destroy(&_mutex); assert(!result); } bool AutoSymbolListImpl::isPresent(char const *symbol) { pthread_mutex_lock(&_mutex); const bool result = _symbols.count(symbol); pthread_mutex_unlock(&_mutex); return result; } void AutoSymbolListImpl::reload(Description const &description) { Symbols newList; load(description, newList); pthread_mutex_lock(&_mutex); _symbols.swap(newList); pthread_mutex_unlock(&_mutex); } // This name has to be explicit. This function will be executed in multiple // threads. static const std::string s_FIND_SYMBOL_LIST = "find_symbol_list"; AutoSymbolList::Ref AutoSymbolListImpl::find(Description const &description) { ThreadMonitor::SetState tm(s_FIND_SYMBOL_LIST); tm.increment(s_FIND_SYMBOL_LIST); AutoSymbolListImpl *isNew = NULL; pthread_mutex_lock(&_globalMutex); // Grab a REFERENCE to the data. That way we can check if the data already // exists, grab a copy if it does, or create and store a new copy if it // doesn't, all with only one lookup. AutoSymbolList::Ref &r = _all[description]; if (!r) { isNew = new AutoSymbolListImpl; pthread_mutex_lock(&isNew->_mutex); r = isNew; } // Copy the item from our table, and increment the item's reference count // before releasing the mutex. After we release the mutex somone could alter // _all and invalidate our iterator, or even delete the item we just grabbed. AutoSymbolList::Ref result = r; pthread_mutex_unlock(&_globalMutex); if (isNew) { load(description, isNew->_symbols); pthread_mutex_unlock(&isNew->_mutex); if (AutoSymbolListManager *manager = AutoSymbolListManager::instance()) manager->add(description); } return result; } ///////////////////////////////////////////////////////////////////// // AutoSymbolListManager ///////////////////////////////////////////////////////////////////// AutoSymbolListManager *AutoSymbolListManager::_instance = NULL;