无锁环形缓冲(Wait-free ring buffer)?
boost提供了一种标准的所谓Wait-free ring buffer,官方文档上叫buffer,但是其实现其实还是queue,因为push和pop的时候没有size参数,就是说其内部是具体的元素对象,不是类似字节流那种闭环的buffer。
Wait-free ring buffer
class="cpp">#include <boost/atomic.hpp> template<typename T, size_t Size> class ringbuffer { public: ringbuffer() : head_(0), tail_(0) {} bool push(const T & value) { size_t head = head_.load(boost::memory_order_relaxed); size_t next_head = next(head); if (next_head == tail_.load(boost::memory_order_acquire)) return false; ring_[head] = value; head_.store(next_head, boost::memory_order_release); return true; } bool pop(T & value) { size_t tail = tail_.load(boost::memory_order_relaxed); if (tail == head_.load(boost::memory_order_acquire)) return false; value = ring_[tail]; tail_.store(next(tail), boost::memory_order_release); return true; } private: size_t next(size_t current) { return (current + 1) % Size; } T ring_[Size]; boost::atomic<size_t> head_, tail_; };
? 调用:
ringbuffer<int, 32> r; // try to insert an element if (r.push(42)) { /* succeeded */ } else { /* buffer full */ } // try to retrieve an element int value; if (r.pop(value)) { /* succeeded */ } else { /* buffer empty */ }
著名C++专家Herb Sutters对无锁问题的相关论述:
Volatile is orthogonal to what you use to implement atomics. In C++ it tells the compiler that certain it is not safe to perform optimizations with that variable. Herb Sutters lays it out:?
To safely write lock-free code that communicates between threads without using locks, prefer to use ordered atomic variables: Java/.NET volatile, C++0x atomic, and C-compatible atomic_T.
To safely communicate with special hardware or other memory that has unusual semantics, use unoptimizable variables: ISO C/C++ volatile. Remember that reads and writes of these variables are not necessarily atomic, however.
?Finally, to express a variable that both has unusual semantics and has any or all of the atomicity and/or ordering guarantees needed for lock-free coding, only the ISO C++0x draft Standard provides a direct way to spell it: volatile atomic.
下面这个例子是用volatile实现无锁的例子,这个仅限于特定硬件!如果要实现无锁,还不如参考linux kernel中的fifo,它连volatile都没用。
限制一个线程读,一个线程写,不加锁的队列,使用单链表实现,测试环境:centos 5.9?
#include <iostream> #include <pthread.h> template<class QElmType> struct qnode { struct qnode *next; QElmType data; }; template<class QElmType> class queue { public: queue() {init();} ~queue() {destroy();} bool init() { m_front = m_rear = new qnode<QElmType>; if (!m_front) return false; m_front->next = 0; return true; } void destroy() { while (m_front) { m_rear = m_front->next; delete m_front; m_front = m_rear; } } bool push(QElmType e) { struct qnode<QElmType> *p = new qnode<QElmType>; if (!p) return false; p->next = 0; m_rear->next = p; m_rear->data = e; m_rear = p; return true; } bool pop(QElmType *e) { if (m_front == m_rear) return false; struct qnode<QElmType> *p = m_front; *e = p->data; m_front = p->next; delete p; return true; } private: struct qnode<QElmType> * volatile m_front, * volatile m_rear; }; queue<int> g_q; void *thread1(void * l) { int i = 0; while (1) { g_q.push(i); i++; usleep(::rand()%1000); } return 0; } void *thread2(void * l) { int i; while (1) { if (g_q.pop(&i)) std::cout << i << std::endl; //else //std::cout << "can not pop" << std::endl; usleep(::rand()%1000); } return 0; } int main(int argc, char* argv[]) { pthread_t t1,t2; pthread_create(&t1, 0, thread1, 0); pthread_create(&t2, 0, thread2, 0); char ch; while (1) { std::cin >> ch; if (ch == 'q') break; } return 0; }
这个无锁队列主要是使用两个volatile 的指针来判断是否还有任务( volatile m_front, ?volatile m_rear)。
只能实现两个线程间的无锁队列,一个是工作者线程一个是提供任务线程,因为不能让两个或以上的线程来修改指针m_front 和指针m_rear,否者会出现问题。