相关文章:
无锁环形缓冲(Wait-free ring buffer)?
http://aigo.iteye.com/blog/1913518
?
方案:用atomic实现
boost提供了一种标准的所谓Wait-free ring buffer,官方文档上叫buffer,但是其实现其实还是queue,因为push和pop的时候没有size参数,就是说其内部是具体的元素对象,不是类似字节流那种闭环的buffer。
另外,他其实还是用到了轻量级的线程同步atomic,但比lock的开销少至少一个数量级。
下面例子是boost官方文档上的,这里只是提供了思路,没有boost也行,std::atomic也能实现
Wait-free ring buffer
http://www.boost.org/doc/libs/1_60_0/doc/html/atomic/usage_examples.html#boost_atomic.usage_examples.example_ringbuffer.implementation
实现:
?
class="cpp">#include <boost/atomic.hpp> template<typename T, size_t Size> class ringbuffer { public: ringbuffer() : head_(0), tail_(0) {} bool push(const T & value) { size_t head = head_.load(boost::memory_order_relaxed); size_t next_head = next(head); if (next_head == tail_.load(boost::memory_order_acquire)) return false; ring_[head] = value; head_.store(next_head, boost::memory_order_release); return true; } bool pop(T & value) { size_t tail = tail_.load(boost::memory_order_relaxed); if (tail == head_.load(boost::memory_order_acquire)) return false; value = ring_[tail]; tail_.store(next(tail), boost::memory_order_release); return true; } private: size_t next(size_t current) { return (current + 1) % Size; } T ring_[Size]; boost::atomic<size_t> head_, tail_; };
? 调用:
?
ringbuffer<int, 32> r; // try to insert an element if (r.push(42)) { /* succeeded */ } else { /* buffer full */ } // try to retrieve an element int value; if (r.pop(value)) { /* succeeded */ } else { /* buffer empty */ }
?
如果不需要无锁,只需要一个队列,还可以使用boost线程的API:circular_buffer
http://www.boost.org/doc/libs/1_60_0/doc/html/circular_buffer.html
?
?
?
误解:用volatile实现lock-free
之前网上不少关于用volatile来实现无锁的例子,实际上是错误的。
volatile无法实现线程安全,即使是一读一写。volatile设计的目的就不是为线程安全考虑的,在特殊的硬件和编译器上volatile才有数据同步的作用。
著名C++专家Herb Sutters对无锁问题的相关论述:
原文:http://www.drdobbs.com/parallel/volatile-vs-volatile/212701484
Volatile is orthogonal to what you use to implement atomics. In C++ it tells the compiler that certain it is not safe to perform optimizations with that variable. Herb Sutters lays it out:?
To safely write lock-free code that communicates between threads without using locks, prefer to use ordered atomic variables: Java/.NET volatile, C++0x atomic, and C-compatible atomic_T.
?
To safely communicate with special hardware or other memory that has unusual semantics, use unoptimizable variables: ISO C/C++ volatile. Remember that reads and writes of these variables are not necessarily atomic, however.
?
?Finally, to express a variable that both has unusual semantics and has any or all of the atomicity and/or ordering guarantees needed for lock-free coding, only the ISO C++0x draft Standard provides a direct way to spell it: volatile atomic.
?
?
下面这个例子是用volatile实现无锁的例子,这个仅限于特定硬件!如果要实现无锁,还不如参考linux kernel中的fifo,它连volatile都没用。
CSDN上某博主的文章:http://blog.csdn.net/chenjiayi_yun/article/details/8945841?
限制一个线程读,一个线程写,不加锁的队列,使用单链表实现,测试环境:centos 5.9?
#include <iostream> #include <pthread.h> template<class QElmType> struct qnode { struct qnode *next; QElmType data; }; template<class QElmType> class queue { public: queue() {init();} ~queue() {destroy();} bool init() { m_front = m_rear = new qnode<QElmType>; if (!m_front) return false; m_front->next = 0; return true; } void destroy() { while (m_front) { m_rear = m_front->next; delete m_front; m_front = m_rear; } } bool push(QElmType e) { struct qnode<QElmType> *p = new qnode<QElmType>; if (!p) return false; p->next = 0; m_rear->next = p; m_rear->data = e; m_rear = p; return true; } bool pop(QElmType *e) { if (m_front == m_rear) return false; struct qnode<QElmType> *p = m_front; *e = p->data; m_front = p->next; delete p; return true; } private: struct qnode<QElmType> * volatile m_front, * volatile m_rear; }; queue<int> g_q; void *thread1(void * l) { int i = 0; while (1) { g_q.push(i); i++; usleep(::rand()%1000); } return 0; } void *thread2(void * l) { int i; while (1) { if (g_q.pop(&i)) std::cout << i << std::endl; //else //std::cout << "can not pop" << std::endl; usleep(::rand()%1000); } return 0; } int main(int argc, char* argv[]) { pthread_t t1,t2; pthread_create(&t1, 0, thread1, 0); pthread_create(&t2, 0, thread2, 0); char ch; while (1) { std::cin >> ch; if (ch == 'q') break; } return 0; }
?
代码比较简单,只实现2个线程间的无锁。
这个无锁队列主要是使用两个volatile 的指针来判断是否还有任务( volatile m_front, ?volatile m_rear)。
只能实现两个线程间的无锁队列,一个是工作者线程一个是提供任务线程,因为不能让两个或以上的线程来修改指针m_front 和指针m_rear,否者会出现问题。
这个无锁队列的实现是基于m_front指针的修改是由一个线程来完成的,m_rear的修改是由另一个线程来完成的,不会出现同时两个线程修改同一个指针。
?
?