/**
 * A task whose work is performed by a single no-argument, no-result
 * {@code run} method.
 */
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see java.lang.Thread#run()
     */
    void run(); // interface methods are implicitly public and abstract
}
/**
 * A task that produces a result of type {@code V} and may throw a checked
 * exception while doing so.
 *
 * @param <V> the result type of method {@code call}
 */
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
/**
 * A {@link Future} that is also {@link Runnable}: the same object can be
 * handed to something that runs it and to something that waits on its result.
 *
 * @param <V> the result type returned by this Future's {@code get} method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
/** * Returns a {@code RunnableFuture} for the given callable task. * * @param callable the callable task being wrapped * @param <T> the type of the callable's result * @return a {@code RunnableFuture} which, when run, will call the * underlying callable and which, as a {@code Future}, will yield * the callable's result as its result and provide for * cancellation of the underlying task * @since 1.6 */ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { returnnewFutureTask<T>(runnable, value); }
/** The task to run; assigned by both constructors and read by run(). */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
...
/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Callable}.
 *
 * @param callable the callable task
 * @throws NullPointerException if the callable is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Runnable}. The runnable and result are adapted to a
 * {@code Callable} through {@code Executors.callable(runnable, result)}.
 *
 * @param runnable the runnable task
 * @param result the value handed to {@code Executors.callable} as the
 * result to yield once the runnable has completed
 */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
...
publicvoidrun() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts ints= state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protectedvoidset(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } ...
/**
 * Returns the outcome of this task, first waiting through
 * {@code awaitDone(false, 0L)} when the state is still at or below
 * {@code COMPLETING}.
 *
 * @return the value produced by {@code report} for the final state
 * @throws InterruptedException as declared; presumably raised by
 * {@code awaitDone} when the waiting thread is interrupted (that
 * method is not visible here)
 * @throws ExecutionException if {@code report} finds the task ended
 * with a stored throwable
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
/**
 * Returns the result or throws the exception for a finished task.
 *
 * @param s the completed state value
 * @return the stored outcome, when {@code s} is {@code NORMAL}
 * @throws java.util.concurrent.CancellationException when {@code s} is
 * {@code CANCELLED} or greater
 * @throws ExecutionException wrapping the stored outcome as its cause,
 * for every other state
 */
@SuppressWarnings("unchecked") // in state NORMAL the outcome was stored by set(V)
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
}