在并发编程中,经常会遇到提交多个子任务并行执行的场景。比如一个中心节点同时派发任务给多个子节点,然后中心节点等待所有子节点完成任务后继续主流程。在这个过程中,主节点需要设置一个最大等待
超时,当达到超时时间后不再等待未返回的节点结果,做功能降级处理。
对于这种需求,如果子任务是阻塞执行的,则一般会使用一个
线程池来执行子任务,但主任务如何唤醒超时呢?直接想到的方式是主任务在提交完所有子任务后进入一个
循环,不断判断所有子任务是否已经完成或者到达超时了,但这种方式会导致主任务线程需要频繁唤醒,加大了上下文切换的开销。并且由于子任务是
异步执行的,还需要考虑结果对象的安全发布问题,加大了
编码的复杂性。
在j.u.c中有一个Completion
Service接口恰好可以实现上述需求,并且避免了上下文切换的开销。其基本思路是用ExecutorCompletionService包装Executor,并在内部使用一个BlockingQueue保存所有已经完成的任务。当主任务调用ExecutorCompletionService.submit方法时包装一个FutureTask的子类对象QueueingFuture并传递给内部Executor,此对象覆盖了FutureTask的done方法。当线程池的Worker线程在任务完成后会回调这个done方法,然后这个方法将已经完成的任务注入到BlockingQueue中去。这样外部只需要调用BlockingQueue的take或poll方法就可以取到完成的任务了。
注意,由于最先完成的任务会先注入BlockingQueue中,所以主线程中取得的任务集合是按照完成的先后顺序排序的。
以下是我写的一个Demo,参考了《
Java并发编程实践》第6章的6.3.6节的程序,并加入了超时等待机制:
class="java">
public class TestCompletionService {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
private static final Random r = new Random(); // 每个子任务随机等待一个时间,以模拟子任务的执行时间
private static final int TASK_TIMEOUT = 5; // 设定最长超时时间为5s
/**
* @param args
*/
public static void main(String[] args) {
final long startTime = System.currentTimeMillis();
final long endTime = startTime + (TASK_TIMEOUT * 1000L);
CompletionService<SubTaskResult> cs = new ExecutorCompletionService<SubTaskResult>(executor);
List<Future<SubTaskResult>> futureList = new ArrayList<Future<SubTaskResult>>();
for (int i = 0; i < 10; i++) {
Future<SubTaskResult> f = cs.submit(new Callable<SubTaskResult>() {
@Override
public SubTaskResult call() throws Exception {
try {
// 子任务等待一个随机时间。如果这里不是+1而是+2,就可以模拟出现超时的情况
long waitTime = (r.nextInt(TASK_TIMEOUT) + 1) * 1000L;
Thread.sleep(waitTime);
return new SubTaskResult(Thread.currentThread().getName() +
", sub thread sleepTime=" + waitTime + "ms");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() +
", catch an interrupted exception, interrupted status=" + Thread.interrupted());
throw e;
}
}
});
futureList.add(f);
}
try {
for (int i = 0; i < 10; i++) {
long timeLeft = endTime - System.currentTimeMillis();
try {
// timeLeft可能为负数,由于j.u.c中所有负数与0等同,所以不用单独对负数做判断
Future<SubTaskResult> responseFuture = cs.poll(timeLeft, TimeUnit.MILLISECONDS);
if (responseFuture == null) {
throw new TimeoutException("waiting timeout");
}
SubTaskResult response = responseFuture.get();
System.out.println(response.getResult() + ", main thread waitFor: " + timeLeft);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
} catch (TimeoutException e) {
// 如果超时则终止所有任务(注意cancel只是调用子线程的interrupt方法,至于能不能中断得看子线程是否支持)
// 因为对于已经完成的任务调用Future.cancel不起效,所以不需要排除那些已经完成的任务
for (Future<SubTaskResult> future : futureList) {
future.cancel(true);
}
e.printStackTrace();
} finally {
executor.shutdown();
System.out.println("used: " + (System.currentTimeMillis() - startTime));
}
}
}
class SubTaskResult {
private final String result;
public SubTaskResult(String result) {
this.result = result;
}
public String getResult() {
return result;
}
}