【译】使用Java 8中的并行Streams前请三思_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > 【译】使用Java 8中的并行Streams前请三思

【译】使用Java 8中的并行Streams前请三思

 2014/4/21 15:32:02  nifoo  程序员俱乐部  我要评论(0)
  • 摘要:原文:http://java.dzone.com/articles/think-twice-using-java-8作者:LukasKrecan翻译:长风如果你去听Oracle的人谈论Java8背后的设计抉择的话,你经常会听到他们说并行化是其主要的动机。并行化是lambdas、streamAPI及其它一些技术的背后驱动力。让我看一个streamAPI的例子:privatelongcountPrimes(intmax){returnrange(1,max).parallel().filter
  • 标签:使用 Java
原文:http://java.dzone.com/articles/think-twice-using-java-8
作者:Lukas Krecan
翻译:长风

如果你去听Oracle的人谈论 Java 8 背后的设计抉择的话,你经常会听到他们说并行化是其主要的动机。并行化是lambdas、stream API及其它一些技术的背后驱动力。让我看一个stream API的例子
class="java" name="code">private long countPrimes(int max) {
    return range(1, max).parallel().filter(this::isPrime).count();
}

private boolean isPrime(long n) {
    return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}


这里我们有一个方法 countPrimes,可以计算1到max之间有多少个素数。这个方法里面,首先通过range方法创建一个数字的 stream 对象,然后将这个 stream 转换成并行模式,接着那些不是素数的数字会被过滤掉,最后剩下的素数被计数。

你可以看到,stream API 允许我们以一种简洁和紧凑的方式去描述问题。而且,你只要调用一个 parallel() 方法就能实现并行化。并行化以后,stream 会被分成多个块,每块都会被独立处理,最后的结果又会被聚合在一起。我们上面那个 isPrime 方法的实现即低效,又是CPU密集型的,因此,我们可以利用并行化的优点,使用所有能用的 CPU core。

下面我们来看另一个例子:
private List<StockInfo> getStockInfo(Stream<String> symbols) {
     return symbols.parallel()
            .map(this::getStockInfo) //slow network operation
            .collect(toList());
}


我们有一个股票代码的列表,我们必须调用一个很慢的网络操作去获取那些股票的明细。这不是一个CPU密集型操作,但是我们仍然可以利用并行。我们可以并行地去执行多个网络请求,这也是并行 stream 一个很好的应用,不是吗?

如果你在一个项目里面把上面两件事情都做了,那将会有一个大问题,你发现了吗?这个问题是因为所有的并行 stream 会使用一个共同的 fork-join 线程池,如果你提交了一个长时间运行的任务,那么就会阻塞线程池中所有的线程。结果就是你阻塞了其它所有使用并行 stream 的任务。想象一下,在一个 servlet 环境中,当一个请求调用 getStockInfo 方法,另一个调用 countPrimes 方法。那么其中一个请求将会阻塞另一个请求,尽管这两个请求要求的是不同的系统资源。更糟的是,你还不能给并行 stream 指定线程池,在一个 class loader 里面只能共用同一个线程池。

让我们用下面的例子来说明:
private void run() throws InterruptedException {
  ExecutorService es = Executors.newCachedThreadPool();

  // Simulating multiple threads in the system
  // if one of them is executing a long-running task.
  // Some of the other threads/tasks are waiting
  // for it to finish

  es.execute(() -> countPrimes(MAX, 1000)); //incorrect task
  es.execute(() -> countPrimes(MAX, 0));
  es.execute(() -> countPrimes(MAX, 0));
  es.execute(() -> countPrimes(MAX, 0));
  es.execute(() -> countPrimes(MAX, 0));
  es.execute(() -> countPrimes(MAX, 0));


  es.shutdown();
  es.awaitTermination(60, TimeUnit.SECONDS);
}

private void countPrimes(int max, int delay) {
  System.out.println(
     range(1, max).parallel()
        .filter(this::isPrime).peek(i -> sleep(delay)).count()
  );
}

上面,我们模拟了6个线程,全都执行CPU密集型任务。第一个任务是“有问题”的,它每找到一个素数都会休眠一秒钟。当然,这只是我们假想出来的例子,你可以把它想像成一个被卡住或者是一个执行阻塞操作的线程。
问题是当这段代码执行时,会发生什么呢?我们有6个任务,其中一个要花一天才能执行完,而其它的很快就能执行完。毫无意外,每次执行的结果都不一样,有时候所有正常的任务都完成了,有时候则会有几个被卡在“有问题”的那个任务后面。你想你生成环境中的系统有这样的行为吗?一个有问题的任务撂倒整个应用的其它部分?我可不想要。

要确保不会发生这种事,有两个选择。一个是确保所有进入共同 fork-join 线程池的任务都不会卡住,能够在一个合理的时间内完成 。这个说起来容易做起来难,尤其是在一个复杂的应用系统中。另一个选择就是,在 Oracle 允许我们给一个并行 stream 指定线程池之前,不要使用并行 stream。

Resources:
Interview with Brian Goetz
A Java Parallel Calamity

Published at DZone with permission of its author, Lukas Krecan.
上一篇: asp.net读取文件 下一篇: 没有下一篇了!
发表评论
用户名: 匿名