「并发编程专题」教你如何使用异步神器CompletableFuture
来源:     阅读:503
易浩激活码
发布于 2022-03-15 14:32
查看主页

前提概要

在java8以前,我们使用java的多线程编程,一般是通过Runnable中的run方法来完成,这种方式,有个很显著的缺点,就是,没有返回值。这时候,大家可能会去尝试使用Callable中的call方法,而后用Future返回结果,如下:

public static void main(String[] args) throws Exception {        ExecutorService executor = Executors.newSingleThreadExecutor();        Future<String> stringFuture = executor.submit(new Callable<String>() {            @Override            public String call() throws Exception {                Thread.sleep(2000);                return "async thread";            }        });        Thread.sleep(1000);        System.out.println("main thread");        System.out.println(stringFuture.get());}

1.很多个异步线程执行时间可能不一致,我的主线程业务不能一直等着,这时候我可能会想要只等最快的线程执行完或者者最重要的那个任务执行完,亦或者者我只等1秒钟,至于没返回结果的线程我就用默认值代替.

2.我两个异步任务之间执行独立,但是第二个依赖第一个的执行结果.

java8的CompletableFuture,就在这混乱且不完美的多线程江湖中闪亮登场了.CompletableFuture让Future的功能和使用场景得到极大的完善和扩展,提供了函数式编程能力,使代码更加美观优雅,而且可以通过回调的方式计算解决结果,对异常解决也有了更好的解决手段.

CompletableFuture源码中有四个静态方法用来执行异步任务:

创立任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}public static CompletableFuture<Void> runAsync(Runnable runnable){..}public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}

执行异步任务的方式也很简单,只要要使用上述方法即可以了:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {    //....执行任务    return "hello";}, executor)

接下来看一下获取执行结果的几个方法。

V get();V get(long timeout,Timeout unit);T getNow(T defaultValue);T join();

接下来以少量场景的实例来详情一下CompletableFuture中其余少量常用的方法

thenAccept()public CompletionStage<Void> thenAccept(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任务A");CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "任务B");CompletableFuture<String> futureC = futureB.thenApply(b -> {      System.out.println("执行任务C.");      System.out.println("参数:" + b);//参数:任务B      return "a";});thenRun(..)public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任务A");futureA.thenRun(() -> System.out.println("执行任务B"));thenApply(..)public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");CompletableFuture<String> futureB = futureA.thenApply(s->s + " world");CompletableFuture<String> future3 = futureB.thenApply(String::toUpperCase);System.out.println(future3.join());

上面的代码,我们当然可以先调用future.join()先得到任务A的返回值,而后再拿返回值做入参去执行任务B,而thenApply的存在就在于帮我简化了这一步,我们不必由于等待一个计算完成而一直阻塞着调用线程,而是告诉CompletableFuture你啥时候执行完就啥时候进行下一步. 就把多个任务串联起来了.

thenCombine(..)  thenAcceptBoth(..)  runAfterBoth(..)public <U,V> CompletableFuture<V>  thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)public <U,V> CompletableFuture<V>  thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)public <U,V> CompletableFuture<V>  thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> 100d); CompletableFuture<Double> futureDiscount = CompletableFuture.supplyAsync(() -> 0.8); CompletableFuture<Double> futureResult = futurePrice.thenCombine(futureDiscount, (price, discount) -> price * discount); System.out.println("最终价格为:" + futureResult.join()); //最终价格为:80.0
thenCompose(..)public <U> CompletableFuture<U>  thenCompose(Function<? super T,? extends CompletionStage<U>> fn)public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

这个方法和thenApply非常像,都是接受上一个任务的结果作为入参,执行自己的操作,而后返回.那具体有什么区别呢?

 CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");CompletableFuture<String> futureB = futureA.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world"));CompletableFuture<String> future3 = futureB.thenCompose(s -> CompletableFuture.supplyAsync(s::toUpperCase));System.out.println(future3.join());
applyToEither(..)  acceptEither(..)  runAfterEither(..)public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "通过方式A获取商品a";        });CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "通过方式B获取商品a";        });CompletableFuture<String> futureC = futureA.applyToEither(futureB, product -> "结果:" + product);System.out.println(futureC.join()); //结果:通过方式A获取商品a

同样的道理,applyToEither的兄弟方法还有acceptEither(),runAfterEither(),我想不需要我解释你也知道该怎样用了.

exceptionally(..)public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
CompletableFuture<String> futureA = CompletableFuture.                supplyAsync(() -> "执行结果:" + (100 / 0))                .thenApply(s -> "futureA result:" + s)                .exceptionally(e -> {                    System.out.println(e.getMessage()); //java.lang.ArithmeticException: / by zero                    return "futureA result: 100";                });CompletableFuture<String> futureB = CompletableFuture.                supplyAsync(() -> "执行结果:" + 50)                .thenApply(s -> "futureB result:" + s)                .exceptionally(e -> "futureB result: 100");System.out.println(futureA.join());//futureA result: 100System.out.println(futureB.join());//futureB result:执行结果:50

上面代码展现了正常流程和出现异常的情况,可以了解成catch,根据返回值可以体会下.

whenComplete(..)public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

功能:当CompletableFuture的计算结果完成,或者者抛出异常的时候,都可以进入whenComplete方法执行,举个栗子

CompletableFuture<String> futureA = CompletableFuture.                supplyAsync(() -> "执行结果:" + (100 / 0))                .thenApply(s -> "apply result:" + s)                .whenComplete((s, e) -> {                    if (s != null) {                        System.out.println(s);//未执行                    }                    if (e == null) {                        System.out.println(s);//未执行                    } else {                        System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero                    }                })                .exceptionally(e -> {                    System.out.println("ex"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zero             return "futureA result: 100"; }); System.out.println(futureA.join());//futureA result: 100

根据控制台,我们可以看出执行流程是这样,supplyAsync->whenComplete->exceptionally,可以看出并没有进入thenApply执行,起因也显而易见,在supplyAsync中出现了异常,thenApply只有当正常返回时才会去执行.而whenComplete不论能否正常执行,还要注意一点,whenComplete是没有返回值的.

上面代码我们使用了函数式的编程风格并且先调用whenComplete再调用exceptionally,假如我们先调用exceptionally,再调用whenComplete会发生什么呢,我们看一下:

CompletableFuturefutureA = CompletableFuture.supplyAsync(() -> "执行结果:" + (100 / 0)).thenApply(s -> "apply result:" + s).exceptionally(e -> {System.out.println("ex:"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zeroreturn "futureA result: 100";}).whenComplete((s, e) -> {if (e == null) {System.out.println(s);//futureA result: 100} else {System.out.println(e.getMessage());//未执行}});System.out.println(futureA.join());//futureA result: 100

代码先执行了exceptionally后执行whenComplete,可以发现,因为在exceptionally中对异常进行了解决,并返回了默认值,whenComplete中接收到的结果是一个正常的结果,被exceptionally美化过的结果,这一点需要留意一下.

handle(..)public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
CompletableFuture<String> futureA = CompletableFuture.                supplyAsync(() -> "执行结果:" + (100 / 0))                .thenApply(s -> "apply result:" + s)                .exceptionally(e -> {                    System.out.println("ex:" + e.getMessage()); //java.lang.ArithmeticException: / by zero                    return "futureA result: 100";                })                .handle((s, e) -> {                    if (e == null) {                        System.out.println(s);//futureA result: 100                    } else {                        System.out.println(e.getMessage());//未执行                    }                    return "handle result:" + (s == null ? "500" : s);                });System.out.println(futureA.join());//handle result:futureA result: 100

通过控制台,我们可以看出,最后打印的是handle result:futureA result: 100,执行exceptionally后对异常进行了"美化",返回了默认值,那么handle得到的就是一个正常的返回,我们再试下,先调用handle再调用exceptionally的情况.

CompletableFuture<String> futureA = CompletableFuture.                supplyAsync(() -> "执行结果:" + (100 / 0))                .thenApply(s -> "apply result:" + s)                .handle((s, e) -> {                    if (e == null) {                        System.out.println(s);//未执行                    } else {                        System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero                    }                    return "handle result:" + (s == null ? "500" : s);                })                .exceptionally(e -> {                    System.out.println("ex:" + e.getMessage()); //未执行                    return "futureA result: 100";                });System.out.println(futureA.join());//handle result:500

根据控制台输出,可以看到先执行handle,打印了异常信息,并对接过设置了默认值500,exceptionally并没有执行,由于它得到的是handle返回给它的值,由此我们大概推测handle和whenComplete的区别

1.都是对结果进行解决,handle有返回值,whenComplete没有返回值
2.因为1的存在,使得handle多了一个特性,可在handle里实现exceptionally的功能

allOf(..)  anyOf(..)public static CompletableFuture<Void>  allOf(CompletableFuture<?>... cfs)public static CompletableFuture<Object>  anyOf(CompletableFuture<?>... cfs)
public static void main(String[] args) throws Exception {        ExecutorService executorService = Executors.newFixedThreadPool(4);        long start = System.currentTimeMillis();        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(1000 + RandomUtils.nextInt(1000));            } catch (InterruptedException e) {                e.printStackTrace();            }            return "商品介绍";        },executorService);        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(1000 + RandomUtils.nextInt(1000));            } catch (InterruptedException e) {                e.printStackTrace();            }            return "卖家信息";        },executorService);        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(1000 + RandomUtils.nextInt(1000));            } catch (InterruptedException e) {                e.printStackTrace();            }            return "库存信息";        },executorService);        CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> {            try {                Thread.sleep(1000 + RandomUtils.nextInt(1000));            } catch (InterruptedException e) {                e.printStackTrace();            }            return "订单信息";        },executorService);        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureA, futureB, futureC, futureD);        allFuture.join();        System.out.println(futureA.join() + futureB.join() + futureC.join() + futureD.join());        System.out.println("总耗时:" + (System.currentTimeMillis() - start));    }
免责声明:本文为用户发表,不代表网站立场,仅供参考,不构成引导等用途。 系统环境 软件环境
相关推荐
hexo个人网站优化探究
项目经理:在微信公众号中引入支付宝支付| 程序员猝
按下电源键后发生了什么 —— Android 系统启动流程分析
SpringBoot启动类的代码底层原理第一步
LNMP配置实例(含MySQL命令补全、nginx增加未编译板块)
首页
搜索
订单
购物车
我的