在任何并发性应用程序中,异步事件处理都至关重要。事件来源可能是不同的计算任务、I/O 操作或与外部系统的交互。无论来源是什么,应用程序代码都必须跟踪事件,协调为响应事件而采取的操作。Java 应用程序可采用两种基本的异步事件处理方法:该应用程序有一个协调线程等待事件,然后采取操作,或者事件可在完成时直接执行某项操作(通常采取执行应用程序所提供的代码的方式)。让线程等待事件的方法被称为阻塞?方法。让事件执行操作、线程无需显式等待事件的方法被称为非阻塞?方法。
旧的?monospace; line-height: 1.5em; color: #000000 !important;">java.util.concurrent.Future
?类提供了一种简单方式来处理预期的事件完成,但仅通过轮询或等待完成的方法。Java 8 中添加的?java.util.concurrent.CompletableFuture
?类扩展了此功能,添加了大量合成或处理事件的方法。(请查阅本系列的前一篇文章 “Java 8 并发性基础”,了解?CompletableFuture
?的介绍。)具体地讲,CompletableFuture
?提供了在事件完成时执行应用程序代码的标准技术,包括各种组合任务(由 future 表示)的方式。这种组合技术使得编写非阻塞代码来处理事件变得很容易(或者至少比过去容易)。本文将介绍如何使用?CompletableFuture
?执行阻塞和非阻塞事件处理。您将获得一些线索,通过这些线索您可以了解为什么非阻塞方法值得花费更多的精力来实现。(可以从作者的 GitHub 存储库获取?完整的示例代码。)
在计算中,根据具体上下文,阻塞?和非阻塞?这两个词的使用通常会有所不同。举例而言,共享数据结构的非阻塞算法不需要线程等待访问数据结构。在非阻塞 I/O 中,应用程序线程可以启动一个 I/O 操作,然后离开执行其他事情,同时该操作会异步地执行。在本文中,非阻塞指的是在无需等待线程的情况下完成某个执行操作的事件。这些用法中的一个共同概念是,阻塞操作需要一个线程来等待某个结果,而非阻塞操作不需要。
等待事件的完成很简单:您有一个线程等待该事件,线程恢复运行时,您就可以知道该事件已经完成。如果您的线程在此期间有其他事要做,它会做完这些事再等待。该线程甚至可以使用轮询方法,通过该方法中断它的其他活动,从而检查事件是否已完成。但基本原理是相同的:需要事件的结果时,您会让线程停靠 (park),以便等待事件完成。
阻塞很容易完成且相对简单,只要您有一个等待事件完成的单一主线程。使用多个因为彼此等待而阻塞的线程时,可能遇到一些问题,比如:
非阻塞方法为创造力留出的空间要多得多。回调是非阻塞事件处理的一种常见技术。回调是灵活性的象征,因为您可以在发生事件时执行任何想要的代码。回调的缺点是,在使用回调处理许多事件时,您的代码会变得凌乱。而且回调特别难调试,因为控制流与应用程序中的代码顺序不匹配。
Java 8?CompletableFuture
?同时支持阻塞和非阻塞的事件处理方法,包括常规回调。CompletableFuture
?也提供了多种合成和组合事件的方式,实现了回调的灵活性以及干净、简单、可读的代码。在本节中,您将看到处理由?CompletableFuture
?表示的事件的阻塞和非阻塞方法的示例。
应用程序在一个特定操作中通常必须执行多个处理步骤。例如,在向用户返回结果之前,Web 应用程序可能需要:
图 1 演示了这种结构类型。
DBA3DA93A.png">
图 1 将处理过程分解为 4 个不同的任务,它们通过表示顺序依赖关系的箭头相连接。任务 1 可直接执行,任务 2 和任务 3 都在任务 1 完成后执行,任务 4 在任务 2 和任务 3 都完成后执行。这是我在本文中用于演示异步事件处理的任务结构。真实应用程序(尤其是具有多个移动部分的服务器应用程序)可能要复杂得多,但这个简单的示例仅用于演示所涉及的原理。
在真实系统中,异步事件的来源一般是并行计算或某种形式的 I/O 操作。但是,使用简单的时间延迟来建模这种系统会更容易,这也是本文所采用的方法。清单 1 显示了我用于生成事件的基本的赋时事件 (timed-event) 代码,这些事件采用了?CompletableFuture
?格式。
import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CompletableFuture; public class TimedEventSupport { private static final Timer timer = new Timer(); /** * Build a future to return the value after a delay. * * @param delay * @param value * @return future */ public static <T> CompletableFuture<T> delayedSuccess(int delay, T value) { CompletableFuture<T> future = new CompletableFuture<T>(); TimerTask task = new TimerTask() { public void run() { future.complete(value); } }; timer.schedule(task, delay * 1000); return future; } /** * Build a future to return a throwable after a delay. * * @param delay * @param t * @return future */ public static <T> CompletableFuture<T> delayedFailure(int delay, Throwable t) { CompletableFuture<T> future = new CompletableFuture<T>(); TimerTask task = new TimerTask() { public void run() { future.completeExceptionally(t); } }; timer.schedule(task, delay * 1000); return future; } }
清单 1?中的?TimerTask
?被实现为一个匿名内部类,仅包含一个?run()
?方法。您可能认为这里可以使用lambda?代替内部类。但是,lambda 仅能用作接口的实例,而?TimerTask
?被定义为一种抽象类。除非 lambda 特性的 future 扩展添加了对抽象类的支持(有可能,但由于设计问题,未必行得通),或者为?TimerTask
?等情形定义了并行接口,否则您必须继续使用 Java 内部类创建单一方法实现。
清单 1?的代码使用一个?java.util.Timer
?来计划?java.util.TimerTask
?在一定的延迟后执行。每个?TimerTask
?在运行时完成一个有关联的 future。delayedSuccess()
?计划一个任务来成功完成一个?CompletableFuture<T>
?并将 future 返回调用方。delayedFailure()
?计划了一个任务来完成一个?CompletableFuture<T>
?并抛出异常,然后将 future 返回给调用方。
清单 2 展示了如何使用?清单 1?中的代码创建?CompletableFuture<Integer>
?形式的事件,这些事件与?图 1?中的 4 个任务相匹配。(此代码来自示例代码中的EventComposition
?类。)
// task definitions private static CompletableFuture<Integer> task1(int input) { return TimedEventSupport.delayedSuccess(1, input + 1); } private static CompletableFuture<Integer> task2(int input) { return TimedEventSupport.delayedSuccess(2, input + 2); } private static CompletableFuture<Integer> task3(int input) { return TimedEventSupport.delayedSuccess(3, input + 3); } private static CompletableFuture<Integer> task4(int input) { return TimedEventSupport.delayedSuccess(1, input + 4); }
清单 2?中 4 个任务方法中的每一个都为该任务的完成时刻使用了特定的延迟值:task1
?为 1 秒,task2
?为 2 秒,task3
?为 3 秒,task4
重新变为 1 秒。每个任务还接受一个输入值,是该输入加上任务编号作为 future 的(最终)结果值。这些方法都使用了 future 的成功形式;稍后我们将会查看一些使用失败形式的例子。
这些任务要求您按?图 1?中所示的顺序运行它们,向每个任务传递上一个任务返回的结果值(或者对于?task4
,传递前两个任务结果的和)。如果中间两个任务是同时执行的,那么总执行时间大约为 5 秒(1 秒 + (2 秒、3 秒中的最大值)+ 1 秒。如果?task1
?的输入为 1,那么结果为 2。如果该结果传递给?task2
?和?task3
,结果将为 4 和 5。如果这两个结果的和 (9) 作为输入传递给?task4
,最终结果将为 13。
在设置了执行环境后,是时候设置一些操作了。协调 4 个任务的执行的最简单方式是使用阻塞等待:主要线程等待每个任务完成。清单 3(同样来自示例代码中的?EventComposition
?类)给出了此方法。
private static CompletableFuture<Integer> runBlocking() { Integer i1 = task1(1).join(); CompletableFuture<Integer> future2 = task2(i1); CompletableFuture<Integer> future3 = task3(i1); Integer result = task4(future2.join() + future3.join()).join(); return CompletableFuture.completedFuture(result); }
清单 3?使用?CompletableFuture
?的?join()
?方法来完成阻塞等待。join()
?等待任务完成,然后,如果成功完成任务,则返回结果值,或者如果失败或被取消,则抛出一个未经检查的异常。该代码首先等待?task1
?的结果,然后同时启动?task2
?和?task3
,并等待两个任务依次返回 future,最后等待?task4
?的结果。runBlocking()
?返回一个?CompletableFuture
,以便与我接下来将展示的非阻塞形式保持一致,但在本例中,future 实际上将在该方法返回之前完成。
清单 4(同样来自示例代码中的?EventComposition
?类)展示了如何将 future 连接在一起,以便按正确顺序并使用正确的依赖关系执行任务,而不使用阻塞。
private static CompletableFuture<Integer> runNonblocking() { return task1(1).thenCompose(i1 -> ((CompletableFuture<Integer>)task2(i1) .thenCombine(task3(i1), (i2,i3) -> i2+i3))) .thenCompose(i4 -> task4(i4)); }
清单 4?中的代码基本上构造了一个执行计划,指定不同的任务如何执行和它们彼此有何关联。此代码精美而简洁,但是,如果您不熟悉CompletableFuture
?方法,或许难以理解该代码。清单 5 通过将?task2
?和?task3
?部分分离到一个新方法?runTask2and3
?中,将同样的代码重构为更容易理解的形式。
private static CompletableFuture<Integer> runTask2and3(Integer i1) { CompletableFuture<Integer> task2 = task2(i1); CompletableFuture<Integer> task3 = task3(i1); BiFunction<Integer, Integer, Integer> sum = (a, b) -> a + b; return task2.thenCombine(task3, sum); } private static CompletableFuture<Integer> runNonblockingAlt() { CompletableFuture<Integer> task1 = task1(1); CompletableFuture<Integer> comp123 = task1.thenCompose(EventComposition::runTask2and3); return comp123.thenCompose(EventComposition::task4); }
在?清单 5?中,runTask2and3()
?方法表示任务流的中间部分,其中?task2
?和?task3
?同时执行,然后将它们的结果值组合在一起。此顺序是使用一个 future 上的?thenCombine()
?方法来编码的,该方法接受另一个 future 作为它的第一个参数,接受一个二进制函数实例(其输入类型与 future 的结果类型匹配)作为其第二个参数。thenCombine()
?返回了第三个 future,表示应用到最初的两个 future 的结果上的函数的值。在本例中,两个 future 是?task2
?和?task3
,该函数将结果值求和。
runNonblockingAlt()
?方法使用在一个 future 上调用了?thenCompose()
?方法两次。thenCompose()
?的参数是一个函数实例,它接收原始 future 的值类型作为输入,返回另一个 future 作为输出。thenCompose()
?的结果是第三个 future,具有与该函数相同的结果类型。这个 future 用作在原始 future 完成后,该函数最终将返回的 future 的占位符。
对?task1.thenCompose()
?的调用将会返回一个 future,表示对?task1
?的结果应用?runTask2and3()
?函数的结果,该结果被保存为comp123
。对?comp123.thenCompose()
?的调用返回一个 future,表示对第一个?henCompose()
?的结果应用?task4()
?函数的结果,这是执行所有任务的总体结果。
示例代码包含一个?main()
?方法,以便依次运行事件代码的每个版本,并显示完成事件(约 5 秒)和结果 (13) 是正确的。清单 6 显示了从一个控制台运行这个?main()
?方法的结果。
main()
?方法dennis@linux-guk3:~/devworks/scala3/code/bin> java com.sosnoski.concur.article3.EventComposition Starting runBlocking runBlocking returned 13 in 5008 ms. Starting runNonblocking runNonblocking returned 13 in 5002 ms. Starting runNonblockingAlt runNonblockingAlt returned 13 in 5001 ms.?
回页首
目前为止,您看到了以 future 形式协调事件的代码,这些代码总是能够成功完成。在真实应用程序中,不能寄希望于事情总是这么顺利。处理任务过程中将发生问题,而且在 Java 术语中,这些问题通常表示为?Throwable
。
更改?清单 2?中的任务定义很容易,只需使用?delayedFailure()
?代替?delayedSuccess()
?方法即可,如这里的?task4
?所示:
private static CompletableFuture<Integer> task4(int input) { return TimedEventSupport.delayedFailure(1, new IllegalArgumentException("This won't work!")); }
如果运行?清单 3?并且仅将?task4
?修改为完成时抛出异常,那么您会得到?task4
?上的?join()
?调用所抛出的预期的IllegalArgumentException
。如果在?runBlocking()
?方法中没有捕获该问题,该异常会在调用链中一直传递,最终如果仍未捕获问题,则会终止执行线程。幸运的是,修改该代码很容易,因此,如果在任何任务完成时抛出异常,该异常会通过返回的 future 传递给调用方来处理。清单 7 展示了这一更改。
private static CompletableFuture<Integer> runBlocking() { try { Integer i1 = task1(1).join(); CompletableFuture<Integer> future2 = task2(i1); CompletableFuture<Integer> future3 = task3(i1); Integer result = task4(future2.join() + future3.join()).join(); return CompletableFuture.completedFuture(result); } catch (CompletionException e) { CompletableFuture<Integer> result = new CompletableFuture<Integer>(); result.completeExceptionally(e.getCause()); return result; }}
清单 7?非常浅显易懂。最初的代码包装在一个?try
/catch
?中,catch
?在返回的 future 完成时传回异常。此方法稍微增加了一点复杂性,但任何 Java 开发人员应该仍然很容易理解它。
清单 4?中的非阻塞代码甚至不需要添加?try
/catch
。CompletableFuture
?合成和组合操作负责自动为您传递异常,以便依赖的 future 也会在完成时抛出异常。
回页首
您已经查看了由?CompletableFuture
?表示的事件的阻塞和非阻塞处理方法。至少对于本文中建模的基本的任务流,两种方法都非常简单。对于更复杂的任务流,该代码也会更加复杂。
在阻塞情况下,增加的复杂性不是大问题,您仍然只需要等待事件完成。如果要在线程之间执行其他类型的同步,则会遇到线程饥饿问题,甚至是死锁问题。
在非阻塞情况下,事件的完成所触发的代码执行很难调试。在执行许多类型的事件且事件之间存在许多交互时,跟踪哪个事件触发了哪次执行就会变得很难。这种情形基本上就是回调的噩梦,无论是使用传统的回调还是?CompletableFuture
?组合和合成操作。
总而言之,阻塞代码通常具有简单性优势。那么为什么有人希望使用非阻塞方法?本节将给出一些重要的理由。
一个线程阻塞时,以前执行该线程的处理器核心会转而执行另一个线程。以前执行的线程的执行状态必须保存到内存中,并加载新线程的状态。这种将核心从运行一个线程切换到运行另一个线程的操作称为上下文切换。
除了直接的上下文切换性能成本,新线程一般会使用来自前一个线程的不同数据。内存访问比处理器时钟慢得多,所以现代系统会在处理器核心与主要内存之间使用多层缓存。尽管比主要内存快得多,但缓存的容量也小得多(一般而言,缓存越快,容量越小),所以任何时刻只能在缓存中保存总内存的小部分。发生线程切换且一个核心开始执行一个新线程时,新线程需要的内存数据可能不在缓存中,所以该核心必须等待该数据从主要内存加载。
组合的上下文切换和内存访问延迟,会导致直接的显著性能成本。图 2 显示了我使用 Oracle 的 Java 8 for 64-bit Linux 的四核 AMD 系统上的线程切换开销。此测试使用了可变数量的线程,数量从 1 到 4,096 按 2 的幂次变化,每个线程的内存块大小也是可变的,介于 0 到 64KB 之间。线程依次执行,使用?CompletableFuture
?来触发线程执行。每次一个线程执行时,它首先使用针对线程的数据返回一个简单计算结果,以显示将此数据加载到缓存中的开销,然后增加一个共享的静态变量。最后创建一个新?CompletableFuture
?实例来触发它的下一次执行,然后通过完成该线程等待的?CompletableFuture
?来启动序列中的下一个线程。最后,如果需要再次执行它,那么该线程会等待新创建的?CompletableFuture
?完成。
可以在图 2 的图表中看到线程数量和每个线程的内存的影响。线程数量为 4 个时影响最大,只要特定于线程的数据足够小,两个线程的运行速度几乎与单个线程一样快。线程数量超过 4 个后,对性能的影响相对较小。每个线程的内存量越大,两层缓存就会越快溢出,导致切换成本增高。
图 2 中所示的时间值来自我的有点过时的主要系统。您系统上相应的时间将不同,可能会小得多。但是,曲线的形状应大体相同。
图 2 显示了一次线程切换的微秒级开销,所以即使线程切换的成本达到数万个处理器时钟,但绝对数字并不大。对于中等数量的线程,16KB 数据具有 12.5 微秒的切换时间(图表中的黄线),系统每秒可执行 80,000 次线程切换。与您在任何精心编写的单用户应用程序以及甚至许多服务器应用程序中看到的结果相比,这一线程切换次数可能多得多。但对于每秒处理数千个事件的高性能服务器应用程序,阻塞的开销可能成为影响性能的主要因素。对于这种应用程序,尽可能使用非阻塞代码来最大限度减少线程切换非常重要。
认识到这些时间数据来自最理想的场景也很重要。运行线程切换程序时,会运行足够的 CPU 活动来让所有核心全速运行(至少在我的系统上是这样)。在真实应用程序中,处理负载可能具有更大的突发性。在活动量低的时间,现代处理器将一些核心过渡到休眠状态,以减少总功耗和产生的热量。这个降低功耗的过程的惟一问题是,在需求增多时,它需要时间来将核心从休眠状态唤醒。从深度休眠状态过渡到全速运行所需的时间可能达到微秒级别,而不是在这个线程切换时间示例中看到的毫秒级。
对于许多应用程序,不在特定线程上阻塞的另一个原因是,这些线程用于处理需要及时响应的事件。经典的例子就是 UI 线程。如果在 UI 线程中执行会阻塞来等待异步事件完成的代码,那么您会延迟用户输入事件的处理。没有人喜欢等待应用程序响应他们的键入、单击或触控操作,所以 UI 线程中的阻塞可能很快在来自用户的错误报告中反映出来。
UI 线程概念以一种更一般性的原则作为支撑。许多类型的应用程序,甚至非 GUI 应用程序,也必须迅速响应事件,而且在许多情况下,保持较短的响应事件至关重要。对于这些类型的应用程序,阻塞等待不是可接受的选择。
反应式编程?这个名称表示为响应灵敏且可扩展的应用程序采用的编程风格。反应式编程的核心原则是应用程序应能够:
使用阻塞式事件处理方法的应用程序无法满足这些原则。线程是有限的资源,所以在阻塞等待中占用它们会限制可伸缩性,还会增加延迟(应用程序响应时间),因为阻塞的线程无法立即响应事件。非阻塞应用程序可更快地响应事件,降低成本,同时减少线程切换开销并改善吞吐量。
反应式编程比非阻塞代码的复杂得多。反应式编程涉及到关注您应用程序中的数据流并将这些数据流实现为异步交互,而不会让接收方负担过重或让发送方积滞。这种对数据流的关注,有助于避免传统的并发编程的许多复杂性。
?回页首
本文介绍了如何使用 Java 8?CompletableFuture
?类将事件合成和组合到一种执行计划中,这个计划可以使用代码来轻松而又简洁地表示。这种非阻塞可合成性对编写能快速响应工作负载并适当处理故障的反应式应用程序不可或缺。
下一篇文章将转而介绍 Scala 方面,探讨一种完全不同但同样有趣的异步计算处理方式。async
?宏使您能够编写这样的代码,它看起来在执行顺序阻塞操作,但在幕后会将该代码转换为完全非阻塞的结构。我将通过一些示例展示此方法的用途,以及分析?async
?如何实现。
http://www.ibm.com/developerworks/cn/java/j-jvmc3/index.html
?