跳至主要內容

CompletableFuture

Jin大约 18 分钟

CompletableFuture

1、Future和Callable接口

  1. Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

  2. Callable接口中定义了需要有返回的任务需要实现的方法。

  3. 比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。

2、FutureTask

本源的Future接口相关架构

image-20220821155043721
image-20220821155043721

get()阻塞

    @Test
    public void demo01() throws InterruptedException, ExecutionException, TimeoutException {
        FutureTask<Integer> futureTask = new FutureTask<>(() -> {
            System.out.println("-----come in FutureTask");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ThreadLocalRandom.current().nextInt(100);
        });

        Thread t1 = new Thread(futureTask, "t1");
        t1.start();

        //3秒钟后才出来结果,还没有计算你提前来拿(只要一调用get方法,对于结果就是不见不散,会导致阻塞)
//        System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get());

        //3秒钟后才出来结果,我只想等待1秒钟,过时不候
        System.out.println(Thread.currentThread().getName() + "\t" + futureTask.get(1L, TimeUnit.SECONDS));

        System.out.println(Thread.currentThread().getName() + "\t" + " run... here");
    }

打印结果

-----come in FutureTask

java.util.concurrent.TimeoutException

一旦调用get()方法,不管是否计算完成都会导致阻塞,o(╥﹏╥)o

isDone()轮询

  • 轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果.
  • 如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞
    @Test
    public void demo02() throws InterruptedException, ExecutionException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("-----come in FutureTask");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "" + ThreadLocalRandom.current().nextInt(100);
        });

        new Thread(futureTask, "t1").start();

        System.out.println(Thread.currentThread().getName() + "\t" + "线程完成任务");

        /**
         * 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果
         */
        while (true) {
            if (futureTask.isDone()) {
                System.out.println(futureTask.get());
                break;
            }
        }
    }

打印结果

main	线程完成任务
-----come in FutureTask
73

小总结

  1. 不见不散,过时不候,轮询
  2. 想完成一些复杂的任务
    • 应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
    • 将两个异步计算合成一个异步计算,这两个异步计算互相独立,同时第二个又依赖第一个的结果。
    • 当Future集合中某个任务最快结束时,返回结果。等待Future结合中的所有任务都完成。

3、CompletableFuture

  • 阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

类架构说明

image-20220821160843714
image-20220821160843714

3.1、CompletionStage接口

image-20220821161012228
image-20220821161012228
  • 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。

  • 一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:stage.then Apply(x->square(x) ) .then Accept

  • (x->System.out.print(x) ) .then Run() ->System.out.print In() )

  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

3.2、CompletableFuture类

image-20220821161115680
image-20220821161115680
  • 在Java 8中, Complet able Future提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能 力, 可以通过回调的方式处理计算结果, 也提供了转换和组合Complet able Future的方法。

  • 它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些 动作。

  • 它实现了Future和Completion Stage接口

4、CompletableFuture核心的四个静态方法

  • 利用核心的四个静态方法创建一个异步操作 | 不建议用new

  • 关键就是 |有没有返回值|是否用了线程池|

  • 参数说明:

    • 没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。

    • 如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。

4.1、runAsync 无 返回值

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

    public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

4.2、supplyAsync 有 返回值

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

4.3、上述Executor executor参数说明

  1. 没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
  2. 如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

4.4、Code

runAsync


    /**
     * runAsync
     * 无返回值
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void demo01() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");//ForkJoinPool.commonPool-worker-9	-----come in
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("-----task is over");//-----task is over
        });
        System.out.println(future.get());//null
    }

    /**
     * runAsync 结合线程池
     * 无返回值
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void demo02() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");//pool-1-thread-1	-----come in
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("-----task is over");//-----task is over
        }, executorService);
        System.out.println(future.get());//null

        executorService.shutdown();
    }

supplyAsync

    /**
     * supplyAsync 有返回值
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void demo03() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");//ForkJoinPool.commonPool-worker-9	-----come in
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ThreadLocalRandom.current().nextInt(100);
        });

        System.out.println(completableFuture.get());//5
    }

    /**
     * supplyAsync 有返回值
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void demo04() throws ExecutionException, InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");//pool-1-thread-1	-----come in
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ThreadLocalRandom.current().nextInt(100);
        }, executorService);

        System.out.println(completableFuture.get());//83
        executorService.shutdown();

    }

whenComplete和exceptionally

从Java8开始引入了CompletableFuture,它是Future的功能增强版,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

  • 使用默认线程池

    /**
     * whenComplete
     * 从Java8开始引入了CompletableFuture,它是Future的功能增强版。
     * 可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
     */
    @Test
    public void demo05() {

        CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");//ForkJoinPool.commonPool-worker-9	-----come in
            int result = ThreadLocalRandom.current().nextInt(10);
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("-----计算结束耗时1秒钟,result: " + result);//-----计算结束耗时1秒钟,result: 7
            if (result > 6) {
                int age = 10 / 0;
            }
            return result;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("-----result: " + v);
            }
        }).exceptionally(e -> {
            System.out.println("-----exception: " + e.getCause() + "\t" + e.getMessage());//-----exception: java.lang.ArithmeticException: / by zero	java.lang.ArithmeticException: / by zero
            return -44;
        });

        //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 使用自定义线程池
@Test
    public void demo06() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        try {
            CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");//pool-1-thread-1	-----come in
                int result = ThreadLocalRandom.current().nextInt(10);
                //暂停几秒钟线程
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("-----计算结束耗时1秒钟,result: " + result);//-----计算结束耗时1秒钟,result: 1
                if (result > 6) {
                    int age = 10 / 0;
                }
                return result;
            }, executorService).whenComplete((v, e) -> {
                if (e == null) {
                    System.out.println("-----result: " + v);//-----result: 1
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("-----exception: " + e.getCause() + "\t" + e.getMessage());
                return -44;
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executorService.shutdown();
        }

        //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

CompletableFuture的优点

  1. 异步任务结束时,会自动回调某个对象的方法;
  2. 异步任务出错时,会自动回调某个对象的方法;
  3. 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行

5、电商比价案例

5.1、函数式编程已经主流

Lambda +Stream+链式调用+Java8函数式编程

  1. Runnable
    • image-20220821164331393
      image-20220821164331393
  2. Function
    • image-20220821164349674
      image-20220821164349674
  3. Consumer
    • image-20220821164359454
      image-20220821164359454
  4. Supplier
    • image-20220821164407186
      image-20220821164407186
  5. BiConsumer
    • image-20220821164416574
      image-20220821164416574
  6. 小总结
    • image-20220821164454326
      image-20220821164454326
函数式接口名称方法名称参数返回值
Runnablerun无参数无返回值
Functionapply1个参数有返回值
Consumeaccept1个参数无返回值
Supplierget没有参数有返回值
Biconsumeraccept2个参数无返回值

5.2、join和get对比

  • 功能几乎一样,区别在于编码时是否需要抛出异常
    • get()方法需要抛出异常
    • join()方法不需要抛出异常
public class Chain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello 12345";
        });
        System.out.println(completableFuture.get());
    }

}

public class Chain {
    public static void main(String[] args)  {//未抛出异常
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello 12345";
        });
        System.out.println(completableFuture.join());
    }
}

5.3、大厂业务需求说明

  1. 经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。

  2. 我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。同理, 对于分布式微服务的调用,按照实际业务,如果是无关联step by step的业务,可以尝试是否可以多箭齐发,同时调用。

  3. 我们去比同一个商品在各个平台上的价格,要求获得一个清单列表,

    • step by step,查完京东查淘宝,查完淘宝查天猫......
    • all 一口气同时查询。。。。。

5.4、一波流Java8函数式编程带走

/**
 * 案例说明:电商比价需求,模拟如下情况:
 * <p>
 * 1需求:
 * 1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
 * 1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
 * <p>
 * 2输出:出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List<String>
 * 《mysql》 in jd price is 88.05
 * 《mysql》 in dangdang price is 86.11
 * 《mysql》 in taobao price is 90.43
 * <p>
 * 3 技术要求
 * 3.1 函数式编程
 * 3.2 链式编程
 * 3.3 Stream流式计算
 */
public class CompletableFutureDemo2 {

    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("tmall"),
            new NetMall("pdd"),
            new NetMall("mi")
    );

    /**
     * step by step 一家家搜查
     * List<NetMall> ----->map------> List<String>
     *
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPrice(List<NetMall> list, String productName) {
        //《mysql》 in taobao price is 90.43
        return list.stream().map(netMall ->
                        String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }

    /**
     * List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
     *
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {
        return list.stream().map(netMall ->
                        CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }


    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");

        System.out.println("--------------------");

        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(list, "mysql");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime2 - startTime2) + " 毫秒");
    }
}

class NetMall {
    @Getter
    private String netMallName;

    public NetMall(String netMallName) {
        this.netMallName = netMallName;
    }

    public double calcPrice(String productName) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

打印结果

mysql in jd price is 109.80
mysql in tmall price is 110.16
mysql in pdd price is 110.99
mysql in mi price is 109.56
----costTime: 4113 毫秒
--------------------
mysql in jd price is 109.78
mysql in tmall price is 110.86
mysql in pdd price is 110.32
mysql in mi price is 109.86
----costTime: 1015 毫秒

6、CompletableFuture常用方法

6.1、获得结果和触发计算

获取结果

  • public T get() 不见不散,容易阻塞

  • public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常

  • public T join() 类似于get(),区别在于是否需要抛出异常

  • public T getNow(T valueIfAbsent)

    • 没有计算完成的情况下,给一个替代结果

    • 立即获取结果不阻塞

      • 计算完,返回计算完成后的结果

      • 没算完,返回设定的valueAbsent(直接返回了备胎值xxx)

public T get()

不见不散

    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }

public T get(long timeout, TimeUnit unit)

过时不候,超过时间会抛异常【java.util.concurrent.TimeoutException

    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        Object r;
        long nanos = unit.toNanos(timeout);
        return reportGet((r = result) == null ? timedGet(nanos) : r);
    }

public T getNow(T valueIfAbsent)

  1. 没有计算完成的情况下,给我一个替代结果
  2. 立即获取结果不阻塞
    • 计算完,返回计算完成后的结果
    • 没算完,返回设定的valueIfAbsent值
    /**
     * 获得结果和触发计算
     * <p>
     * 1.不见不散
     * public T get()
     * <p>
     * 2. 过时不候
     * public T get(long timeout, TimeUnit unit)
     * <p>
     * public T join()
     * <p>
     * 3.没有计算完成的情况下,给我一个替代结果
     * // 立即获取结果不阻塞 计算完,返回计算完成后的结果  没算完,返回设定的valueIfAbsent值
     * public T getNow(T valueIfAbsent)
     * <p>
     * 4.是否打断get方法立即返回括号值
     * public boolean complete(T value)
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void demo01() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 533;
        });

        //去掉注释上面计算没有完成,返回444
        //开启注释上满计算完成,返回计算结果
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(completableFuture.get());//533
//        System.out.println(completableFuture.join());//533

//        没有计算完成的情况下,给我一个替代结果
//        立即获取结果不阻塞 计算完,返回计算完成后的结果  没算完,返回设定的valueIfAbsent值
        System.out.println(completableFuture.getNow(444));//533


//        join
        System.out.println(CompletableFuture.supplyAsync(() -> "Jin").thenApply(r -> r + "666").join());//Jin666
    }

主动触发计算

public boolean complete(T value)

是否打断get方法立即返回括号值

    @Test
    public void demo02() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 533;
        });

        //注释掉暂停线程,get还没有算完只能返回complete方法设置的444;暂停2秒钟线程,异步线程能够计算完成返回get
        try {
            TimeUnit.SECONDS.sleep(1);
//            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值.
        //执行完成返回false,未执行完成返回true,并返回默认值
        //返回false,调用get方法返回533。返回true,调用get方法返回444
        boolean complete = completableFuture.complete(444);
        System.out.println(complete);
        System.out.println(complete + "\t" + completableFuture.get());
    }

6.2、对计算结果进行处理

thenApply

计算结果存在依赖关系,这两个线程串行化

由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。


    /**
     * thenApply:对计算结果进行处理
     */
    @Test
    public void demo03() {
        //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
        CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1024;
        }).thenApply(f -> {
            System.out.println("222");
            return f + 1;
        }).thenApply(f -> {
            //int age = 10/0; // 异常情况:那步出错就停在那步。
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v, e) -> {
            System.out.println("*****v: " + v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });

        System.out.println("-----主线程结束,END");

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("结果值为:" + exceptionally.join());
    }

打印结果

-----主线程结束,END
111
222
333
*****v: 1026
结果值为:1026

handle

有异常也可以往下一步走,根据带的异常参数可以进一步处理

    /**
     * handle 有异常也可以往下一步走,根据带的异常参数可以进一步处理
     */
    @Test
    public void demo04() {
        //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
        //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
        // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1024;
        }).handle((f, e) -> {
            System.out.println("222");
            int age = 10 / 0;
            System.out.println("2222");
            return f + 1;
        }).handle((f, e) -> {
            System.out.println("333");
            return f + 1;
        }).handle((f, e) -> {
            System.out.println("444");
            return f + 1;
        }).whenCompleteAsync((v, e) -> {
            System.out.println("*****v: " + v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });

        System.out.println("-----主线程结束,END");

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

打印结果

-----主线程结束,END
111
222
333
444
*****v: null
java.util.concurrent.CompletionException
image-20220821200028405
image-20220821200028405
image-20220821200036195
image-20220821200036195

6.3、对计算结果进行消费

接收任务的处理结果,并消费处理,无返回结果

方法名方法释义
thenRunthenRun(Runnable runnable)任务 A 执行完执行 B,并且 B 不需要 A 的结果
thenAcceptthenAccept(Consumer action)任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值
thenApply(Function fn)thenApply(Function fn)任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
    /**
     * thenRun,thenAccept 对计算结果进行消费不返回
     */
    @Test
    public void demo05() {
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {
        }).join());//null
        System.out.println("------------------------------");
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {
        }).join());//null
        System.out.println("------------------------------");
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());//resultA resultB
    }

6.4、对计算速度进行选用

谁快用谁:applyToEither

    /**
     * applyToEither:对计算速度进行选用
     */
    @Test
    public void demo07() {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "completableFuture1---come in ");//ForkJoinPool.commonPool-worker-9	completableFuture1---come in
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "completableFuture2---come in ");//ForkJoinPool.commonPool-worker-2	completableFuture2---come in
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 20;
        });
//      比较 哪个现场快,返回哪个线程
        CompletableFuture<Integer> thenCombineResult = completableFuture1.applyToEither(completableFuture2, f -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");//ForkJoinPool.commonPool-worker-2	---come in
            System.out.println("执行速度快的线程返回值:" + f);//执行速度快的线程返回值:20
            return f + 1;
        });

        System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.join());//main	21
    }

6.5、对计算结果进行合并

  1. 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine 来处理

  2. 先完成的先等着,等待其它分支任务

  3. thenCombine

    • code标准版,好理解先拆分

      •     /**
             * thenCombine: 对计算结果进行合并
             */
            @Test
            public void demo09() {
                CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");//ForkJoinPool.commonPool-worker-9	---come in
                    return 10;
                });
        
                CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");//ForkJoinPool.commonPool-worker-9	---come in
                    return 20;
                });
        
                CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");//main	---come in
                    return x + y;
                });
                System.out.println(thenCombineResult.join());//30
            }
        
    • code表达式

      •     /**
             * thenCombine: 对计算结果进行合并
             */
            @Test
            public void demo10() {
                CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");//ForkJoinPool.commonPool-worker-9	---come in 1
                    return 10;
                }).thenCombine(CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");//ForkJoinPool.commonPool-worker-9	---come in 2
                    return 20;
                }), (x, y) -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");//main	---come in 3
                    return x + y;
                }).thenCombine(CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");//ForkJoinPool.commonPool-worker-9	---come in 4
                    return 30;
                }), (a, b) -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");//main	---come in 5
                    return a + b;
                });
                System.out.println("-----主线程结束,END");//-----主线程结束,END
                System.out.println(thenCombineResult.join());//60
        
        
                // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        

7.CompleteFuture和线程池说明(非常重要)

  • 上面的几个方法都有普通版本和后面加Async的版本

  • 以thenRun和thenRunAsync为例,有什么区别?

  • 先看结论

    1. 没有传入自定义线程池,都用默认线程池ForkJoinPool

    2. 传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池

      1. 调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池

      2. 调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

    3. 也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)

案例一:自定义线程池+thenRun

//2-1
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    }
}
//1号任务  pool-1-thread-1
//2号任务  pool-1-thread-1
//3号任务  pool-1-thread-1
//4号任务  pool-1-thread-1

案例二:自定义线程池+thenRunAsync

//2-2
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRunAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    }
}
//1号任务  pool-1-thread-1
//2号任务  ForkJoinPool.commonPool-worker-9---这里另起炉灶重新调用了默认的ForkJoinPool
//3号任务  ForkJoinPool.commonPool-worker-9
//4号任务  ForkJoinPool.commonPool-worker-9

案例三:自定义线程池+thenRunAsync

public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
//            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRun(()->{
           // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
          //  try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            //try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    }
}
//1号任务  1号任务  pool-1-thread-1
//2号任务  main
//3号任务  main
//4号任务  main

源码

//CompletableFuture.java 2009行
public CompletableFuture<Void> thenRun(Runnable action) {//传入值是一样的
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);//但是这里有个异步的线程池asyncPool
    }
    //进入asyncPool
    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);//一般大于1都是成立的

    /**
     * Default executor -- ForkJoinPool.commonPool() unless it cannot
     * support parallelism.
     */
    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();//所以这里会调用forkJoin线程池
贡献者: Jin