java并发:获取线程执行结果(Callable、Future、FutureTask)


初识Callable and Future

在编码时,我们可以通过继承Thread或是实现Runnable接口来创建线程,但是这两种方式都存在一个缺陷:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到目的。

Java5提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Callable and Future源码

(1)Callable接口

public interface Callable {
    V call() throws Exception;
}

(2)Future接口

public interface Future {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

源码解说:

(1)Callable位于java.util.concurrent包下,它是一个接口,在它里面只声明了一个call()方法。从上面的源码可以看到,Callable是一个泛型接口,call()函数返回的类型就是传递进来的泛型实参类型。

(2)Future类位于java.util.concurrent包下,Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果,其cancel()方法的参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置为true,则表示可以取消正在执行过程中的任务;get()方法用来获取执行结果,该方法会阻塞直到任务返回结果。

Callable and Future示例

(1)下面的示例是一个Callable,它会采用最明显的方式查找数组的一个分段中的最大值。

import java.util.concurrent.Callable;

class FindMaxTask implements Callable {

  private int[] data;
  private int start;
  private int end;
  
  FindMaxTask(int[] data, int start, int end) {
    this.data = data;
    this.start = start;
    this.end = end;
  }

  public Integer call() {
    int max = Integer.MIN_VALUE;
    for (int i = start; i < end; i++) {
      if (data[i] > max) max = data[i];
    }
    return max;
  }
}

(2)将Callable对象提交给一个Executor,它会为每个Callable对象创建一个线程,如下代码段所示:

import java.util.concurrent.*;

public class MultithreadedMaxFinder {

  public static int max(int[] data) throws InterruptedException, ExecutionException {
    
    if (data.length == 1) {
      return data[0];
    } else if (data.length == 0) {
      throw new IllegalArgumentException();
    }
    
    // split the job into 2 pieces
    FindMaxTask task1 = new FindMaxTask(data, 0, data.length/2);
    FindMaxTask task2 = new FindMaxTask(data, data.length/2, data.length);
    
    // spawn 2 threads
    ExecutorService service = Executors.newFixedThreadPool(2);

    Future future1 = service.submit(task1);
    Future future2 = service.submit(task2);
        
    return Math.max(future1.get(), future2.get());
  }
}

补充:

ExecutorService接口中声明了若干个不同形式的submit()方法,各个方法的返回类型为Future类型,如下:

 Future submit(Callable task);
 Future submit(Runnable task, T result);
Future<?> submit(Runnable task);

初识FutureTask

因为Future只是一个接口,所以是无法直接用来创建对象来使用的,因此就有了FutureTask。

FutureTask目前是Future接口的唯一实现类,FutureTask表示一个可以取消的异步运算,它有启动和取消运算、查询运算是否完成和取回运算结果等方法。

只有当运算完成的时候才能取回结果,如果运算尚未完成,则get方法将会阻塞。

FutureTask实现了RunnableFuture接口,其声明如下:

public class FutureTask implements RunnableFuture

RunnableFuture接口定义如下:

public interface RunnableFuture extends Runnable, Future {
    void run();
}

解说:

因RunnableFuture接口继承Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口,所以FutureTask可以通过如下两种方式运行以获取结果:

  • 作为Runnable被线程执行(Thread接收Runnable类型的参数)
  • 提交给Executor执行(ExecutorService.submit(Runnable task))

FutureTask构造函数

FutureTask的构造函数接收不同形式的参数,如下:

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

任务状态

FutureTask 的内部有一个变量 state 用来表示任务的状态,相关定义如下:

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

FutureTask示例

观察下述两个示例代码中FutureTask的使用方式

示例一

FutureTask将被作为Runnable被线程执行

(1)任务线程ThreadC:

package demo.thread;
import java.util.concurrent.Callable;
//实现Callable接口,call()方法可以有返回结果
public class ThreadC implements Callable {
      @Override
      public String call() throws Exception {
            try {//模拟任务,执行了500毫秒;
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "thread B";
      }
}

(2)主线程ThreadMain:

package demo.thread;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class ThreadMain {
      public static void main(String[] args) {
            FutureTask feature = new FutureTask(new ThreadC());
            new Thread(feature).start();//注意启动方式,FutureTask将被作为Runnable被线程执行
            
       System.out.println("这是主线程;begin!");
            //注意细细体会这个,只有主线程get了,主线程才会继续往下执行
            try {
                System.out.println("得到的返回结果是:"+feature.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("这是主线程;end!");
      }
}

示例二

FutureTask被提交给Executor执行以得到返回值

public class Task implements Callable{
    @Override
    public Integer call() throws Exception {
        Thread.sleep(3*1000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        FutureTask futureTask = new FutureTask(new Task());
        executor.submit(futureTask);//FutureTask被提交给Executor执行以得到返回值
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}

FutureTask原理

前面提到了一条重要信息:ExecutorService接口中的submit()方法可以接收callable、Runnable类型的参数,方法的返回类型为Future类型。

ExecutorService的submit()方法的内部实现是根据参数构建了FutureTask对象,然后将FutureTask对象转为Future类型返回。

这也对应了下面这条信息:

FutureTask间接继承了Future接口,其构造函数可以接收callable、Runnable类型的参数。

Note:

仔细想一想,其实这个内部实现使用了适配器模式,使得不同接口的实现最终对外表现为一致

相关