相关文章:
无锁环形缓冲(Wait-free ring buffer)?
http://aigo.iteye.com/blog/1913518
?
方案:用atomic实现
boost提供了一种标准的所谓Wait-free ring buffer,官方文档上叫buffer,但是其实现其实还是queue,因为push和pop的时候没有size参数,就是说其内部是具体的元素对象,不是类似字节流那种闭环的buffer。
另外,他其实还是用到了轻量级的线程同步atomic,但比lock的开销少至少一个数量级。
下面例子是boost官方文档上的,这里只是提供了思路,没有boost也行,std::atomic也能实现
Wait-free ring buffer
http://www.boost.org/doc/libs/1_60_0/doc/html/atomic/usage_examples.html#boost_atomic.usage_examples.example_ringbuffer.implementation
实现:
?
class="cpp">#include <boost/atomic.hpp>
template<typename T, size_t Size>
class ringbuffer {
public:
ringbuffer() : head_(0), tail_(0) {}
bool push(const T & value)
{
size_t head = head_.load(boost::memory_order_relaxed);
size_t next_head = next(head);
if (next_head == tail_.load(boost::memory_order_acquire))
return false;
ring_[head] = value;
head_.store(next_head, boost::memory_order_release);
return true;
}
bool pop(T & value)
{
size_t tail = tail_.load(boost::memory_order_relaxed);
if (tail == head_.load(boost::memory_order_acquire))
return false;
value = ring_[tail];
tail_.store(next(tail), boost::memory_order_release);
return true;
}
private:
size_t next(size_t current)
{
return (current + 1) % Size;
}
T ring_[Size];
boost::atomic<size_t> head_, tail_;
};
? 调用:
?
ringbuffer<int, 32> r;
// try to insert an element
if (r.push(42)) { /* succeeded */ }
else { /* buffer full */ }
// try to retrieve an element
int value;
if (r.pop(value)) { /* succeeded */ }
else { /* buffer empty */ }
?
如果不需要无锁,只需要一个队列,还可以使用boost线程的API:circular_buffer
http://www.boost.org/doc/libs/1_60_0/doc/html/circular_buffer.html
?
?
?
误解:用volatile实现lock-free
之前网上不少关于用volatile来实现无锁的例子,实际上是错误的。
volatile无法实现线程安全,即使是一读一写。volatile设计的目的就不是为线程安全考虑的,在特殊的硬件和编译器上volatile才有数据同步的作用。
著名C++专家Herb Sutters对无锁问题的相关论述:
原文:http://www.drdobbs.com/parallel/volatile-vs-volatile/212701484
Volatile is orthogonal to what you use to implement atomics. In C++ it tells the compiler that certain it is not safe to perform optimizations with that variable. Herb Sutters lays it out:?
To safely write lock-free code that communicates between threads without using locks, prefer to use ordered atomic variables: Java/.NET volatile, C++0x atomic, and C-compatible atomic_T.
?
To safely communicate with special hardware or other memory that has unusual semantics, use unoptimizable variables: ISO C/C++ volatile. Remember that reads and writes of these variables are not necessarily atomic, however.
?
?Finally, to express a variable that both has unusual semantics and has any or all of the atomicity and/or ordering guarantees needed for lock-free coding, only the ISO C++0x draft Standard provides a direct way to spell it: volatile atomic.
?
?
下面这个例子是用volatile实现无锁的例子,这个仅限于特定硬件!如果要实现无锁,还不如参考linux kernel中的fifo,它连volatile都没用。
CSDN上某博主的文章:http://blog.csdn.net/chenjiayi_yun/article/details/8945841?
限制一个线程读,一个线程写,不加锁的队列,使用单链表实现,测试环境:centos 5.9?
#include <iostream>
#include <pthread.h>
template<class QElmType>
struct qnode
{
struct qnode *next;
QElmType data;
};
template<class QElmType>
class queue
{
public:
queue() {init();}
~queue() {destroy();}
bool init()
{
m_front = m_rear = new qnode<QElmType>;
if (!m_front)
return false;
m_front->next = 0;
return true;
}
void destroy()
{
while (m_front)
{
m_rear = m_front->next;
delete m_front;
m_front = m_rear;
}
}
bool push(QElmType e)
{
struct qnode<QElmType> *p = new qnode<QElmType>;
if (!p)
return false;
p->next = 0;
m_rear->next = p;
m_rear->data = e;
m_rear = p;
return true;
}
bool pop(QElmType *e)
{
if (m_front == m_rear)
return false;
struct qnode<QElmType> *p = m_front;
*e = p->data;
m_front = p->next;
delete p;
return true;
}
private:
struct qnode<QElmType> * volatile m_front, * volatile m_rear;
};
queue<int> g_q;
void *thread1(void * l)
{
int i = 0;
while (1)
{
g_q.push(i);
i++;
usleep(::rand()%1000);
}
return 0;
}
void *thread2(void * l)
{
int i;
while (1)
{
if (g_q.pop(&i))
std::cout << i << std::endl;
//else
//std::cout << "can not pop" << std::endl;
usleep(::rand()%1000);
}
return 0;
}
int main(int argc, char* argv[])
{
pthread_t t1,t2;
pthread_create(&t1, 0, thread1, 0);
pthread_create(&t2, 0, thread2, 0);
char ch;
while (1)
{
std::cin >> ch;
if (ch == 'q')
break;
}
return 0;
}
?
代码比较简单,只实现2个线程间的无锁。
这个无锁队列主要是使用两个volatile 的指针来判断是否还有任务( volatile m_front, ?volatile m_rear)。
只能实现两个线程间的无锁队列,一个是工作者线程一个是提供任务线程,因为不能让两个或以上的线程来修改指针m_front 和指针m_rear,否者会出现问题。
这个无锁队列的实现是基于m_front指针的修改是由一个线程来完成的,m_rear的修改是由另一个线程来完成的,不会出现同时两个线程修改同一个指针。
?
?