TBB(
Intel Threading Building Blocks)学习笔记
并行与并发是相对的,OS里讲的是并发而在
架构方面更多的是说并行。并行是分多个层面的,个人认为基本上可以分为这么几个层面:1、指令级的并行;即所谓的微程序、指令流水线等,现在cpu的一级
缓存、二级缓存都很大,所以这个cache的效果还是
比较好的(基于局部性原理)2、
线程级的并行;即同一个时刻多个函数在运行(现在的cpu好像都是
多核的)3、服务级别的(比如一个游戏服务器中有商店服务、也有战斗服务、聊天服务等 这里的每个服务可能对应多个逻辑线程)4、节点级别的;即所谓的
分布式系统,多个节点互相配合,使整个系统在逻辑上成为一个单一的系统。(google、qq等这些海量访问的服务统统是分布式的)一般来说,第一个级别的并行直接做在硬件里面,第二个级别的并行会有一些基础的框架,第三和第四个级别的并行就是应用程序自己的架构的问题了。这里面实际上有一个争论:是在
算法并行化上面花心思去
研究还是采用分布式的框架来面对问题规模的增长?实际上2者各有利弊,前者可以充分利用已有硬件,但是对
程序员的要求较高,维护开发成本高,风险大;后者容易实现但是浪费硬件,在有些情况下不是所有问题都可以用加个机器的方式可以解决的(比如客户端上的多媒体软件,其计算量极大,总不能要求所有用户都升级吧。)
Intel Threading Building Blocks,是为了方便程序员使用多核
处理器的C++库,应该是对应上面提到的第二个级别的并行的。
一、TBB应该提供哪些东西?
用TBB就是为了程序的并行化,那么程序员需要
什么样的支持呢?最理想的情况是已有的代码不作任何修改,换一个编译器重新编译一下就OK,现在看来这个还不太现实。要有更好的效率就需要有更多的启发式信息,同时也就要求程序员要了解很多细节。整个程序逻辑没办法自动并行化,那就针对控制流进行并行化吧,所以TBB中提供了 parallel_for、parallel_while、 parallel_reduce等;(这些是TBB给C++程序员的比较高层的
接口)并行肯定是多线程,这样的话数据竞争问题就比较棘手,所以TBB提供并发容器;如果觉得
TBB提供的这些接口还没有办法解决性能问题,那就可以更深入的研究使用mutex、atomic、task等了;可以看出,TBB从几个层次上为程序员提供了支持。
二、TBB提供的接口
由底层到高层,task_scheduler--------concurrent_container--------parallel_for---pipeline
简单说,TBB帮我们调度一个个task(比OS的调度要高效),实现高效的并行算法
三、细节
1、parallel_for 适用场合:多个数据或请求彼此没有依赖关系,所要进行的操作是一样的(典型SPMD)
例子:
// 典型的c++
泛型编程 blocked_range 是要处理的多个数据,3个参数依次是开始的指针(迭代器)、结束指针、每个
任务分配的数据数
// parallel_forFibBody可以简单
理解为一个函数对象(c++里是用运算符
重载实现的,即()是通信的接口)
parallel_for( blocked_range<int>( 1, my_n, 10 ), parallel_forFibBody(my_stream) );
struct parallel_forFibBody {
QueueStream &my_stream;
//! fill functor arguments
parallel_forFibBody(QueueStream &s) : my_stream(s) { }
// 这里是并行的代码
void operator()( const blocked_range<int> &range ) const {
int i_end = range.end();
for( int i = range.begin(); i != i_end; ++i ) {
my_stream.Queue.push( Matrix1110 ); // push initial matrix
}
}
};
2、parallel_reduce 适合于需要汇总的情况,即各个数据的结果需要汇总回来
例子:(注意分发下去和汇总回来的方法)
float ParallelSumFoo( const float a[], size_t n ) {
SumFoo sf(a);
parallel_reduce(blocked_range<size_t>(0,n,IdealGrainSize), sf );
return sf.sum;
}
class SumFoo {
float* my_a;
public:
float sum;
void operator()( const blocked_range<size_t>& r ) {
float *a = my_a;
for( size_t i=r.begin(); i!=r.end(); ++i )
sum += Foo(a[i]);
}
SumFoo( SumFoo& x, split ) : my_a(x.my_a), sum(0) {} // 分发任务,注意这个
构造器要求是
线程安全的
void join( const SumFoo& y ) {sum+=y.sum;} // 收集汇总结果
SumFoo(float a[] ) :
my_a(a), sum(0)
{}
};
3、parallel_while 有时不知道循环何时结束,即使用for的end未知,在这种情况下可以使用parallel_while
例子:注意pop_if_present、typedef Item* argument_type、operator()等部分的处理
// 串行
版本
void SerialApplyFooToList( Item*root ) {
for( Item* ptr=root; ptr!=NULL; ptr=ptr->next )
Foo(pointer->data);
}
// 并行版本
class ItemStream {
Item* my_ptr;
public:
bool pop_if_present( Item*& item ) { // 用于提供下一个迭代器
if( my_ptr ) {
item = my_ptr;
my_ptr = my_ptr->next;
return true;
} else {
return false;
}
};
ItemStream( Item* root ) : my_ptr(root) {}
}
class ApplyFoo {
public:
void operator()( Item* item ) const { // 要求一定是const的
Foo(item->data);
}
typedef Item* argument_type; // 此句是必须的
};
void ParallelApplyFooToList( Item*root ) {
// parallel_while是个class
parallel_while<ApplyFoo> w; // 先建立个对象
ItemStream stream;
ApplyFoo body;
// 第一个参数提供数据指针,第二个参数提供函数体
w.run( stream, body );
}
四、并发容器
大部分程序都有容器类,在多线程环境下就有数据污染的问题,为了使并发的线程串行化,一般是使用加锁的办法,如果这个
容器由程序员自己来实现,难度还是比较大的,这样就需要有线程安全的容器类。
1、concurrent_
hash_map
hash接口与stl类似
2、concurrent_vector
grow_by(n) 插入n个item(动态增长)
grow_to_at_least()设定容器的大小
size() 包括正在并发增长的部分 因为有可能会同时取,所以程序员需要自己维护自己的class的线程
安全性
clear() 不是线程安全的
3、concurrent_queue
pop_if_present(item) 非阻塞,
pop() 阻塞,
concurrent_queue::size() 负数时表示有多少个
消费者在等待
set_capacity()指定
队列大小,会使push操作被阻塞
在并行时,paralell_while pipeline 的效率要高于concurrent_queue
五、如果觉得TBB的加锁效率不高,可以自己控制锁
最常用的是spin lock
六、整个TBB引擎的核心是 Task Scheduler(基于任务图来实现)
提高效率的核心是
threading stealing,保证cpu的效率
七、小结
要使用TBB进行并行化,首先程序员要知道哪些是可以并行化;其次,要熟悉TBB并行化的框架(主要是
泛型编程);再次,程序员要大概知道
并行算法的执行步骤;最后,利用TBB的组件,实现并行化的算法。总体上来说,还是不太好用的