class="java">
// A task whose computed result can be retrieved asynchronously.
// It implements both Future and Runnable.

// ---------------------------------------------------------------------
// Constructors
// ---------------------------------------------------------------------

/**
 * Creates a task that will execute the given callable when run.
 *
 * @param callable the computation to execute
 * @throws NullPointerException if {@code callable} is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

/**
 * Creates a task that will execute the given runnable and then report
 * the supplied {@code result} as its outcome.
 *
 * @param runnable the work to execute
 * @param result   the value to return once the runnable has finished
 */
public FutureTask(Runnable runnable, V result) {
    // Adapter pattern: wrap the Runnable so it can be used as a Callable.
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

/**
 * Wraps a Runnable and a fixed result into a Callable.
 * NOTE(review): this looks like the body of the Executors.callable helper
 * invoked by the constructor above, shown here for reference - confirm.
 *
 * @param task   the runnable to wrap
 * @param result the value the returned callable yields
 * @return a callable that runs {@code task} and returns {@code result}
 * @throws NullPointerException if {@code task} is null
 */
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

/**
 * Adapter that exposes a Runnable through the Callable interface:
 * {@code call()} runs the task and returns the preset result.
 */
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;

    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }

    public T call() {
        task.run();
        return result;
    }
}

// ---------------------------------------------------------------------
// Execution
// ---------------------------------------------------------------------

/**
 * Executes the callable and publishes its result or exception.
 * Does nothing unless the state is NEW and the calling thread wins the
 * CAS that installs itself as {@code runner}.
 */
public void run() {
    // Bail out if the task is no longer NEW or another thread already owns it.
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            // Publish the value only when call() returned normally.
            if (ran)
                set(result);
        }
    } finally {
        // runner stays non-null until here, so the CAS guard at the top
        // keeps other threads out of run() while this one is executing.
        runner = null;
        // Re-read state after clearing runner.
        int s = state;
        // A cancel(true) may be in flight: wait for it to finish.
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

/**
 * Stores {@code v} as the outcome if the state can be moved from NEW to
 * COMPLETING; otherwise does nothing. On success the state ends at NORMAL
 * and waiters are released.
 *
 * @param v the result value
 */
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

/**
 * Detaches the waiter list, unparks every thread recorded in it, invokes
 * the {@link #done()} hook and drops the reference to the callable.
 */
private void finishCompletion() {
    // Retry until the whole waiter list has been detached by this thread
    // (or there is no list left).
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // We own the detached list: walk it and wake each waiter.
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    // Hook method for subclasses.
    done();

    callable = null;        // to reduce footprint
}

/**
 * If a cancel(true) has moved the state to INTERRUPTING, yields until the
 * state leaves INTERRUPTING.
 *
 * @param s the state value observed by the caller
 */
private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

/**
 * Hook invoked by {@code finishCompletion()}; the default implementation
 * does nothing.
 */
protected void done() { }

// ---------------------------------------------------------------------
// Result retrieval
// ---------------------------------------------------------------------

/**
 * Waits, if necessary, for the task to finish and then returns its result.
 *
 * @return the computed result
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException   if the outcome is reported as a failure by {@code report}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // Not finished yet: block until it is.
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

/**
 * Enqueues the calling thread as a waiter and parks it until the state
 * moves past COMPLETING, the thread is interrupted, or the timeout elapses.
 *
 * @param timed true to apply the {@code nanos} timeout
 * @param nanos maximum time to wait, in nanoseconds, when {@code timed}
 * @return the state observed on completion, or the current state on timeout
 * @throws InterruptedException if the calling thread was interrupted
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // Interrupted: remove our node from the waiter list and propagate.
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            // Already finished; clear our node's thread so it is skipped.
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            // Push our node onto the head of the waiter list.
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            // Timed out: drop our node and return whatever state we see.
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

/**
 * Marks {@code node} as abandoned by clearing its thread, then unlinks
 * every node whose thread is null from the waiter list, restarting the
 * traversal whenever a race is detected.
 *
 * @param node the waiter node to remove; ignored when null
 */
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                // From here on q.thread == null, i.e. q must be unlinked.
                else if (pred != null) {
                    // Splice q out by pointing its predecessor past it.
                    pred.next = s;
                    // The predecessor itself was abandoned meanwhile: restart.
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                // q is the head of the list: swing the head to its successor.
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    // Head changed under us: restart.
                    continue retry;
            }
            break;
        }
    }
}

/**
 * Translates a final state into a return value or an exception.
 *
 * @param s the state value to report for
 * @return the stored outcome when {@code s} is NORMAL
 * @throws CancellationException if {@code s >= CANCELLED}
 * @throws ExecutionException    otherwise, wrapping the stored outcome as its cause
 */
@SuppressWarnings("unchecked") // outcome is only ever written as a V by set(V)
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

/**
 * Waits at most the given time for the task to finish and then returns
 * its result.
 *
 * @param timeout the maximum time to wait
 * @param unit    the unit of {@code timeout}
 * @return the computed result
 * @throws NullPointerException if {@code unit} is null
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException   if the outcome is reported as a failure by {@code report}
 * @throws TimeoutException     if the state is still at or below COMPLETING after waiting
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

// ---------------------------------------------------------------------
// Status and cancellation
// ---------------------------------------------------------------------

/**
 * @return true if the state is CANCELLED or any higher state
 */
public boolean isCancelled() {
    return state >= CANCELLED;
}

/**
 * @return true once the state is anything other than NEW
 */
public boolean isDone() {
    return state != NEW;
}

/**
 * Attempts to cancel the task. Succeeds only if the state is still NEW.
 *
 * <p>The interrupt is delivered inside nested try/finally blocks so that,
 * even if {@code Thread.interrupt()} throws, the state is still advanced
 * from INTERRUPTING to INTERRUPTED and {@code finishCompletion()} still
 * runs. Without that, the runner would spin forever in
 * {@code handlePossibleCancellationInterrupt} and threads already parked
 * in {@code awaitDone} would never be unparked.
 *
 * @param mayInterruptIfRunning true to interrupt the thread recorded in
 *        {@code runner}, if any
 * @return true if this call moved the task out of NEW, false otherwise
 */
public boolean cancel(boolean mayInterruptIfRunning) {
    // Claim the transition out of NEW; fail if someone else got there first.
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

/**
 * Stores {@code t} as the outcome if the state can be moved from NEW to
 * COMPLETING; otherwise does nothing. On success the state ends at
 * EXCEPTIONAL and waiters are released.
 *
 * @param t the cause of failure
 */
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

/**
 * Executes the callable without storing its result, leaving the state
 * at NEW on success so the task can be run again.
 *
 * @return true if the call completed without throwing and the state is
 *         still NEW afterwards
 */
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner stays non-null until here, so the CAS guard at the top
        // keeps other threads out while this one is executing.
        runner = null;
        // Re-read state after clearing runner.
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}