Java SE5的java.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和执行任务之间提供了一个间接层,Executor代替客户端执行任务。Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期。Executor在Java SE5/6中时启动任务的优选方法。Executor引入了一些功能类来管理和使用线程Thread,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等
class="magplus" src="/Upload/Images/2015031121/5C1FB164B402DCDD.png" title="点击查看原始大小图片" width="700" style="cursor: , pointer;">
创建线程池
Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
?
public static ExecutorService newFixedThreadPool(int nThreads)
创建固定数目线程的线程池。
public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor()
创建一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
?
见类图,接口Executor只有一个方法execute,接口ExecutorService扩展了Executor并添加了一些生命周期管理的方法,如shutdown、submit等。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止。
?
Callable,Future用于返回结果
Future<V>代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable代表一个有返回值得操作。
实例:并行计算求和
?
Java代码??
- public?class?ConcurrentSum?{??
- ????private?int?coreCpuNum;??
- ????private?ExecutorService??executor;??
- ????private?List<FutureTask<Long>>?tasks?=?new?ArrayList<FutureTask<Long>>();??
- ????public?ConcurrentSum(){??
- ????????coreCpuNum?=?Runtime.getRuntime().availableProcessors();??
- ????????executor?=?Executors.newFixedThreadPool(coreCpuNum);??
- ????}??
- ????class?SumCalculator?implements?Callable<Long>{??
- ????????int?nums[];??
- ????????int?start;??
- ????????int?end;??
- ????????public?SumCalculator(final?int?nums[],int?start,int?end){??
- ????????????this.nums?=?nums;??
- ????????????this.start?=?start;??
- ????????????this.end?=?end;??
- ????????}??
- ????????@Override??
- ????????public?Long?call()?throws?Exception?{??
- ????????????long?sum?=0;??
- ????????????for(int?i=start;i<end;i++){??
- ????????????????sum?+=?nums[i];??
- ????????????}??
- ????????????return?sum;??
- ????????}??
- ????}??
- ????public?long?sum(int[]?nums){??
- ????????int?start,end,increment;??
- ??????????
- ????????for(int?i=0;i<coreCpuNum;i++){??
- ????????????increment?=?nums.length?/?coreCpuNum+1;??
- ????????????start?=?i*increment;??
- ????????????end?=?start+increment;??
- ????????????if(end?>?nums.length){??
- ????????????????end?=?nums.length;???
- ????????????}??
- ????????????SumCalculator?calculator?=?new?SumCalculator(nums,?start,?end);??
- ????????????FutureTask<Long>?task?=?new?FutureTask<Long>(calculator);??
- ????????????tasks.add(task);??
- ????????????if(!executor.isShutdown()){??
- ????????????????executor.submit(task);??
- ????????????}??
- ????????}??
- ????????return?getPartSum();??
- ????}??
- ????public?long?getPartSum(){??
- ????????long?sum?=?0;??
- ????????for(int?i=0;i<tasks.size();i++){??
- ????????????try?{??
- ????????????????sum?+=?tasks.get(i).get();??
- ????????????}?catch?(InterruptedException?e)?{??
- ????????????????e.printStackTrace();??
- ????????????}?catch?(ExecutionException?e)?{??
- ????????????????e.printStackTrace();??
- ????????????}??
- ????????}??
- ????????return?sum;??
- ????}??
- ????public?void?close(){??
- ????????executor.shutdown();??
- ????}??
- ??????
- ????public?static?void?main(String[]?args)?{??
- ????????int?arr[]?=?new?int[]{1,?22,?33,?4,?52,?61,?7,?48,?10,?11?};??
- ????????long?sum?=?new?ConcurrentSum().sum(arr);??
- ????????System.out.println("sum:?"?+?sum);??
- ????}??
- }??
?
CompletionService
在上述例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使用CompletionService。
它与ExecutorService最主要的区别在于submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间。
实例:并行计算求和
?
Java代码??
monospace; font-size: 12px; line-height: 25.2000007629395px; margin-bottom: 1px; padding-top: 2px; padding-bottom: 2px; border: 1px solid #d1d7dc; color: #2b91af;">
- public?class?ConcurrentSum2?{??
- ????private?int?coreCpuNum;??
- ????private?ExecutorService??executor;??
- ????private?CompletionService<Long>?completionService;??
- ??????
- ????public?ConcurrentSum2(){??
- ??????????
- ????}??
- ????class?SumCalculator?implements?Callable<Long>{??
- ??????????
- ????}??
- ????public?long?sum(int[]?nums){??
- ????????int?start,end,increment;??
- ??????????
- ????????for(int?i=0;i<coreCpuNum;i++){??
- ????????????increment?=?nums.length?/?coreCpuNum+1;??
- ????????????start?=?i*increment;??
- ????????????end?=?start+increment;??
- ????????????if(end?>?nums.length){??
- ????????????????end?=?nums.length;???
- ????????????}??
- ????????????SumCalculator?task?=?new?SumCalculator(nums,?start,?end);??
- ????????????if(!executor.isShutdown()){??
- ????????????????completionService.submit(task);??
- ????????????}??
- ????????}??
- ????????return?getPartSum();??
- ????}??
- ????public?long?getPartSum(){??
- ????????long?sum?=?0;??
- ????????for(int?i=0;i<coreCpuNum;i++){??
- ????????????try?{??
- ????????????????sum?+=?completionService.take().get();??
- ????????????}?catch?(InterruptedException?e)?{??
- ????????????????e.printStackTrace();??
- ????????????}?catch?(ExecutionException?e)?{??
- ????????????????e.printStackTrace();??
- ????????????}??
- ????????}??
- ????????return?sum;??
- ????}??
- ????public?void?close(){??
- ????????executor.shutdown();??
- ????}??
- } ?
?
转自:http://willsunforjava.iteye.com/blog/1631353