#ifndef __AtomicAccumulate_h_ #define __AtomicAccumulate_h_ #include // TODO The problem is that when you grab something you immediately say // that you are ready to receive new messages. That's not necessarily // true. Example: // . You have a thread pool reading from the AtomicAccumulate // . The order of the messages is important. // . Thread 1 reads some messages and starts processing them. // . New data arrives and a new notification is sent. // . Thread 2 reads the new messages and starts processing them. // . Thread 2 updates a global object (e.g. it sends a messge to the client.) // . Thread 1 updates the same global object. // . The two updates were done out of order. // . You want a way to suspend updates until you are done with this update. // Suggested solution: // . You have a special pointer that you can store in _stack. // . Something like (Node *)-1 // . This means the stack is empty, but in a special state. // . When you go to add to the stack, you always look for this special state. // . After the add _stack will point to the new Node. // . The new Node's next will point to NULL, not this special value. // . We will *not* notify anyone that there is new data. // . When the reader is done working it needs to change _stack back to NULL. // . Use a compare and swap. // . If _stack was not pointing to the special value, read the new data. /* This is a nice way to share data between threads. You can use it like a * stack or a queue. This is complete thread safe and completely lock free. */ template < class T > class AtomicAccumulate { private: // Node provides a simple linked list. Each item contains a value of type // T and a pointer to the next item. struct Node { T value; Node *next; template < class... Args > Node(Args&&... args) : value(std::forward(args)...), next(NULL) { } ~Node() { delete next; } }; Node *_stack; // We store the data in a stack because it's so easy. Node *take() { // This complicated looking function is implemented with the XCHG // operation. That's a simple exchange which is always atomic. We return // the previous value of _stack, and leave NULL in its place. return __sync_lock_test_and_set(&_stack, NULL); } template< class Action > void takeFIFO(Node *p, Action const &action) { if (p) { takeFIFO(p->next, action); action(std::move(p->value)); } } void operator =(AtomicAccumulate const &) = delete; AtomicAccumulate(AtomicAccumulate const &) = delete; public: AtomicAccumulate() : _stack(NULL) { } ~AtomicAccumulate() { delete _stack; } // Push an item of type T onto the stack. We pass any number of arguments // to T's constructor. In most cases T will include a copy or move // constructor, so you could pass an object of type T. template < class... Args > bool emplace(Args&&... args) { Node *newTop = new Node(std::forward(args)...); bool previouslyEmpty; do { newTop->next = _stack; previouslyEmpty = !newTop->next; } while (!__sync_bool_compare_and_swap(&_stack, newTop->next, newTop)); return previouslyEmpty; } // Read the items in reverse order, like a typical stack. // Note that we move all of the items out of the variable immediately, // before calling action on the first one. template< class Action > void takeLIFO(Action const &action) { Node *p = take(); const std::unique_ptr< Node > exceptionSafeCleanup(p); while (p) { action(std::move(p->value)); p = p->next; } } // Read the items in the same order they were received in, a typical queue. // Note that we move all of the items out of the variable immediately, // before calling action on the first one. template< class Action > void takeFIFO(Action const &action) { Node *p = take(); const std::unique_ptr< Node > exceptionSafeCleanup(p); takeFIFO(p, action); } void clear() { delete take(); } }; #endif