我们通常都是开启一个新的子线程去执行比较耗时的代码,这使用起来非常简单,只需要将耗时的代码封装在Runnable中的run()方法里面,然后调用thread.start()就行。但是我相信很多人有时候都有这样的需求,就是获取子线程运行的结果,比如客户端远程调用服务(耗时服务),我们有需要得到该调用服务返回的结果,这该怎么办呢?很显然子线程运行的run()方法是没有返回值。这个时候Future的作用就发挥出来了。
Future如何使用能够获取子线程运行的结果呢?在这里顺便提一下Callable接口,Callable产生结果,Future获取结果。如何使用他们两个来获取子线程的运行结果呢?我们先来看个简单的例子。
package test;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableFutureTest {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
Callable<Integer> calculateCallable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
// TODO Auto-generated method stub
Thread.sleep(2000);//模拟耗时时间
int result = 1+2;
return result;
}
};
FutureTask<Integer> calculateFutureTask = new FutureTask<>(calculateCallable);
Thread t1 = new Thread(calculateFutureTask);
t1.start();
//现在加入Thread运行的是一个模拟远程调用耗时的服务,并且依赖他的计算结果(比如网络计算器)
try {
//模拟耗时任务,主线程做自己的事情,体现多线程的优势
Thread.sleep(3000);
int a = 3+5;
Integer result = calculateFutureTask.get();
System.out.println("result = "+(a+result));//模拟主线程依赖子线程的运行结果
long endTime = System.currentTimeMillis();
System.out.println("time = "+(endTime-startTime)+"ms");
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
运行结果如下:
从上面可以看到上面耗时大概是3s,其实主要就是主线程sleep(3000)所耗费的时间,如果不使用Future,并且依赖线程的结果,我们可能需要的时间可能是需要5s(子线程2s+主线程3s)。接下来我们从源码的角度理解上述代码工作的原理。
1,先看看类的FureTask类的关系图
FutureTask实现了RunnableTask接口,RunnableTask继承了Runnable和Future接口(接口是支持多继承的)
2,t1.start()执行的是FutureTask类的run()方法,看下FutureTask.run()方法源码
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;//构造方法传入的calculateCallable
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//最终执行的是callablede.call()方法,有返回值的
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);//保存抛出的异常
}
if (ran)
set(result);//保存执行的结果
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
3,执行的Callable.call(),即执行了calculateCallable .call(),得到call()返回的结果
接下来看看,如何将执行的结果保存起来,然后方便Future获取到,那就是调用set(result)方法
4,看看set(result)方法
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;//将result赋值给outcome
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
//这个方法,就是当执行完成完成的时候,唤醒get()方法挂起的线程,从而使得get()方法在阻塞
//的for循环中能够正确的获取执行完的结果
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);//唤醒线程
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
最终FutureTask中的outcome变量为执行的结果
5,接下来看FutureTask.get()方法如何获取执行完的结果
//get方法表示如果执行过程完成,就获取执行的结果,否则就将当前线程挂起
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);//如果状态没完成
return report(s);//返回执行完的结果
}
//这里是执行的状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {//堵塞的方法
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();//将线程挂起,让出cpu资源
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;//返回结果
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
通过上面的几个方法看出,FutureTask.get()是一个阻塞的方法,直到运行完成之后,返回线程执行的结果。
上述中出现很多状态的常量(NEW、COMPLETING、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING 、INTERRUPTED)这些状态表示执行的过程,一般执行的过程转变如下:
1)一个FutureTask新建出来,state就是NEW状态;COMPETING和INTERRUPTING用的进行时,表示瞬时状态,存在时间极短;NORMAL代表顺利完成;EXCEPTIONAL代表执行过程出现异常;CANCELED代表执行过程被取消;INTERRUPTED被中断
2)执行过程顺利完成:NEW -> COMPLETING -> NORMAL
3)执行过程出现异常:NEW -> COMPLETING -> EXCEPTIONAL
4)执行过程被取消:NEW -> CANCELLED
5)执行过程中,线程中断:NEW -> INTERRUPTING -> INTERRUPTED
另外FutureTask中还有其他的方法,如cancel()-取消,isDone()-判断是否执行完,isCancelled()-判断执行是否取消等,感兴趣的可以自己去看相应的源码
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/111247.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...