一读一写无锁队列c++实现_C/C++_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > C/C++ > 一读一写无锁队列c++实现

一读一写无锁队列c++实现

 2016/5/12 5:32:33  aigo  程序员俱乐部  我要评论(0)
  • 摘要:相关文章:无锁环形缓冲(Wait-freeringbuffer)http://aigo.iteye.com/blog/1913518方案:用atomic实现boost提供了一种标准的所谓Wait-freeringbuffer,官方文档上叫buffer,但是其实现其实还是queue,因为push和pop的时候没有size参数,就是说其内部是具体的元素对象,不是类似字节流那种闭环的buffer。另外,他其实还是用到了轻量级的线程同步atomic,但比lock的开销少至少一个数量级
  • 标签:实现 c++ 队列

相关文章:

无锁环形缓冲(Wait-free ring buffer)?

http://aigo.iteye.com/blog/1913518

?

方案:用atomic实现

boost提供了一种标准的所谓Wait-free ring buffer官方文档上叫buffer,但是其实现其实还是queue,因为push和pop的时候没有size参数,就是说其内部是具体的元素对象,不是类似字节流那种闭环的buffer。

另外,他其实还是用到了轻量级的线程同步atomic,但比lock的开销少至少一个数量级。
下面例子是boost官方文档上的,这里只是提供了思路,没有boost也行,std::atomic也能实现

Wait-free ring buffer

http://www.boost.org/doc/libs/1_60_0/doc/html/atomic/usage_examples.html#boost_atomic.usage_examples.example_ringbuffer.implementation

实现:

?

class="cpp">#include <boost/atomic.hpp>

template<typename T, size_t Size>
class ringbuffer {
public:
  ringbuffer() : head_(0), tail_(0) {}

  bool push(const T & value)
  {
    size_t head = head_.load(boost::memory_order_relaxed);
    size_t next_head = next(head);
    if (next_head == tail_.load(boost::memory_order_acquire))
      return false;
    ring_[head] = value;
    head_.store(next_head, boost::memory_order_release);
    return true;
  }
  bool pop(T & value)
  {
    size_t tail = tail_.load(boost::memory_order_relaxed);
    if (tail == head_.load(boost::memory_order_acquire))
      return false;
    value = ring_[tail];
    tail_.store(next(tail), boost::memory_order_release);
    return true;
  }
private:
  size_t next(size_t current)
  {
    return (current + 1) % Size;
  }
  T ring_[Size];
  boost::atomic<size_t> head_, tail_;
};

? 调用:

?

ringbuffer<int, 32> r;

// try to insert an element
if (r.push(42)) { /* succeeded */ }
else { /* buffer full */ }

// try to retrieve an element
int value;
if (r.pop(value)) { /* succeeded */ }
else { /* buffer empty */ }

?

如果不需要无锁,只需要一个队列,还可以使用boost线程的API:circular_buffer

http://www.boost.org/doc/libs/1_60_0/doc/html/circular_buffer.html

?

?

?

误解:用volatile实现lock-free

之前网上不少关于用volatile来实现无锁的例子,实际上是错误
volatile无法实现线程安全,即使是一读一写。volatile设计的目的就不是为线程安全考虑的,在特殊的硬件和编译器上volatile才有数据同步的作用

著名C++专家Herb Sutters对无锁问题的相关论述:

原文:http://www.drdobbs.com/parallel/volatile-vs-volatile/212701484

Volatile is orthogonal to what you use to implement atomics. In C++ it tells the compiler that certain it is not safe to perform optimizations with that variable. Herb Sutters lays it out:?

To safely write lock-free code that communicates between threads without using locks, prefer to use ordered atomic variables: Java/.NET volatile, C++0x atomic, and C-compatible atomic_T.

?

To safely communicate with special hardware or other memory that has unusual semantics, use unoptimizable variables: ISO C/C++ volatile. Remember that reads and writes of these variables are not necessarily atomic, however.

?

?Finally, to express a variable that both has unusual semantics and has any or all of the atomicity and/or ordering guarantees needed for lock-free coding, only the ISO C++0x draft Standard provides a direct way to spell it: volatile atomic.

?

?

下面这个例子是用volatile实现无锁的例子,这个仅限于特定硬件!如果要实现无锁,还不如参考linux kernel中的fifo,它连volatile都没用。

CSDN上某博主的文章:http://blog.csdn.net/chenjiayi_yun/article/details/8945841?

限制一个线程读,一个线程写,不加锁的队列,使用单链表实现,测试环境:centos 5.9?

#include <iostream>
#include <pthread.h>


template<class QElmType>
struct qnode
{
    struct qnode *next;
    QElmType data;
};
template<class QElmType>
class queue
{
public:
        queue() {init();}
        ~queue() {destroy();}

        bool init()
        {
                m_front = m_rear = new qnode<QElmType>;
                if (!m_front)
                        return false;
                m_front->next = 0;
                return true;
        }
        void destroy()
        {
                while (m_front)
                {
                        m_rear = m_front->next;
                        delete m_front;
                        m_front = m_rear;
                }
        }
        bool push(QElmType e)
        {
                struct qnode<QElmType> *p = new qnode<QElmType>;
                if (!p)
                        return false;
                p->next = 0;
                m_rear->next = p;
                m_rear->data = e;
                m_rear = p;
                return true;
        }
        bool pop(QElmType *e)
        {
                if (m_front == m_rear)
                        return false;


                struct qnode<QElmType> *p = m_front;
                *e = p->data;
                m_front = p->next;
                delete p;
                return true;
        }
private:
  struct qnode<QElmType> * volatile m_front, * volatile m_rear;
};


queue<int> g_q;


void *thread1(void * l)
{
        int i = 0;
        while (1)
        {
                g_q.push(i);
                i++;
                usleep(::rand()%1000);
        }
        return 0;
}
void *thread2(void * l)
{
        int i;
        while (1)
        {
                if (g_q.pop(&i))
                        std::cout << i << std::endl;
                //else
                        //std::cout << "can not pop" << std::endl;
                usleep(::rand()%1000);
        }
        return 0;
}


int main(int argc, char* argv[])
{
        pthread_t t1,t2;
        pthread_create(&t1, 0, thread1, 0);
        pthread_create(&t2, 0, thread2, 0);
        char ch;
        while (1)
        {
                std::cin >> ch;
                if (ch == 'q')
                        break;
        }
       return 0;
}

?

代码比较简单,只实现2个线程间的无锁。

这个无锁队列主要是使用两个volatile 的指针来判断是否还有任务( volatile m_front, ?volatile m_rear)。

只能实现两个线程间的无锁队列,一个是工作者线程一个是提供任务线程,因为不能让两个或以上的线程来修改指针m_front 和指针m_rear,否者会出现问题。

这个无锁队列的实现是基于m_front指针的修改是由一个线程来完成的,m_rear的修改是由另一个线程来完成的,不会出现同时两个线程修改同一个指针

?

?

发表评论
用户名: 匿名