/**
 * Partial implementation of {@link ExecutorService}.
 *
 * <p>Implements the {@code submit}, {@code invokeAny} and {@code invokeAll}
 * methods on top of {@code execute(Runnable)}, which this class does not
 * define. Tasks are wrapped in the {@link RunnableFuture} returned by
 * {@code newTaskFor}, which defaults to a {@link FutureTask}.
 */
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * Wraps a runnable and a fixed result in a {@link RunnableFuture}.
     *
     * @param runnable the task to wrap
     * @param value    the value the returned future yields once the task has run
     * @return a new {@link FutureTask} over {@code runnable} and {@code value}
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Wraps a callable in a {@link RunnableFuture}.
     *
     * <p>Counterpart of {@link #newTaskFor(Runnable, Object)}; it is the overload
     * used by {@code submit(Callable)} and by both {@code invokeAll} methods.
     *
     * @param callable the task to wrap
     * @return a new {@link FutureTask} over {@code callable}
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * Submits a runnable for execution and returns a future tracking it.
     *
     * @param task the task to run
     * @return the future wrapping {@code task}; it was created with a
     *         {@code null} result value
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public Future<?> submit(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * Submits a runnable for execution together with the result its future
     * should carry.
     *
     * @param task   the task to run
     * @param result the value the future was created with
     * @return the future wrapping {@code task} and {@code result}
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Submits a callable for execution and returns a future tracking it.
     *
     * @param task the task to run
     * @return the future wrapping {@code task}
     * @throws NullPointerException if {@code task} is null
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null)
            throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Shared implementation of the timed and untimed {@code invokeAny}.
     *
     * <p>Tasks are submitted one at a time through an
     * {@link ExecutorCompletionService} built on this executor. Before each
     * further submission the completion queue is polled, so a task that has
     * already finished is examined before another one is started. The result of
     * the first task whose {@code get()} returns normally is returned. Every
     * future submitted so far is cancelled on the way out, whether the method
     * returns or throws.
     *
     * @param tasks the candidate tasks
     * @param timed whether {@code nanos} limits the wait
     * @param nanos the remaining wait budget in nanoseconds; read only when
     *              {@code timed} is true
     * @return the result of the first task that completed without throwing
     * @throws NullPointerException     if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws ExecutionException       if no task produced a result; this is the
     *                                  last failure recorded
     * @throws TimeoutException         if {@code timed} is true and the timed
     *                                  poll returned nothing
     * @throws InterruptedException     if interrupted while waiting
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        try {
            // Remember the most recent failure so it can be rethrown if no
            // task ever yields a result.
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Always start the first task; the rest are started incrementally.
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                // Non-blocking check for a task that has already finished.
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        // Nothing finished yet and tasks remain: start one more.
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        // Everything was submitted and every submitted task has
                        // been consumed: leave the loop and report the failure.
                        break;
                    else if (timed) {
                        // Wait for a completion, but only within the budget.
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    else
                        // Untimed: block until some task completes.
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                // The no-arg ExecutionException constructor is protected and so
                // only reachable from inside java.util.concurrent; the public
                // (Throwable) constructor compiles from any package and still
                // yields an exception whose getCause() is null.
                ee = new ExecutionException((Throwable) null);
            throw ee;

        } finally {
            // Cancel everything that was submitted.
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }

    /**
     * Runs the given tasks and returns the result of one that completed
     * without throwing.
     *
     * @param tasks the candidate tasks
     * @return the result of a task that completed without throwing
     * @throws NullPointerException     if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws ExecutionException       if no task produced a result
     * @throws InterruptedException     if interrupted while waiting
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // doInvokeAny only throws TimeoutException when timed is true.
            assert false;
            return null;
        }
    }

    /**
     * Timed variant of {@link #invokeAny(Collection)}.
     *
     * @param tasks   the candidate tasks
     * @param timeout the maximum time to wait
     * @param unit    the unit of {@code timeout}
     * @return the result of a task that completed without throwing
     * @throws NullPointerException     if {@code tasks} or {@code unit} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws ExecutionException       if no task produced a result
     * @throws TimeoutException         if the wait timed out first
     * @throws InterruptedException     if interrupted while waiting
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    /**
     * Executes every task and waits for each of them to finish.
     *
     * @param tasks the tasks to run
     * @return one future per task, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is null
     * @throws InterruptedException if interrupted while waiting; the futures
     *                              created so far are cancelled first
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    // Block until this task finishes. Its outcome is left in
                    // the future for the caller, so failure and cancellation
                    // are deliberately not propagated from here.
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            // Reached without done being set only when the method is exiting
            // abnormally; cancel whatever was created.
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

    /**
     * Timed variant of {@link #invokeAll(Collection)}.
     *
     * <p>The budget is charged both while the tasks are handed to
     * {@code execute} and while waiting for them. Whenever the method leaves
     * before every task has been waited for, all futures are cancelled before
     * the list is returned.
     *
     * @param tasks   the tasks to run
     * @param timeout the maximum time to wait
     * @param unit    the unit of {@code timeout}
     * @return one future per task, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} or {@code unit} is null
     * @throws InterruptedException if interrupted while waiting; all futures
     *                              are cancelled first
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
            throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            // Start the tasks one by one, charging the time spent in execute
            // against the budget.
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                // Budget used up while still submitting: stop here.
                if (nanos <= 0)
                    return futures;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    // Budget used up before this task finished.
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        // The remaining budget ran out while waiting.
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            // Not every task was waited for (timeout or exception): cancel all.
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }
}