CompletableFuture 异步编排


CompletableFuture 异步编排

目录
  • CompletableFuture 异步编排
    • 1、创建异步对象
    • 2、完成时回调
    • 3、完成时处理
    • 4、线程串行化方法
    • 5、两个任务组合(both)
      • 5.1 ps
    • 6、两个任务组合(either)
    • 7、多任务组合
      • 7.1 ps

业务场景

查询商品详情的业务比较复杂,有的数据还需要远程调用

// 获取sku的基本信息 0.5s
// 获取sku的图片信息 0.5s
// 获取sku的促销信息 1s
// 获取所有spu的销售属性 1s
// 获取规格参数组以及组下规格参数 1.5s
// spu详情 1s

假如获取商品详情页的每个查询,都需要如下标注时间来完成,服务器返回数据每次都需要5.5s,显然是不能接受的

如果多线程同时完成这6步操作,也许只需要1.5秒即可响应完成

CompletableFuture介绍

public class CompletableFuture implements Future, CompletionStage {}

我们看到继承至Future,可以获取到异步执行结果。

1、创建异步对象

CompletableFuture提供了四个静态方法来创建对象

// 异步执行,无需返回
public static CompletableFuture runAsync(Runnable runnable)
// 指定线程池,异步执行,无需返回
public static CompletableFuture runAsync(Runnable runnable,Executor executor)
// 异步执行,有返回值
public static  CompletableFuture supplyAsync(Supplier supplier)
// 指定线程池,异步执行,有返回值
public static  CompletableFuture supplyAsync(Supplier supplier,Executor executor)

代码示例

package com.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * 如何创建异步对象
     * // 异步执行,无需返回
     * public static CompletableFuture runAsync(Runnable runnable)
     * // 指定线程池,异步执行,无需返回
     * public static CompletableFuture runAsync(Runnable runnable,Executor executor)
     * // 异步执行,有返回值
     * public static  CompletableFuture supplyAsync(Supplier supplier)
     * // 指定线程池,异步执行,有返回值
     * public static  CompletableFuture supplyAsync(Supplier supplier,Executor executor)
     */
    public static void main(String[] args) throws Exception {
        runAsync(); // 异步执行,无需返回值,主线程无需等待结果返回
        runAsyncWithExecutor();

        CompletableFuture future = supplyAsync(); // 异步执行,有返回值,主线程需等待结果返回
        System.out.println("supplyAsync返回结果:" + future.get());

        CompletableFuture execFuture = supplyAsyncWithExecutor();
        System.out.println("supplyAsync返回结果:" + execFuture.get());

    }

    /**
     * 异步执行,无需返回
     */
    static void runAsync() {
        System.out.println("main……start……");
        CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        });
        System.out.println("main……end……");
    }

    /**
     * 异步执行,无需返回,用线程池
     */
    static void runAsyncWithExecutor() {
        System.out.println("main……start……");
        CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }, executorService);
        System.out.println("main……end……");
    }


    /**
     * 异步执行,有返回值
     */
    static CompletableFuture supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture supplyAsyncWithExecutor() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }
}

2、完成时回调

CompletableFuture提供了四个,感知或处理结果和异常的方法

// 处理正常和异常结果,无返回值
public CompletableFuture whenComplete(BiConsumer<? super T, ? super Throwable> action)
// 另开启一个线程,处理正常和异常结果,无返回值
public CompletableFuture whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) 
// 另开启一个线程池中的线程,处理正常和异常结果,无返回值
public CompletableFuture whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
// 处理异常情况,有返回值
public CompletableFuture exceptionally(Function fn)

代码示例

package com.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);
    /**
     * 2、结果和异常处理
     * // 处理正常和异常结果
     * public CompletableFuture whenComplete(BiConsumer<? super T, ? super Throwable> action)
     * // 另开启一个线程,处理正常和异常结果
     * public CompletableFuture whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
     * // 另开启一个线程池中的线程,处理正常和异常结果
     * public CompletableFuture whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
     * // 处理异常情况
     * public CompletableFuture exceptionally(Function fn)
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture whenCompleteFuture = whenComplete();
        System.out.println("whenCompleted返回结果:" + whenCompleteFuture.get());
    }

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture supplyAsyncWithExecutor() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
						// 故障制造异常,则返回结果为 10
            // int i = 10 / 0;
            // 正常,则返回结果为 5
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture whenComplete() throws Exception {
        return supplyAsyncWithExecutor()
          // 感知异常
          .whenComplete((resultData, exception) -> {
            System.out.println("执行supplyAsync后,调用whenComplete返回的数据:" + resultData + ",异常:" + exception);
            // 处理异常情况
        }).exceptionally(throwable -> 10);
    }
}

3、完成时处理

CompletableFuture提供了handle方法是另一种处理结果的方式

// 处理上一次结果,有返回值
public  CompletableFuture handle(BiFunction<? super T, Throwable, ? extends U> fn)
// 新开线程处理上一次结果,有返回值
public  CompletableFuture handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
// 新拿取线程池中线程处理上一次结果,有返回值
public  CompletableFuture handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

代码示例

package com.atguigu.gulimail.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * 3、完成时处理
     * // 处理上一次结果,有返回值
     * public  CompletableFuture handle(BiFunction<? super T, Throwable, ? extends U> fn)
     * // 新开线程处理上一次结果,有返回值
     * public  CompletableFuture handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
     * // 新拿取线程池中线程处理上一次结果,有返回值
     * public  CompletableFuture handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture handleFuture = handle();
        System.out.println("handleFuture返回结果:" + handleFuture.get());

    }

  

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture supplyAsyncWithExecutor() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }

    /**
     * 异步执行,可处理返回值
     */
    static CompletableFuture handle() throws Exception {
        return supplyAsyncWithExecutor().handle((resultData, exception) -> {
            System.out.println("执行supplyAsync后,调用whenComplete返回的数据:" + resultData + ",异常:" + exception);
            if (resultData == null) {
                return resultData * 2;
            }
            if (exception != null) {
                return 0;
            }
            return resultData;
        });
    }
}

4、线程串行化方法

CompletableFuture提供了一系列的串行化方法

// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值
public CompletableFuture thenRun(Runnable action)
// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,新开线程  
public CompletableFuture thenRunAsync(Runnable action)
// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,线程池中新开线程
public CompletableFuture thenRunAsync(Runnable action,Executor executor)

// 消费一个线程结果,不返回信息
public CompletableFuture thenAccept(Consumer<? super T> action)
// 消费一个线程结果,不返回信息,新开线程
public CompletableFuture thenAcceptAsync(Consumer<? super T> action)
// 消费一个线程结果,不返回信息,线程池中新开线程
public CompletableFuture thenAcceptAsync(Consumer<? super T> action,Executor executor)

// 消费一个线程结果,返回信息
public  CompletableFuture thenApply(Function<? super T,? extends U> fn)
// 消费一个线程结果,返回信息,新开线程
public  CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
// 消费一个线程结果,返回信息,线程池中新开线程
public  CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

代码示例

package com.atguigu.gulimail.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * 4、线程串行化方法
     */
    public static void main(String[] args) throws Exception {

        CompletableFuture handleFuture = handle();
        // 这些方法请自己尝试测验
        thenRun();
        thenRunAsync();
        thenRunAsyncWithExec();
        thenAccept();
        thenAcceptAsync();
        thenAcceptAsyncWithExec();
        CompletableFuture applyFuture = thenApplyAsync();
        applyFuture = thenApply();
        applyFuture = thenApplyAsyncWithExec();
        Thread.sleep(50000);
    }

   
            System.out.println("运行结果:" + i);
        }, executorService);
        System.out.println("main……end……");
    }


    /**
     * 异步执行,有返回值
     */
    static CompletableFuture supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("运行结果:" + i);
            return i;
        });
    }

   
    /**
     * 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值(等待线程处理完之后,新开另一个线程执行其他任务)
     */
    static void thenRun() throws Exception {
        supplyAsync().thenRun(() -> {
            System.out.println("我是上一个异步操作执行完后的处理……");
        });
    }

    /**
     * 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,新开线程(等待线程处理完之后,新开另一个线程执行其他任务)
     */
    static void thenRunAsync() throws Exception {
        supplyAsync().thenRunAsync(() -> {
            System.out.println("我是上一个异步操作执行完后的处理……");
        });
    }

    /**
     * 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,线程池中新开线程(等待线程处理完之后,新开另一个线程执行其他任务)
     */
    static void thenRunAsyncWithExec() throws Exception {
        supplyAsync().thenRunAsync(() -> {
            System.out.println("我是上一个异步操作执行完后的处理……");
        }, executorService);
    }

    /**
     * 消费一个线程结果,不返回信息
     */
    static void thenAccept() throws Exception {
        supplyAsync().thenAccept(res -> {
            System.out.println("上一个线程返回的结果:" + res);
        });
    }

    /**
     * 消费一个线程结果,不返回信息,线程池中新开线程
     */
    static void thenAcceptAsync() throws Exception {
        supplyAsync().thenAcceptAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
        });
    }

    /**
     * 消费一个线程结果,不返回信息,新开线程
     */
    static void thenAcceptAsyncWithExec() throws Exception {
        supplyAsync().thenAcceptAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
        }, executorService);
    }

    /**
     * 消费一个线程结果,返回信息
     * @return
     */
    static CompletableFuture thenApply() throws Exception {
        return supplyAsync().thenApply(res -> {
            System.out.println("上一个线程返回的结果:" + res);
            return "我是一个apply";
        });
    }

    /**
     * 消费一个线程结果,返回信息,新开线程
     */
    static CompletableFuture thenApplyAsync() throws Exception {
        return supplyAsync().thenApplyAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
            return "我是一个apply";
        });
    }

    /**
     * 消费一个线程结果,返回信息,线程池中新开线程
     * @return
     */
    static CompletableFuture thenApplyAsyncWithExec() throws Exception {
        return supplyAsync().thenApplyAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
            return "我是一个apply";
        }, executorService);
    }
}

5、两个任务组合(both)

CompletableFuture提供both组合模式--两个任务必须都完成,触发改任务

// 调用者任务与参数任务执行完成后,触发action任务
public CompletableFuture runAfterBoth(CompletionStage<?> other,Runnable action)
// 调用者任务与参数任务执行完成后,新开一个线程触发action任务
public CompletableFuture runAfterBothAsync(CompletionStage<?> other,Runnable action)
// 调用者任务与参数任务执行完成后,线程池新开一个线程触发action任务
public CompletableFuture runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)

// 消费两个父任务执行结果,触发action任务,无返回值
public  CompletableFuture thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
// 消费两个父任务执行结果,新开一个线程触发action任务,无返回值
public  CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
// 消费两个父任务执行结果,线程池新开一个线程触发action任务,无返回值
public  CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)

// 处理两个父任务结果,触发子任务并返回结果
public  CompletableFuture thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
// 处理两个父任务结果,新开一个线程触发子任务并返回结果
public  CompletableFuture thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
// 处理两个父任务结果,线程池新开一个线程触发子任务并返回结果
public  CompletableFuture thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)

5.1 ps

值得注意的是,两个任务必然是并行执行的!

关于CompletionStage<?>究竟是什么呢?

我们会发现,CompletableFuture 实现了CompletionStage,也就是说,我们需要在方法里面再传一个任务,与调用者一起组成两个任务,都完成后,执行后续操作

代码示例

public static void main(String[] args) throws Exception {
        runAfterBoth();
        runAfterBothAsync();
        runAfterBothAsyncWithExec();
        
        thenAcceptBoth();
        thenAcceptBothAsync();
        thenAcceptBothAsyncWithExec();
        
        CompletableFuture thenCombineFuture = thenCombine();
        thenCombineFuture = thenCombineAsync();
        thenCombineFuture = thenCombineAsyncWithExec();
        System.out.println(thenCombineFuture.get());
        Thread.sleep(50000);
 }

		/**
     * 异步执行,有返回值
     */
    static CompletableFuture supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            try {
                System.out.println("在此等待中……");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("运行结果:" + i);
            return i;
        });
    }

/**
 * 两个父任务执行完毕,触发action任务
 *
 * @return
 */
static void runAfterBoth() throws Exception {
    supplyAsync().runAfterBoth(supplyAsync(), () -> {
        System.out.println("runAfterBoth任务执行完成");
    });
}

/**
 * 两个父任务执行完毕,新开一个线程触发action任务
 *
 * @return
 */
static void runAfterBothAsync() throws Exception {
    supplyAsync().runAfterBothAsync(supplyAsync(), () -> {
        System.out.println("runAfterBoth任务执行完成");
    });
}

/**
 * 两个父任务执行完毕,线程池新开一个线程触发action任务
 *
 * @return
 */
static void runAfterBothAsyncWithExec() throws Exception {
    supplyAsync().runAfterBothAsync(supplyAsync(), () -> {
        System.out.println("runAfterBoth任务执行完成");
    }, executorService);
}

/**
     * 消费两个父任务执行结果,触发action任务,无返回值
     *
     * @return
     */
    static void thenAcceptBoth() throws Exception {
        supplyAsync().thenAcceptBoth(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
        });
    }

    /**
     * 消费两个父任务执行结果,新开一个线程触发action任务,无返回值
     *
     * @return
     */
    static void thenAcceptBothAsync() throws Exception {
        supplyAsync().thenAcceptBothAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
        });
    }

    /**
     * 消费两个父任务执行结果,线程池新开一个线程触发action任务,无返回值
     *
     * @return
     */
    static void thenAcceptBothAsyncWithExec() throws Exception {
        supplyAsync().thenAcceptBothAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
        }, executorService);
    }

		/**
     * 处理两个父任务结果,触发子任务并返回结果
     *
     * @return
     */
    static CompletableFuture thenCombine() throws Exception {
         return supplyAsync().thenCombine(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
            return "thenCombine";
        });
    }

    /**
     * 处理两个父任务结果,新开一个线程触发子任务并返回结果
     *
     * @return
     */
    static CompletableFuture thenCombineAsync() throws Exception {
        return supplyAsync().thenCombineAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
            return "thenCombineAsync";
        });
    }

    /**
     * 处理两个父任务结果,线程池新开一个线程触发子任务并返回结果
     *
     * @return
     */
    static CompletableFuture thenCombineAsyncWithExec() throws Exception {
        return supplyAsync().thenCombineAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
            return "thenCombineAsyncWithExec";
        }, executorService);
    }

6、两个任务组合(either)

CompletableFuture提供either组合模式--两个任务只要完成一个,触发改任务

与both有异曲同工之妙,照葫芦画瓢,这里不过多阐述

// 两个父任务结果只要返回一个,触发子任务,无返回结果
public CompletableFuture runAfterEither(CompletionStage<?> other,Runnable action)
// 两个父任务结果只要返回一个,新开一个线程触发子任务,无返回结果
public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,Runnable action)
// 两个父任务结果只要返回一个,线程池新开一个线程触发子任务,无返回结果
public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)

// 两个父任务结果只要返回一个,消费其结果,触发子任务,无返回结果
public CompletableFuture acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
// 两个父任务结果只要返回一个,消费其结果,新开一个线程触发子任务,无返回结果
public CompletableFuture acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
// 两个父任务结果只要返回一个,消费其结果,线程池新开一个线程触发子任务,无返回结果
public CompletableFuture acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)
  
// 两个父任务结果只要返回一个,触发子任务处理其结果,并返回
public  CompletableFuture applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
// 两个父任务结果只要返回一个,新开线程触发子任务处理其结果,并返回
public  CompletableFuture applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
// 两个父任务结果只要返回一个,线程池新开线程触发子任务处理其结果,并返回
public  CompletableFuture applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)

7、多任务组合

CompletableFuture提供了多任务组合模式(allOff、anyOff)

// 执行完任何一个任务后,返回其结果,有返回值
public static CompletableFuture anyOf(CompletableFuture<?>... cfs)
// 等待所有任务执行完毕,返回空值
public static CompletableFuture allOf(CompletableFuture<?>... cfs)

代码示例

public static void main(String[] args) throws Exception {
        allOf();
        CompletableFuture objectCompletableFuture = anyOf();
        System.out.println(objectCompletableFuture.get());
  			// 主线程等待运行
        Thread.sleep(10000);
    }

		/**
     * 异步执行,有返回值
     */
    static CompletableFuture supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            try {
                System.out.println("在此等待中……");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("运行结果:" + i);
            return i;
        });
    }

		/**
     * 执行完任何一个任务后,返回其结果,有返回值
     *
     * @return
     */
    static CompletableFuture anyOf() throws Exception {
        return CompletableFuture.anyOf(supplyAsync(), supplyAsync(), supplyAsync());
    }

    /**
     * 等待所有任务执行完毕,返回空值
     *
     * @return
     */
    static CompletableFuture allOf() throws Exception {
        return CompletableFuture.allOf(supplyAsync(), supplyAsync(), supplyAsync());
    }

7.1 ps

值得注意的是,无论是anyOf还是allOf,最后所有的线程任务都会执行完毕!

总结篇:

  • run相关的方法,通常用来做下一步操作
  • accept相关的方法,通常用来消费结果,无返回值
  • supply、apply、combine相关的方法是有返回值的
  • handle方法用于处理正常和异常结果

相信大家应该都学以致用了吧!