Future 接口
Future 接口
Future 接口定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等等。
public interface Future<V> {
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();
/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future 接口是 Java 5 版本提供的一个接口,它提供了一种异步并行计算能力。例如有一个处理任务包含了很多子任务,如果使用单一线程处理那么主任务和子任务是串行处理的,这时就可以使用 Future 接口将子任务放到异步线程中执行,将主任务与子任务执行分离,从宏观上看主任务与子任务是并行处理的,主任务执行完毕之后可以选择结束或等待子任务执行并获取子任务执行结果。
方法名 | 方法描述 |
---|---|
boolean cancel(boolean mayInterruptIfRunning) | 取消当前任务的执行,参数 mayInterruptIfRunning 表示是否需要中断正在执行的任务。 |
boolean isCancelled() | 任务是否已取消。 |
boolean isDone() | 任务是否已结束。 |
V get() | 获取任务执行的结果(任务未执行完毕会阻塞)。 |
V get(long timeout, TimeUnit unit) | 获取任务执行的结果(任务未执行完毕会阻塞,直到超出 timeout 停止阻塞)。 |
RunnableFuture 接口
在实际的使用中,我们并不能直接使用 Future 接口来开发我们的异步多线程任务。从 Thread 的构造函数中不难看出,多线程任务必须要实现 Runnable 接口,所以这里又引出了另一接口 RunnableFuture。
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture 实现类 FutureTask
RunnableFuture 接口继承了 Runnable 和 Future 接口,同时拥有这两个接口的能力,接下来再看看 RunnableFuture 接口的一个常用的实现类 FutureTask。FutureTask 实现了 Future 接口定义的方法,并对 run() 方法进行了实现,它与我们普通编写的 run() 方法不同是它实现了任务执行结果的获取。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
FutureTask 构造函数
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
如上源码,FutureTask 有两个构造函数,第一个构造函数需要传入一个 Callable 的实现。
/**
* A task that returns a result and may throw an exception.
* Implementors define a single method with no arguments called
* {@code call}.
*
* <p>The {@code Callable} interface is similar to {@link
* java.lang.Runnable}, in that both are designed for classes whose
* instances are potentially executed by another thread. A
* {@code Runnable}, however, does not return a result and cannot
* throw a checked exception.
*
* <p>The {@link Executors} class contains utility methods to
* convert from other common forms to {@code Callable} classes.
*
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> the result type of method {@code call}
*/
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Callable 接口中的 call() 方法与 Runnable 接口的 run() 的作用是一致的,都是用来实现具体的线程任务,只不过 call() 方法支持返回任务执行的结果,而 run() 却不能。
演示
public class FutureTaskDemo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new Task01());
Thread thread = new Thread(task);
thread.start();
System.out.println("main over");
}
}
class Task01 implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("task running...");
Thread.sleep(3000);
System.out.println("task over");
return "ok";
}
}
运行代码,输出结果
main over
task running...
task over
上述程序中运行了两个线程,一个 main 主线程,一个 thread 自定义线程;main 主线程最先执行完毕,输出 main over
,然后输出 task running
,等待 3 秒之后 thread 线程执行完毕输出 task over
。从这里可以看出使用 FutureTask 创建的线程任务与使用 Runnable 并没有什么大的区别,下面再修改一下代码,使程序可以获取到 task 任务的返回结果。
public class FutureTaskDemo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new Task01());
Thread thread = new Thread(task);
thread.start();
String s = task.get();
System.out.println("task result: " + s);
System.out.println("main over");
}
}
运行代码,输出结果
task running...
task over
task result: ok
main over
修改代码之后,在 main 主线程输出 main over
之前使用 task 对象调用 get() 获取返回结果,此时的程序输出与上一个程序存在很大的区别;不难发现,main 主线程执行 task.get()
的时候阻塞了。
FutureTask 的 get() 方法会阻塞
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
FutureTask 有两个 get() 方法,这两个 get 方法都是用来获取异步任务执行的结果的,对于 public V get()
方法说来,调用它会一直阻塞直到获取到返回结果或者任务中断或内部抛出异常;但是 public V get(long timeout, TimeUnit unit)
方法可以指定等待结果的事件,一旦调用 get 方法阻塞超过了预设的时间,就会抛出异常。
awaitDone 方法等待执行结束
int awaitDone(boolean timed, long nanos)
方法是让 get 方法阻塞的真正原因,其实 awaitDone
内部是通过轮询的方式对当前任务的执行状态进行判断,如果任务没有发生终端、异常,那么它会一直轮询下去(前提是没有配置超时)。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
我们知道无限制的轮询是会很耗费资源的;另外在上述的演示中,直接调用 get 方法获取结果是不可取的,主要原因就是因为它会阻塞。设想一下,如果有 10 个这样的 task 那么是不是需要调用 10 次,理想情况下,每个 task 的执行时间都是一样,这么看来问题不大,但是如果调用的第一个 task 的执行时间比其它任务都要长的话,会导致其它任务一直处理一种假等待状态,并没有真正发挥多线程的优势。
isDone() 任务是否已结束
调用 isDone() 方法可以明确知道当前任务是否已结束,它内部与 awaitDone 一样也是通过判断当前任务的状态,只不过 isDone() 方法不是使用轮询,而是简单判断当前任务的状态。
public boolean isDone() {
return state != NEW;
}
有了 isDone() 方法,之前的程序就可以进行调整,不再去直接调用 get() 阻塞获取任务结果,而是自己编写轮询,调用 isDone() 方法,如果返回 true 表示任务已结束,直接调用 get();返回 false,表示任务未结束,执行其它任务。
public class FutureTaskDemo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new Task01());
Thread thread = new Thread(task);
thread.start();
System.out.println("main over");
for (; ; ) {
if (task.isDone()) {
String s = task.get();
System.out.println("task result: " + s);
break;
}
Thread.sleep(1000);
}
}
}
通过自己编写轮询的好处就是可以控制轮询的时间间隔,不像 awaitDone 一样无节制的轮询;另外就是在多个任务的情况下,不会发生一个任务结果没有获取到而导致其它任务也获取不到的现象。
FutureTask 的优缺点
优点:
- 相比 Runnable 可以获取任务执行的返回结果。
缺点:
- get() 方法获取任务结果会阻塞;同时因为阻塞,会影响到其它任务的结果得不到及时处理;
- 做不到多任务交叉串行执行,例如,C 任务执行必要条件是 A、B 任务的执行的结果。