【转】Java并发框架Executor学习笔记_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > 【转】Java并发框架Executor学习笔记

【转】Java并发框架Executor学习笔记

 2015/3/11 21:16:11  RoomFourteen224  程序员俱乐部  我要评论(0)
  • 摘要:JavaSE5的java.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和执行任务之间提供了一个间接层,Executor代替客户端执行任务。Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期。Executor在JavaSE5/6中时启动任务的优选方法。Executor引入了一些功能类来管理和使用线程Thread,其中包括线程池,Executor,Executors
  • 标签:笔记 学习 Java 学习笔记 Java并发

Java SE5的java.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和执行任务之间提供了一个间接层,Executor代替客户端执行任务。Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期。Executor在Java SE5/6中时启动任务的优选方法。Executor引入了一些功能类来管理和使用线程Thread,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等


class="magplus" src="/Upload/Images/2015031121/5C1FB164B402DCDD.png" title="点击查看原始大小图片" width="700" style="cursor: , pointer;">

创建线程池

Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口

?

public static ExecutorService newFixedThreadPool(int nThreads)

创建固定数目线程的线程池。

public static ExecutorService newCachedThreadPool()

创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

public static ExecutorService newSingleThreadExecutor()

创建一个单线程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

?

见类图,接口Executor只有一个方法execute,接口ExecutorService扩展了Executor并添加了一些生命周期管理的方法,如shutdown、submit等。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止。

?

Callable,Future用于返回结果

Future<V>代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable代表一个有返回值得操作。

实例:并行计算求和

?

Java代码??收藏代码
  1. public?class?ConcurrentSum?{??
  2. ????private?int?coreCpuNum;??
  3. ????private?ExecutorService??executor;??
  4. ????private?List<FutureTask<Long>>?tasks?=?new?ArrayList<FutureTask<Long>>();??
  5. ????public?ConcurrentSum(){??
  6. ????????coreCpuNum?=?Runtime.getRuntime().availableProcessors();??
  7. ????????executor?=?Executors.newFixedThreadPool(coreCpuNum);??
  8. ????}??
  9. ????class?SumCalculator?implements?Callable<Long>{??
  10. ????????int?nums[];??
  11. ????????int?start;??
  12. ????????int?end;??
  13. ????????public?SumCalculator(final?int?nums[],int?start,int?end){??
  14. ????????????this.nums?=?nums;??
  15. ????????????this.start?=?start;??
  16. ????????????this.end?=?end;??
  17. ????????}??
  18. ????????@Override??
  19. ????????public?Long?call()?throws?Exception?{??
  20. ????????????long?sum?=0;??
  21. ????????????for(int?i=start;i<end;i++){??
  22. ????????????????sum?+=?nums[i];??
  23. ????????????}??
  24. ????????????return?sum;??
  25. ????????}??
  26. ????}??
  27. ????public?long?sum(int[]?nums){??
  28. ????????int?start,end,increment;??
  29. ????????//?根据CPU核心个数拆分任务,创建FutureTask并提交到Executor???
  30. ????????for(int?i=0;i<coreCpuNum;i++){??
  31. ????????????increment?=?nums.length?/?coreCpuNum+1;??
  32. ????????????start?=?i*increment;??
  33. ????????????end?=?start+increment;??
  34. ????????????if(end?>?nums.length){??
  35. ????????????????end?=?nums.length;???
  36. ????????????}??
  37. ????????????SumCalculator?calculator?=?new?SumCalculator(nums,?start,?end);??
  38. ????????????FutureTask<Long>?task?=?new?FutureTask<Long>(calculator);??
  39. ????????????tasks.add(task);??
  40. ????????????if(!executor.isShutdown()){??
  41. ????????????????executor.submit(task);??
  42. ????????????}??
  43. ????????}??
  44. ????????return?getPartSum();??
  45. ????}??
  46. ????public?long?getPartSum(){??
  47. ????????long?sum?=?0;??
  48. ????????for(int?i=0;i<tasks.size();i++){??
  49. ????????????try?{??
  50. ????????????????sum?+=?tasks.get(i).get();??
  51. ????????????}?catch?(InterruptedException?e)?{??
  52. ????????????????e.printStackTrace();??
  53. ????????????}?catch?(ExecutionException?e)?{??
  54. ????????????????e.printStackTrace();??
  55. ????????????}??
  56. ????????}??
  57. ????????return?sum;??
  58. ????}??
  59. ????public?void?close(){??
  60. ????????executor.shutdown();??
  61. ????}??
  62. ??????
  63. ????public?static?void?main(String[]?args)?{??
  64. ????????int?arr[]?=?new?int[]{1,?22,?33,?4,?52,?61,?7,?48,?10,?11?};??
  65. ????????long?sum?=?new?ConcurrentSum().sum(arr);??
  66. ????????System.out.println("sum:?"?+?sum);??
  67. ????}??
  68. }??

?

CompletionService

在上述例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使用CompletionService。

它与ExecutorService最主要的区别在于submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间。

实例:并行计算求和

?

Java代码??收藏代码
    monospace; font-size: 12px; line-height: 25.2000007629395px; margin-bottom: 1px; padding-top: 2px; padding-bottom: 2px; border: 1px solid #d1d7dc; color: #2b91af;">
  1. public?class?ConcurrentSum2?{??
  2. ????private?int?coreCpuNum;??
  3. ????private?ExecutorService??executor;??
  4. ????private?CompletionService<Long>?completionService;??
  5. ??????
  6. ????public?ConcurrentSum2(){??
  7. ????????//.....??
  8. ????}??
  9. ????class?SumCalculator?implements?Callable<Long>{??
  10. ????????//.....??
  11. ????}??
  12. ????public?long?sum(int[]?nums){??
  13. ????????int?start,end,increment;??
  14. ????????//?根据CPU核心个数拆分任务,创建FutureTask并提交到Executor???
  15. ????????for(int?i=0;i<coreCpuNum;i++){??
  16. ????????????increment?=?nums.length?/?coreCpuNum+1;??
  17. ????????????start?=?i*increment;??
  18. ????????????end?=?start+increment;??
  19. ????????????if(end?>?nums.length){??
  20. ????????????????end?=?nums.length;???
  21. ????????????}??
  22. ????????????SumCalculator?task?=?new?SumCalculator(nums,?start,?end);??
  23. ????????????if(!executor.isShutdown()){??
  24. ????????????????completionService.submit(task);??
  25. ????????????}??
  26. ????????}??
  27. ????????return?getPartSum();??
  28. ????}??
  29. ????public?long?getPartSum(){??
  30. ????????long?sum?=?0;??
  31. ????????for(int?i=0;i<coreCpuNum;i++){??
  32. ????????????try?{??
  33. ????????????????sum?+=?completionService.take().get();??
  34. ????????????}?catch?(InterruptedException?e)?{??
  35. ????????????????e.printStackTrace();??
  36. ????????????}?catch?(ExecutionException?e)?{??
  37. ????????????????e.printStackTrace();??
  38. ????????????}??
  39. ????????}??
  40. ????????return?sum;??
  41. ????}??
  42. ????public?void?close(){??
  43. ????????executor.shutdown();??
  44. ????}??
  45. } ?

?

转自:http://willsunforjava.iteye.com/blog/1631353

发表评论
用户名: 匿名