获取任务执行结果之Future原理

由于在开发过程当中需要获取线程执行的结果,故使用到了Java并发工具包java.util.concurrent下的Future类,最终通过Future的get来获取到线程执行的结果。这里我们来简单解析下,Future的实现原理及使用方式。

Java提供了两个自定义线程的底层接口,一个是Runnable接口一个是Callable接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}

public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

通过比较可以看出,Runnable接口的run函数返回时为Void,而Callable的call函数返回值为V,也就是通过泛型传入的类型参数,由此也就为获取线程执行结果提供了实现的支持。线程的执行我们一般通过交由线程池ThreadPoolExecutor来完成。在该类中,我们有多种提交任务的方式,如

1
public void execute(Runnable command)(...)

在ThreadPoolExecutor的父类AbstractExecutorService中,还提供了几个提交任务的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

这三个函数中,都通过将Runable或Callable包装成RunnableFuture,并交由线程池来执行,那么来看下RunnableFuture的定义

1
2
3
4
5
6
7
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

该接口提供了两个功能,一个是继承Runnable接口提供了任务的运行功能,一个是继承Future接口,用于获取执行结果。接下来我们看下该处使用的RunnableFuture的具体实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

对应Runnable和Callable入参,创建FutureTask。其中由于Runnable是没有返回值的,由第一个函数的javadoc可知,我们可以传递一个T类型的value值,当任务执行完毕时,将会返回该value。
由此,我们大概可以推测出能获取执行结果的任务是如何执行的。首先,通过ThreadPoolExecutor的submit函数提交一个实现了Callable接口或者Runnable接口的实现类给线程池,线程池会将该任务包装成一个RunnableFuture对象(实现了Future接口及Runnable接口),并将该对象提交线程池执行,并返回该对象。在最外层,就可以通过Future的get()来获取线程的执行结果了。如下:

1
2
3
PdfReaderCallable pdfReaderCallable = new PdfReaderCallable(url);
Future<String> task = pdfReadExecutor.submit(pdfReaderCallable);
String result = task.get();

其中PdfReaderCallable实现了Callable接口,pdfReadExecutor为线程池实现类。当执行该逻辑的线程执行了到第三条语句的时候,将被阻塞,一直等到task.get()返回结果才会继续向下执行。
接下来,让我们更深入地来看一下最终提交给线程池执行的FutureTask的实现原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class FutureTask<V> implements RunnableFuture<V> {
...

private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

...

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

...

public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
...

public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
}

这里只列举了几个比较重要的成员变量及成员函数。通过构造函数可以看出,成员变量callable是实际的任务类。在FutureTask的run方法中最终调用了该实际任务类来执行任务并获取结果result,最后调用set函数将result赋值给outcome对象。而在外部,我们通过get()函数来获取结果,如果任务含没有执行完,那么将会通过awaitDone函数阻塞,一直等待任务执行完毕,才能继续执行并最终通过report函数返回执行结果。
至此,获取任务执行结果的Future实现就分析结束。

欢迎关注个人公众号:
个人公号