开发者

Java中的CompletableFuture异步编程详解

目录
  • 场景引入
  • CompletableFuture初体验
  • 创建CompletableFuture对象
  • CompletableFuture类定义
  • CompletionStage接口
    • AND汇聚关系
    • OR聚合关系
    • 串行关系
    • 异常关系

场景引入

只要提到多线程来优化性能,那么必定离不开异步化,异步化的出现才是多线程优化性能这个核心方案的基础。 异步化其实我们早已接触,如下Thread类,主线程不需要等待线程T1,T2的执行结果,就能实现异步逻辑。

public static void main(String[] args) {
    Thread T1 = new Thread(()->{
        // 执行方法A逻辑
    });
    Thread T2 = new Thread(()->{
        // 执行方法B逻辑
    });
    // 省略其它逻辑
}

这种方案虽然可编程客栈行,但是在开发中明显不是最优,现实生产业务对应各种各样的需求,简单的Thread已经不满足需求,所以Java在1.8版本提出CompletableFuture工具类来解决生产中遇到的异步化问题。

CompletableFuture初体验

先从之前提到的华罗庚提出的最优泡茶问题入手,简易体验下CompletableFuture工具类的优势。 最优泡茶问题可以分为如下步骤

Java中的CompletableFuture异步编程详解

在用FutureTask实现时是将上述步骤拆分为两个线程执行,如下所示

Java中的CompletableFuture异步编程详解

通过代码实现明显能感觉到,线程需要手动维护,代码逻辑复杂,不能专注于业务代码。

为了方便采用CompletableFuture实现,将最优泡茶方案进一步细分,如下所示,将流程拆分三个线程执行,线程F3需要等待线程F1,F2都返回后才执行。

Java中的CompletableFuture异步编程详解

代码实现如下

public static void main(String[] args) throws Exception {
    // 无返回值的异步调用
    CompletableFuture f1 = CompletableFuture.runAsync(()->{
        System.out.println("F1 洗水壶");
        sleep(1);
        System.out.println("F1 烧水");
        sleep(15);
    });
    // 有返回值的实例化
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("F2 洗茶壶");
        sleep(1);
        System.out.println("F2 洗茶杯");
        sleep(2);
        System.out.println("F2 拿茶叶");
        sleep(1);
        return "龙井";
    });
    // f3等待f1和f2到达后才能执行
    // param1是f1的返回值,这里没有就是空  param2是f2的返回值
    CompletableFuture f3 = f1.thenCombine(f2, (param1, param2) -> {
        System.out.println("F3 拿到茶叶:" + param2);
        System.out.println("F3 泡茶");
        return param2;
    });
    // 阻塞等待
    f3.get();
}
public static void sleep(int t){
    try {
        TimeUnit.SECONDS.sleep(t);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

返回结果

Java中的CompletableFuture异步编程详解

从实现代码来看CompletableFuture有如下优势

  • 不需要手动维护线程,不需要手动给任务分配工作线程。
  • 代码简练可以专注业务逻辑。

创建CompletableFuture对象

创建CompletableFuture除了初体验CompletableFuture代码中的两种还有两种,总共四种方法签名如下

// 有返回值   supplier供给型接口(不进有出)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 有返回值,任务使用自定义的线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);
// 无返回值  Runnable接口执行run方法无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable);
// 无返回值,任务使用自定义的线程池
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

注意:在创建CompletableFuture对象时尽量使用自js定义的线程池,因为默认是采用公共的ForkJoinPool线程池,如果某个CompletableFuture中有I/O操作非常耗时的就会阻塞该线程池中所有的线程,导致线程饥饿的风险,进而影响整个系统的性能,生产中最好按照业务创建不同类型的线程池,互不干扰。

CompletableFuture类定义

工具类CompletableFuture类定义如下

 public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

由于CompletableFuture类实现了Future接口,所以异步线程的两大问题,线程什么时候执行完毕?线程的返回值是什么?都可以利用Future接口的特性解决,

另外CompletableFuture也实现了CompletionStage接口,那CompletionStage又是什么呢?

CompletionStage接口

CompletionStage接口是去描述任务之间的时序关系,包括前面提到的f1.thenCombine(f2, (param1, param2) -> {})就是一种典型的AND聚合关系,还能描述OR聚合关系,串行关系,以及异步编程中的异常处理关系。

AND汇聚关系

AND汇聚关系即表示依赖任务全部执行完毕才能执行当前任务,在CompletableFuture初体验中有使用,可以参考。

CompletionStage接口描述汇聚关系的方法签名如下:

public CompletionStage<V> thenCombine(CompletionStage other,BiFunction fn);
public CompletionStage<V> thenCombineAsync(CompletionStage other,BiFunction fn);
public CompletionStage<V> thenCombineAsync(CompletionStage other,BiFunction fn,Executor executor);
public CompletionStage<Void> thenAcceptBoth(CompletionStage other,BiConsumer consumer);
public CompletionStage<Void> thenAcceptBothAsync(CompletionStage other,BiConsumer consumer);
public CompletionStage<Void> thenAcceptBothAsync(CompletionStage other,BiConsumer consumer,Executor executor);
public CompletionStage<Void> runAfterBoth(CompletionStage other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage other,Runnable action,Executor executor);

thenCombine、thenAcceptBoth、runAfterBoth这三个系列方法的区别源自核心参数区别

  • BiFunction fn参数是函数式接口,这个参数既能支持入参也支持返回值。
  • BiConsumer consumer参数是消费型接口,只有入参没有返回值。
  • Runnable action 参数不支持入参也不支持出参。
  • Executor executor 表示指定线程池,不使用公共的ForkJoin线程池。
  • 其中Async表示异步执行fn、consumer、action逻辑。

OR聚合关系

OR聚合关系表示当其中一个依赖任务执行完毕就可以执行当前任务。

CompletionStage接口描述聚合关系的方法签名如下:

public CompletionStage<U> applyToEither(CompletionStage other,Function fn);
public CompletionStage<U> applyToEitherAsync(ComplphpetionStage other,Function fn);
public CompletionStage<U> applyToEitherAsync(CompletionStage other,Function fn,Executor executor);
public CompletionStage<Void> acceptEither(CompletionStage other,Consumer action);
public CompletionStage<Void> acceptEit编程客栈herAsync(CompletionStage other,Consumer action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage other,Consumer action,Executor executor);
public CompletionStage<Void> runAfterEither(CompletionStage other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage other,Runnable action,Executor executor)

这三个系列方法的区别也是源自核心参数区别。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
        double num = Math.random();
        System.out.println("f1 返回值:"+num);
        // TimeUnit.SECONDS.sleep(1);
        return num;
    });
    CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(()->{
        double num = Math.random();
        System.out.println("f2 返回值:"+num);
        return num;
    });
    CompletableFuture<Void> f3 = f1.acceptEither(f2, (num) -> {
        System.out.println("最后返回值:" + num);
    });
    // 不是需要返回值,而是阻塞等待f3执行结束
    // 返回结果就是任务f1,f2中的任意一个返回值,需要保证f1,f2的返回值类型相同
    f3.get();
}

串行关系

串行关系表示依赖任务按照编写顺序先后执行。

CompletionStage接口描述串行关系的方法签名如下:

public <U> CompletionStage<U> thenApply(Function fn);
public <U> CompletionStage<U> thenApplyAsync(Function fn);
public <U> CompletionStage<U> thenApplyAsync(Function fn,Executor executor);
public CompletionStage<Void> thenAccept(Consume action);
public CompletionStage<Void> thenAcceptAsync(Consumer action);
public CompletionStage<Void> thenAcceptAsync(Consumer action,Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStagQaXNAQe<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor);

thenApply、thenAccept、thenRun这三个系列方法的区别也是源自核心参数区别。

需要注意的是thenCompose系列方法,这个方法会新创建出一个子流程,最终结果和thenApply系列方法相同。

如下所示,从上往下依次执行。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
        return "hello world";
    }).thenApply((param1)->{
        return param1 + " CompletableFuture";
    }).thenAccept((param2)->{
        param2 = param2.toUpperCase();
        System.out.println(param2);
    });
    completableFuture.get();
}

Java中的CompletableFuture异步编程详解

异常关系

在串行关系、OR聚合关系、AND汇聚关系中,由于其核心方法参数fn、consumer、action都不允许抛出异常,但是都无法限制它们抛出异常,如下所示

public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> f0 = CompletableFuture
        .supplyAsync(()->(7/0))
        .thenApply(r->r*10);
    System.out.println(f0.get());
}

非异步编程可以采用try{}catch{}捕获那么异步编程如何处理呢?

CompletionStage接口支持异常处理方法签名如下

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
  • exceptionally使用类似try{}catch{}中的 catch{}。
  • whenComplete系列使用类似try{}finally{}中的 finally{},无论是否发生异常都会执行whenComplete中的逻辑代码,但是无返回结果。
  • handle系列使用和whenComplete一样,不过handle支持返回结果

测试代码如下

public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> f0 = CompletableFuture
            .supplyAsync(()->(7/0))
            .thenApply(r->r*10)
            .exceptionally((throwable)->{
                // 相当于catch完后,不抛出异常
                System.out.println("异常了:"+throwable);
                return 1;
            }).whenComplete((integer,throwable)->{
                // integer 返回值   throwable返回异常(如果exceptionally捕获返回null)
                System.out.println("返回值:"+integer);
                System.out.println("异常:"+throwable);
            }).handle((integer,throwable)->{
                // integer 返回值   throwable返回异常(如果exceptionally捕获返回null)
                System.out.println("返回值:"+integer);
                System.out.println("异常:"+throwable);
                return 2;
            });
    System.out.println(f0.get());
}

到此这篇关于Java中的CompletableFuture异步编程详解的文章就介绍到这了,更多相关CompletableFuture异步编程内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜