class="p1">当数据量大的时候,采用并行的方式有时候也不是最好的选择,查看并行流的源码:
?
default Stream<E> parallelStream() { return StreamSupport.stream(spliterator(), true); } default Spliterator<E> spliterator() { return Spliterators.spliterator(this, 0); }
?
可以看出parallelStream是将Collection通过Spliterators先切分成小的数据块,至于能比stream快的效果应该是,切分完成后充分利用机器的处理器核来并行执行这些小的数据块,再将结果进行汇聚。而普通的stream应该是属于在一个处理器核上进行并发的处理数据。(有不对的地方还望各位指出,学习学习)
而数据量很大,且需要远程http或其他通信获取数据时,可能就会因为网络、机器等什么其他一些问题而使数据一直处于加载中,或由于超时直接请求断线了,此时考虑异步的执行方式是最好的。
?
?
同步API和异步API的对比:
1)同步API就是指当你调用了某个方法(getName()),而调用方(User)在被调用方(getName())运行的过程中会一直等待状态,待被调用方(getName())运行结束完成后,调用方(User)才可以继续运行下一步,这就形成了一种阻塞式调用。
2)异步API就是在指在被调用方(getName())完成计算之前,可能会将它的剩余计算任务交给另外一个线程来执行,此时的线程和调用方就形成了一种异步的操作,也就是非阻塞式调用。
?
?
而对于异步执行可以使用工厂方法supplyAsync来创建CompletableFuture:?
?
CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> yourMethod()); 源码: /** * Returns a new CompletableFuture that is asynchronously completed * by a task running in the given executor with the value obtained * by calling the given Supplier. * * @param supplier a function returning the value to be used * to complete the returned CompletableFuture * @param executor the executor to use for asynchronous execution * @param <U> the function's return type * @return the new CompletableFuture */ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } 你会发现我们还可以自定义一个满足自身业务需求的执行器构造传入。
?
?
?
对于自定义执行器的应用场景举个例子:
假设:你的机器为8核,顺序执行时每1ms处理一个数据
1)当需要处理的集合中的数据量也正好为8,此时使用顺序(stream)执行时需要8ms,而使用并行(parallelStream)或异步(CompletableFuture)执行时因为充分利用处理器所以可能只需要消耗1ms。
2)当需要处理的集合中的数据量也正好为9,此时使用顺序(stream)执行时需要9ms,使用并行(parallelStream)执行时可能需要消耗1.12ms,而使用异步(CompletableFuture)执行时也和并行一样差不多的时间。
对于第二种情况:这是因为他们俩内部都使用的是同样的通用线程池,默认都使用固定数量的线程,也就是默认线程数(Runtime.getRuntime().availableProcessors()的返回值)都一样造成的所消耗的时间相差不大。此时就可以通过自定义执行器来控制线程池的大小,提高并发数,进而来更好的利用系统资源。
?
?
对于线程池的相关知识,可以阅读Brian Goetz《Java并发编程实战》这本书籍,里面有很多比较好优化方法。在执行时,要是线程池中的线程数量太多,会因为竞争稀缺的处理器和内存资源,反而浪费大量的时间在上下文切换上。而要是线程数量太少,优惠因为处理器的一些核不能被充分利用,造成资源上的浪费。所以你会看到这本书里面总结了一个公式:
?
N(threads) = N(cpu) * U(cpu) * (1 + W/C) N(threads): 适合的线程数。 N(cpu): 指处理器的核数目,Runtime.getRuntime().availableProcessors(),也是默认线程数。 U(cpu): 指你所期望的CPU利用率(介于0-1之间)。 W/C: 指等待时间与计算时间的比率(这地方还需要找资料看看?????)。
?
?
?
//自定义执行器
?
ExecutorService executor = Executors.newFixedThreadPool(Math.min(list.size(), 100), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); return t; } } );
?
?
?
?
对于并行流和使用异步执行的选择,当不需要涉及到网络请求、IO处理、等待时间可能有点慢的情况下,建议考虑并行流parallelStream来处理。相反,涉及到网络、IO的时候可以考虑CompletableFuture来执行。
?
?