Java 7 引入了一个特殊的 executor 使用 Fork/Join 框架。Fork/Join 框架用来解决那些能使用分治算法解决的问题
?
为了使用分治算法,你必须把一个问题分解成小问题。使用递归方法来重复相同的操作直到问题被细分为能直接被解决的足够小的问题。这些小问题可以用 executor 来解决,但是为了更有效地解决这些小问题,Java 7 引入了 Fork/Join 框架。
这个框架基于 ForkJoinPool 类,它是一个特殊的 executor,它包含有两个操作 fork() 和 join()? (以及它们的变种形式),以及一个名为工作窃取算法的内部算法。
?
使用此框架时,你的主方法代码类似于:
class="java">if ( problem.size() > DEFAULT_SIZE) { divideTasks(); executeTask(); taskResults=joinTasksResult(); return taskResults; } else { taskResults=solveBasicProblem(); return taskResults; }
?代码中最重要的部分是允许你有效地分解并执行子任务并从这些子任务中获得运行结果用来计算父任务的结果。这个功能由 ForkJoinTask 的以下两个方法提供:
这两个方法还有其它不同的变形。Fork/Join 框架还有一个关键的特性:决定哪些任务被执行的工作窃取算法。当一个任务使用 join() 方法等待一个子任务完成时,执行此任务的线程会从任务池中取不同的任务运行。通过此方法,Fork/Join executor 的线程总是在执行任务,从而提高了应用的性能。
?
Java 8 提供了 Fork/Join 框架的一个新特性。现在每个 Java 应用程序都有一个默认的名为 ForkJoinPool 的通用池。你可以通过调用静态方法 ForkJoinPool.commonPool() 来获得。默认情况下这个默认的 Fork/Join executor 会被使用,executor 里线程数由运行计算机的可用处理器数来决定。你可以通过改变操作系统值 java.util.concurrent.ForkJoinPool.common.parallelism 来改变其默认行为。
?
一些 Java API 本身也使用 Fork/Join 框架来实现并行操作。例如用并发方式来对数组排序的 Arrays 类的 parallelSort() 方法,以及 Java 8 提供的 parallel streams。
?
Fork/Join 框架具有以下局限性:
Fork/Join 框架有以下五个基础类:
?
public class ForkJoinDemo { public static void main(String[] args) { int[] ints = IntStream.range(1, 5).toArray(); ForkJoinPool forkJoinPool = new ForkJoinPool(4); SumTask sumTask = new SumTask(ints, 0, ints.length, 1); long startTime = System.currentTimeMillis(); int sum = forkJoinPool.invoke(sumTask); long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + sum + " in " + (endTime - startTime) + " ms."); forkJoinPool.shutdown(); try { forkJoinPool.awaitTermination(20, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } } class SumTask extends RecursiveTask<Integer> { private int[] dataList; private int from; private int end; private int divideFactor; public SumTask(int[] dataList, int from, int end, int divideFactor) { this.dataList = dataList; this.from = from; this.end = end; this.divideFactor = divideFactor; } @Override protected Integer compute() { if ((end - from) <= divideFactor) { int sum = 0; for (int i = from; i < end; i++) { sum += dataList[i]; } try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println(String.format("%s : compute %d~%d = %d", Thread.currentThread().getName(), from, end, sum)); return sum; } else { int middle = (end + from) / 2; System.out.println(String.format("%s : split %d~%d ==> %d~%d, %d~%d", Thread.currentThread().getName(), from, end, from, middle, middle, end)); SumTask task1 = new SumTask(dataList, from, middle, divideFactor); SumTask task2 = new SumTask(dataList, middle, end, divideFactor); // 这里我们使用 invokeAll() 方法,如果使用 task1.fork() 和 task2.fork() 方法 // 会导致当前线程变成监工,不参与子任务的执行,从而降低性能 invokeAll(task1, task2); int sum1 = task1.join(); int sum2 = task2.join(); int sum = sum1 + sum2; System.out.println(Thread.currentThread().getName() + " : " + "result = " + sum1 + " + " + sum2 + " ==> " + sum); return sum1 + sum2; } } }
?
ForkJoinPool 类提供了 execute(),invoke(),submit() 方法来向线程池中提交任务。它们之间的区别是:
我们知道 ForkJoinPool 处理基于 ForkJoinTask 类的任务,但是也能执行基于 Runnable 和 Callable 接口的任务。你可以使用 submit() 方法,该方法可以接收一个 Runnable 对象,一个有返回值的 Runnable 对象,以及一个 Callable 对象。
?
ForkJoinTask: