Future、 CompletableFuture、ThreadPoolTaskExecutor简单实践

2023-04-25,,

Future(jdk5引入)

简介: Future接口是Java多线程Future模式的实现,可以来进行异步计算。

可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,也可以使用cancel方法停止任务的执行。

简单测试 -  主题 : Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务,期间程序可以处理其他任务。

                              一段时间之后,主线程可以从Future那儿取出结果

public class ThreadPoolTest {

    private static  Logger logger= LoggerFactory.getLogger(ThreadPoolTest.class);

    public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();//创建单一线程
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
logger.info("out2-----");
return "Hello world";
}
}); try {
try {
BigDecimal bigDecimal=new BigDecimal(0);
logger.info("out1:"+bigDecimal.toString());
logger.info(future.get(3000,TimeUnit.MILLISECONDS));//Hello world
} catch (TimeoutException e) {
logger.error("timeout-exception",e);
}
} catch (InterruptedException e) {
logger.error("interrupted-exception",e);
} catch (ExecutionException e) {
logger.error("execution-exception",e);
}finally {
threadPool.shutdown();
}
}

console输出:

11:17:17.558 [main] INFO tk.mybatis.springboot.util.thread.ThreadPoolTest - out1:0
Disconnected from the target VM, address: '127.0.0.1:54966', transport: 'socket'
11:17:19.556 [pool-1-thread-1] INFO tk.mybatis.springboot.util.thread.ThreadPoolTest - out2-----
11:17:19.557 [main] INFO tk.mybatis.springboot.util.thread.ThreadPoolTest - Hello world

超时会报错:

SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
10:44:07.413 [main] ERROR tk.mybatis.springboot.util.thread.ThreadPoolTest - timeout-exception
java.util.concurrent.TimeoutException: null
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at tk.mybatis.springboot.util.thread.ThreadPoolTest.main(ThreadPoolTest.java:30) Process finished with exit code 0

总结:1  比起future.get(),推荐使用get (long timeout, TimeUnit unit) 方法,

设置了超时时间可以防止程序无限制的等待future的结果,可以进行异常处理逻辑。

2  虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,

只能通过阻塞或者轮询的方式得到任务的结果。

阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,

为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

Future的接口很简单,只有五个方法。jdk8中源码如下:

package java.util.concurrent;

/**
* A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* {@code get} when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* {@code cancel} method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a {@code Future} for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form {@code Future<?>} and
* return {@code null} as a result of the underlying task.
*
* <p>
* <b>Sample Usage</b> (Note that the following classes are all
* made-up.)
* <pre> {@code
* interface ArchiveSearcher { String search(String target); }
* class App {
* ExecutorService executor = ...
* ArchiveSearcher searcher = ...
* void showSearch(final String target)
* throws InterruptedException {
* Future<String> future
* = executor.submit(new Callable<String>() {
* public String call() {
* return searcher.search(target);
* }});
* displayOtherThings(); // do other things while searching
* try {
* displayText(future.get()); // use future
* } catch (ExecutionException ex) { cleanup(); return; }
* }
* }}</pre>
*
* The {@link FutureTask} class is an implementation of {@code Future} that
* implements {@code Runnable}, and so may be executed by an {@code Executor}.
* For example, the above construction with {@code submit} could be replaced by:
* <pre> {@code
* FutureTask<String> future =
* new FutureTask<String>(new Callable<String>() {
* public String call() {
* return searcher.search(target);
* }});
* executor.execute(future);}</pre>
*
* <p>Memory consistency effects: Actions taken by the asynchronous computation
* <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a>
* actions following the corresponding {@code Future.get()} in another thread.
*
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface Future<V> { /**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning); /**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled(); /**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone(); /**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException; /**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

看懂英文还是最直接的。

二. CompletableFuture介绍

2.1 Future模式的缺点

Future虽然可以实现获取异步执行结果的需求,但是它没有提供通知的机制,我们无法得知Future什么时候完成。

要么使用阻塞,在future.get()的地方等待future返回的结果,这时又变成同步操作。要么使用isDone()轮询地判断Future是否完成,这样会耗费CPU的资源。

2.2 CompletableFuture介绍

 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}

Netty、Guava分别扩展了Java 的 Future 接口,方便异步编程。

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,

可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,

并且提供了转换和组合CompletableFuture的方法。

1 )主动完成计算
CompletableFuture类实现了CompletionStage和Future接口,所以你还是可以像以前一样通过阻塞或者轮询的方式获得结果,尽管这种方式不推荐使用。
public T     get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent) //如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent
public T join()

 join返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别

@Test
public void test(){
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
int i=1/0;
return 100;
});
logger.info(String.format("join=%d", future.join()));
try {
logger.info(String.format("get=%s", future.get()));
} catch (InterruptedException e) {
logger.error("error=%s",e);
} catch (ExecutionException e) {
logger.error("error=%s",e);
}
}

尽管Future可以代表在另外的线程中执行的一段异步代码,但是你还是可以在本身线程中执行:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; public class BasicMain {
private static Logger logger = LoggerFactory.getLogger(BasicMain.class);

public static CompletableFuture<Integer> compute() {
final CompletableFuture<Integer> future = new CompletableFuture<>();
return future;
} public static void main(String[] args) throws Exception {
final CompletableFuture<Integer> f = compute(); class Client extends Thread {
CompletableFuture<Integer> f;
Client(String threadName, CompletableFuture<Integer> f) {
super(threadName);
this.f = f;
} @Override
public void run() {
try {
logger.info(this.getName() + ": " + f.get());
} catch (InterruptedException e) {
logger.error(e.getMessage());
} catch (ExecutionException e) {
logger.error(e.getMessage());
}
}
}
new Client("Client1", f).start();
new Client("Client2", f).start();
logger.info("waiting....");
f.complete(100);
f.obtrudeValue(200);
//logger Client1: 200 Client2: 200 或者 Client1: 100 Client1: 100 或者 Client1: 200 Client2: 100 ,都有可能 // f.completeExceptionally(new Exception());
    }
}

说明:

可以看到我们并没有把f.complete(100) 放在另外的线程中去执行,但是在大部分情况下我们可能会用一个线程池去执行这些异步任 务。CompletableFuture.complete()CompletableFuture.completeExceptionally只能被调用一次。

但是我们有两个后门方法可以重设这个值:obtrudeValueobtrudeException,但是使用的时候要小心,

因为complete已经触发了客户端,有可能导致客户端会得到不期望的结果

2)创建CompletableFuture对象。

CompletableFuture的静态工厂方法

方法名 描述
runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。
runAsync(Runnable runnable, Executor executor) 使用指定的thread pool执行异步代码。
supplyAsync(Supplier<U> supplier) 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码,异步操作有返回值
supplyAsync(Supplier<U> supplier, Executor executor) 使用指定的thread pool执行异步代码,异步操作有返回值

eg: runAsync 和 supplyAsync 方法比较

区别是:

runAsync返回的CompletableFuture是没有返回值的

    原因:

      Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空

共同点:

因为方法的参数类型都是函数式接口,所以可以使用lambda表达式实现异步任务,比如:

 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
}); try {
logger.info("out = "+ future.get());//out = Hello
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
      CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Hello");
}); try {
logger.info("out = "+ future.get());//out = null
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

简单试验 - 主题:在两个线程里并行执行任务A和任务B,只要有一个任务完成了,就执行任务C

package tk.mybatis.springboot.util.thread;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.time.LocalTime;
import java.util.Random;
import java.util.concurrent.*; /**
* 在两个线程里并行执行任务A和任务B,只要有一个任务完成了,就执行任务C
*
* 两种方法useFuture和useCompletableFuture相比:
*
* 首先,useCompletableFuture 比 useFuture 的代码简单。
* 在useFuture 里,既要自己照顾线程池的创建和销毁,还要负责对任务A和任务B的监控。
* 而useCompletableFuture,只需要用CompletableFuture的runAfterEither就完成了任务A、任务B和任务C之间的依赖关系的定义。
*
*/
public class CompletableFutureTest { private static Logger logger = LoggerFactory.getLogger(CompletableFutureTest.class);
private static Random random = new Random(); /**
* useFuture test
* @throws InterruptedException
* @throws ExecutionException
*/
private static void useFuture() throws InterruptedException, ExecutionException {
logger.info("useFuture");
ExecutorService exector = Executors.newFixedThreadPool(3);
Future<Void> futureA = exector.submit(() -> work("A1"));
Future<Void> futureB = exector.submit(() -> work("B1"));
while (true) {
try {
futureA.get(1, TimeUnit.SECONDS);
break;
} catch (TimeoutException e) {
}
try {
futureB.get(1, TimeUnit.SECONDS);
break;
} catch (TimeoutException e) {
}
}
exector.submit(() -> work("C1")).get();
exector.shutdown();
} private static void useCompletableFuture() throws InterruptedException, ExecutionException {
logger.info("useCompletableFuture");
CompletableFuture<Void> futureA = CompletableFuture.runAsync(() -> work("A2"));
CompletableFuture<Void> futureB = CompletableFuture.runAsync(() -> work("B2"));
logger.info("get="+futureA.runAfterEither(futureB, () -> work("C2")).get()); // 或者
// CompletableFuture.runAsync(() -> work("A2"))
// .runAfterEither(CompletableFuture.runAsync(() -> work("B2"))
// , () -> work("C2"))
// .get();
} /**
* logger 输出
* @param name
* @return
*/
public static Void work(String name) {
logger.info(name + " starts at " + LocalTime.now());
try {
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (InterruptedException e) {
}
logger.info(name + " ends at " + LocalTime.now());
return null;
} /**
* 从useFuture的输出可以看出,
* 任务C1的开始并不是紧随着任务A1的完成,差了0.001秒,
* 原因是在方法1里,是对任务A1和任务B1都用get(1,TimeUnit.SECONDS)来询问他们的状态,
* 当其中一个任务先完成时,主线程可能正阻塞在另一个未完成任务的get上
*
* 而从useCompletableFuture完全不存在这样的问题,
* 任务C2的开始于任务A1的结束之间没有任何的时间差
*
* @param args
* @throws InterruptedException
* @throws ExecutionException
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
// useFuture();
//logger 输出
// B1 starts at 10:28:09.562
// A1 starts at 10:28:09.562
// B1 ends at 10:28:17.566
// C1 starts at 10:28:17.567
// A1 ends at 10:28:18.563
// C1 ends at 10:28:20.570 // TimeUnit.SECONDS.sleep(10);
useCompletableFuture();
//logger 输出
// A2 starts at 10:43:46.867
// B2 starts at 10:43:46.867
// B2 ends at 10:43:51.871
// A2 ends at 10:43:51.871
// C2 starts at 10:43:51.871
// C2 ends at 10:44:00.874
TimeUnit.SECONDS.sleep(10);//避免打印出的start - end 不全 } }

 3)计算结果完成时的处理

 当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:

   public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
} public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
} public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}

可以看到Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
       方法不以Async结尾,意味着Action使用相同的线程执行,

Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

注意这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。

package tk.mybatis.springboot.util.thread;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future; public class MainTest {
private static Logger logger = LoggerFactory.getLogger(MainTest.class); private static Random random = new Random();
private static long t = System.currentTimeMillis(); static Map<String,String> getMoreData() {
logger.info("start");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("end ");
Map map=new HashMap();
map.put(random.nextInt(1000),random.nextInt(1000));
return map;
} public static void main(String[] args) throws Exception {
CompletableFuture<Map<String,String>> future = CompletableFuture.supplyAsync(MainTest::getMoreData);
Future<Map<String,String>> f = future.whenComplete((v, e) -> {
logger.info("v="+v);
logger.info("e="+e);
});
logger.info("get="+f.get());
}
}

     exceptionally方法返回一个新的CompletableFuture,

当原始的CompletableFuture抛出异常的时候,就会触发这个CompletableFuture的计算,调用function计算值,

否则如果原始的CompletableFuture正常计算完后,这个新的CompletableFuture也计算完成,它的值和原始的CompletableFuture的计算的值相同(??)。

也就是这个exceptionally方法用来处理异常的情况。

        CompletableFuture<Map<String,String>> futureException = future.exceptionally(new Function<Throwable, Map<String, String>>() {
@Override
public Map<String, String> apply(Throwable throwable) {
logger.error("error="+throwable.getMessage());
Map map=new HashMap();
map.put(random.nextInt(1000),random.nextInt(1000));
return map;
}
}); logger.info("get.exception="+futureException.get());

下面一组方法虽然也返回CompletableFuture对象,但是对象的值和原来的CompletableFuture计算的值不同。

当原先的CompletableFuture的值计算完成或者抛出异常的时候,会触发这个CompletableFuture对象的计算,结果由BiFunction参数计算而得。

因此这组方法兼有whenComplete和转换的两个功能。

public <U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

同样,不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。

4)转换

    CompletableFuture可以作为monad(单子)和functor。由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,

而是告诉CompletableFuture当计算完成的时候请执行某个function。而且我们还可以将这些操作串联起来,或者将CompletableFuture组合起来。

三 ThreadPoolTaskExecutor

包路径:org.springframework.scheduling.concurrent

Future、 CompletableFuture、ThreadPoolTaskExecutor简单实践的相关教程结束。

《Future、 CompletableFuture、ThreadPoolTaskExecutor简单实践.doc》

下载本文的Word格式文档,以方便收藏与打印。