并行执行任务的等待超时方法_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > 并行执行任务的等待超时方法

并行执行任务的等待超时方法

 2013/11/23 21:31:59  blueswind8306  程序员俱乐部  我要评论(0)
  • 摘要:在并发编程中,经常会遇到提交多个子任务并行执行的场景。比如一个中心节点同时派发任务给多个子节点,然后中心节点等待所有子节点完成任务后继续主流程。在这个过程中,主节点需要设置一个最大等待超时,当达到超时时间后不再等待未返回的节点结果,做功能降级处理。对于这种需求,如果子任务是阻塞执行的,则一般会使用一个线程池来执行子任务,但主任务如何唤醒超时呢?直接想到的方式是主任务在提交完所有子任务后进入一个循环,不断判断所有子任务是否已经完成或者到达超时了,但这种方式会导致主任务线程需要频繁唤醒
  • 标签:方法 执行 超时
在并发编程中,经常会遇到提交多个子任务并行执行的场景。比如一个中心节点同时派发任务给多个子节点,然后中心节点等待所有子节点完成任务后继续主流程。在这个过程中,主节点需要设置一个最大等待超时,当达到超时时间后不再等待未返回的节点结果,做功能降级处理。

对于这种需求,如果子任务是阻塞执行的,则一般会使用一个线程池来执行子任务,但主任务如何唤醒超时呢?直接想到的方式是主任务在提交完所有子任务后进入一个循环,不断判断所有子任务是否已经完成或者到达超时了,但这种方式会导致主任务线程需要频繁唤醒,加大了上下文切换的开销。并且由于子任务是异步执行的,还需要考虑结果对象的安全发布问题,加大了编码的复杂性。

在j.u.c中有一个CompletionService接口恰好可以实现上述需求,并且避免了上下文切换的开销。其基本思路是用ExecutorCompletionService包装Executor,并在内部使用一个BlockingQueue保存所有已经完成的任务。当主任务调用ExecutorCompletionService.submit方法时包装一个FutureTask的子类对象QueueingFuture并传递给内部Executor,此对象覆盖了FutureTask的done方法。当线程池的Worker线程在任务完成后会回调这个done方法,然后这个方法将已经完成的任务注入到BlockingQueue中去。这样外部只需要调用BlockingQueue的take或poll方法就可以取到完成的任务了。

注意,由于最先完成的任务会先注入BlockingQueue中,所以主线程中取得的任务集合是按照完成的先后顺序排序的。

以下是我写的一个Demo,参考了《Java并发编程实践》第6章的6.3.6节的程序,并加入了超时等待机制:
class="java">
public class TestCompletionService {

	private static final ExecutorService executor = Executors.newFixedThreadPool(10);
	
	private static final Random r = new Random();	// 每个子任务随机等待一个时间,以模拟子任务的执行时间
	
	private static final int TASK_TIMEOUT = 5;	// 设定最长超时时间为5s
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		
		final long startTime = System.currentTimeMillis();
		final long endTime = startTime + (TASK_TIMEOUT * 1000L);
		
		CompletionService<SubTaskResult> cs = new ExecutorCompletionService<SubTaskResult>(executor);
		List<Future<SubTaskResult>> futureList = new ArrayList<Future<SubTaskResult>>();
		for (int i = 0; i < 10; i++) {
			Future<SubTaskResult> f = cs.submit(new Callable<SubTaskResult>() {
				@Override
				public SubTaskResult call() throws Exception {
					try {
						// 子任务等待一个随机时间。如果这里不是+1而是+2,就可以模拟出现超时的情况
						long waitTime = (r.nextInt(TASK_TIMEOUT) + 1) * 1000L;
						Thread.sleep(waitTime);
						return new SubTaskResult(Thread.currentThread().getName() + 
								", sub thread sleepTime=" + waitTime + "ms");	
					} catch (InterruptedException e) {
						System.out.println(Thread.currentThread().getName() + 
								", catch an interrupted exception, interrupted status=" + Thread.interrupted());
						throw e;
					}
				}
			});
			futureList.add(f);
		}
		
		try {
			for (int i = 0; i < 10; i++) {
				long timeLeft = endTime - System.currentTimeMillis();
				try {
					// timeLeft可能为负数,由于j.u.c中所有负数与0等同,所以不用单独对负数做判断
					Future<SubTaskResult> responseFuture = cs.poll(timeLeft, TimeUnit.MILLISECONDS);
					if (responseFuture == null) {
						throw new TimeoutException("waiting timeout");
					}
					SubTaskResult response = responseFuture.get();
					System.out.println(response.getResult() + ", main thread waitFor: " + timeLeft);
				} catch (InterruptedException e) {
					e.printStackTrace();
					Thread.currentThread().interrupt();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}	
			}			
		} catch (TimeoutException e) {
			// 如果超时则终止所有任务(注意cancel只是调用子线程的interrupt方法,至于能不能中断得看子线程是否支持)
			// 因为对于已经完成的任务调用Future.cancel不起效,所以不需要排除那些已经完成的任务
			for (Future<SubTaskResult> future : futureList) {
				future.cancel(true);
			}
			e.printStackTrace();
		} finally {
			executor.shutdown();
			System.out.println("used: " + (System.currentTimeMillis() - startTime));	
		}
	}

}

class SubTaskResult {
	private final String result;
	public SubTaskResult(String result) {
		this.result = result;
	}
	
	public String getResult() {
		return result;
	}
}
上一篇: iOS: 使用匹配字符串来搜索文件夹内的路径 下一篇: 没有下一篇了!
发表评论
用户名: 匿名