多线程(二十二、异步执行-Futrue模式)

2023-06-25,,

Future简介

如果一个任务需要返回执行结果,一般我们会实现一个Callable任务,并创建一个线程来执行任务。对于执行时间比较长的任务,显然我们同步的等待结果再去执行后续的业务是不现实的,那么,Future模式是怎样解决这个问题的呢?

Future模式,可以让调用方立即返回,然后它自己会在后面慢慢处理,此时调用者拿到的仅仅是一个凭证,调用者可以先去处理其它任务,在真正需要用到调用结果的场合,再使用凭证去获取调用结果。这个凭证就是这里的Future。

Future接口的定义:

public interface Future<V> {
      // 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
      // 任务是否取消
    boolean isCancelled();
      // 标记任务是否执行完成
    boolean isDone();
      // 阻塞获取任务结果
    V get() throws InterruptedException, ExecutionException;
      // 超时获取任务结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

Future模式中,最重要的就是FutureTask类

FutureTask一共给任务定义了7种状态

1、NEW:表示任务的初始化状态;
2、COMPLETING:表示任务已执行完成(正常完成或异常完成),但任务结果或异常原因还未设置完成,属于中间状态;
3、NORMAL:表示任务已经执行完成(正常完成),且任务结果已设置完成,属于最终状态;
4、EXCEPTIONAL:表示任务已经执行完成(异常完成),且任务异常已设置完成,属于最终状态;
5、CANCELLED:表示任务还没开始执行就被取消(非中断方式),属于最终状态;
6、INTERRUPTING:表示任务还没开始执行就被取消(中断方式),正式被中断前的过渡状态,属于中间状态;
7、INTERRUPTED:表示任务还没开始执行就被取消(中断方式),且已被中断,属于最终状态。

各个状态之间的流转:

FutureTask构造

FutureTask在构造时可以接受Runnable或Callable任务,如果是Runnable,则最终包装成Callable:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
                // 包装Runnable成为Callable
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

FutureTask成员

private volatile int state;//任务状态
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

private Callable<V> callable;       // 真正的任务
private volatile Thread runner;     // 保存正在执行任务的线程

/**
 * 记录结果或异常
 */
private Object outcome;

/**
 * 无锁栈(Treiber stack)
 * 保存等待线程
 */
private volatile WaitNode waiters;

当调用FutureTask的get方法时,如果任务没有完成,则调用线程会被阻塞,其实就是将线程包装成WaitNode结点保存到waiters指向的栈中。

static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

任务执行run

public void run() {
    // 仅当任务为NEW状态时, 才能执行任务
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                            //执行任务
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                                //设置异常
                setException(ex);
            }
            if (ran)
                            //设置任务执行结果outcome
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

set方法:

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;//存储结果值
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

任务取消

public boolean cancel(boolean mayInterruptIfRunning) {
    // 仅NEW状态下可以取消任务
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;

    try {   
        if (mayInterruptIfRunning) {    // 中断任务
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
            //释放所有在栈上等待的线程
        finishCompletion();
    }
    return true;
}

任务取消后,最终调用finishCompletion方法,释放所有在栈上等待的线程

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) { //自旋释放所有等待线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//唤醒线程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

获取结果

FutureTask可以通过get方法获取任务结果,如果需要限时等待,可以调用get(long timeout, TimeUnit unit)

public V get() throws InterruptedException, ExecutionException {
    int s = state;
        //当前任务的状态是NEW或COMPLETING,会调用awaitDone阻塞线程
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);   // 任务执行结果
}
/**
 * 返回执行结果.
 */
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V) x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable) x);
}

ScheduledFutureTask

1、ScheduledFutureTask在普通FutureTask的基础上增加了周期执行/延迟执行的功能
2、ScheduledFutureTask是ScheduledThreadPoolExecutor这个线程池的默认调度任务类,通过继承FutureTask和Delayed接口来实现周期/延迟功能的。

ScheduledFutureTask的源码非常简单,基本都是委托FutureTask来实现的

任务运行
public void run() {
    // 是否是周期任务
    boolean periodic = isPeriodic();  
        //// 能否运行任务
    if (!canRunInCurrentRunState(periodic))      
        cancel(false);
    else if (!periodic)  // 非周期任务:调用FutureTask的run方法运行
        ScheduledFutureTask.super.run();
                // 周期任务:调用FutureTask的runAndReset方法运行
    else if (ScheduledFutureTask.super.runAndReset()) { 
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

FutureTask的runAndReset方法与run方法的区别就是当任务正常执行完成后,不会设置任务的最终状态(即保持NEW状态),以便任务重复执行:

protected boolean runAndReset() {
    // 仅NEW状态的任务可以执行
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
        return false;

    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); //不设置执行结果
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        runner = null;
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;//重新设置任务状态为NEW,继续重复执行
}

《多线程(二十二、异步执行-Futrue模式).doc》

下载本文的Word格式文档,以方便收藏与打印。