上一篇我们讲了Future机制,有兴趣的可以参考谈谈Future、Callable、FutureTask关系
成都创新互联公司专注于普洱网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供普洱营销型网站建设,普洱网站制作、普洱网页设计、普洱网站官网定制、微信平台小程序开发服务,打造普洱网络公司原创品牌,更为您提供普洱网站排名全网营销落地服务。
但Future机制,还不那么灵活,比如怎么去利用Future机制描述两个任务串行执行,又或是两个任务并行执行,又或是只关心最先执行结束的任务结果。
Future机制在一定程度上都无法快速地满足以上需求,CompletableFuture便应运而生了。
本片会介绍CompletableFuture的api,并用一些示例演示如何去使用。
- public static CompletableFuture supplyAsync(Supplier supplier)
- public static CompletableFuture supplyAsync(Supplier supplier,Executor executor);
- public static CompletableFuture
runAsync(Runnable runnable); - public static CompletableFuture
runAsync(Runnable runnable,Executor executor);
supplyAsync与runAsync的区别在于:supplyAsync有返回值,而runAsync没有返回值
带Executor参数的构造函数,则使用线程池中的线程执行异步任务(线程池可以参考说说线程池)
不带Executor参数的构造函数,则使用ForkJoinPool.commonPool()中的线程执行异步任务(Fork/Join框架可以参考谈谈并行流parallelStream)
- public class Case1 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
completableFuture=CompletableFuture.supplyAsync(()->{ - try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 1;
- });
- //该方法会一直阻塞
- Integer result = completableFuture.get();
- System.out.println(result);
- }
- }
- public CompletableFuture
whenComplete(BiConsumer super T, ? super Throwable> action); - public CompletableFuture
whenCompleteAsync(BiConsumer super T, ? super Throwable> action); - public CompletableFuture
whenCompleteAsync(BiConsumer super T, ? super Throwable> action, Executor executor); - public CompletableFuture
exceptionally(Function fn);
whenComplete开头的方法在计算任务完成(包括正常完成与出现异常)之后会回调
而exceptionally则只会在计算任务出现异常时才会被回调
如何确定哪个线程去回调whenComplete,比较复杂,先略过。
而回调whenCompleteAsync的线程比较简单,随便拿一个空闲的线程即可,后缀是Async的方法同理。
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.function.BiConsumer;
- import java.util.function.Function;
- import java.util.stream.IntStream;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case2 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("执行supplyAsync的线程:" + Thread.currentThread().getName());
- int i = 1 / 0;
- return 1;
- });
- completableFuture.whenComplete(new BiConsumer
() { - @Override
- public void accept(Integer integer, Throwable throwable) {
- System.out.println("执行whenComplete的线程:" + Thread.currentThread().getName());
- if (throwable == null) {
- System.out.println("计算未出现异常,结果:" + integer);
- }
- }
- });
- completableFuture.exceptionally(new Function
() { - @Override
- public Integer apply(Throwable throwable) {
- //出现异常时,则返回一个默认值
- System.out.println("计算出现异常,信息:" + throwable.getMessage());
- return -1;
- }
- });
- System.out.println(completableFuture.get());
- }
- }
输出:
当然,CompletableFuture内的各种方法是支持链式调用与Lambda表达式的,我们进行如下改写:
- public static void main(String[] args) throws Exception {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("执行supplyAsync的线程:" + Thread.currentThread().getName());
- int i = 1 / 0;
- return 1;
- }).whenComplete((integer, throwable) -> {
- System.out.println("执行whenComplete的线程:" + Thread.currentThread().getName());
- if (throwable == null) {
- System.out.println("计算未出现异常,结果:" + integer);
- }
- }).exceptionally(throwable -> {
- //出现异常时,则返回一个默认值
- System.out.println("计算出现异常,信息:" + throwable.getMessage());
- return -1;
- });
- System.out.println("计算结果:" + completableFuture.get());
- }
- public CompletableFuture thenApply(Function super T,? extends U> fn);
- public CompletableFuture
thenRun(Runnable action); - public CompletableFuture
thenAccept(Consumer super T> action); - public CompletableFuture handle(BiFunction super T, Throwable, ? extends U> fn);
- public CompletableFuture thenCompose(Function super T, ? extends CompletionStage> fn);
thenApply,依赖上一次任务执行的结果,参数中的Function super T,? extends U>,T代表上一次任务返回值的类型,U代表当前任务返回值的类型,当上一个任务没有出现异常时,thenApply才会被调用
thenRun,不需要知道上一个任务的返回结果,只是在上一个任务执行完成之后开始执行Runnable
thenAccept,依赖上一次任务的执行结果,因为入参是Consumer,所以不返回任何值。
handle和thenApply相似,不过当上一个任务出现异常时,能够执行handle,却不会去执行thenApply
thenCompose,传入一次任务执行的结果,返回一个新的CompleteableFuture对象
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case4 {
- public static void main(String[] args) {
- CompletableFuture.supplyAsync(() -> 2)
- .thenApply(num -> num * 3)
- .thenAccept(System.out::print);
- }
- }
很显然,输出为6
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- import java.util.function.BiFunction;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case4 {
- public static void main(String[] args) {
- CompletableFuture.supplyAsync(() -> 2)
- .thenApply(num -> num / 0)
- .thenApply(result -> result * 3)
- .handle((integer, throwable) -> {
- if (throwable == null) {
- return integer;
- } else {
- throwable.printStackTrace();
- return -1;
- }
- }).thenAccept(System.out::print);
- }
- }
最终会输出-1
- public CompletableFuture
thenCombine(CompletionStage extends U> other, - Function super T,? super U,? extends V> fn);
- public CompletableFuture
thenAcceptBoth(CompletionStage extends U> other, - Consumer super T, ? super U> action);
- public CompletableFuture
runAfterBoth(CompletionStage> other,Runnable action); - public static CompletableFuture
allOf(CompletableFuture>... cfs);
thenCombine,合并两个任务,两个任务可以同时执行,都执行成功后,执行最后的BiFunction操作。其中T代表第一个任务的执行结果类型,U代表第二个任务的执行结果类型,V代表合并的结果类型
thenAcceptBoth,和thenCombine特性用法都极其相似,唯一的区别在于thenAcceptBoth进行一个消费,没有返回值
runAfterBoth,两个任务都执行完成后,但不关心他们的返回结构,然后去执行一个Runnable。
allOf,当所有的任务都执行完成后,返回一个CompletableFuture
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case5 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
cf1 = CompletableFuture.supplyAsync(() -> { - System.out.println("任务1开始");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务1结束");
- return 2;
- });
- CompletableFuture
cf2 = CompletableFuture.supplyAsync(() -> { - System.out.println("任务2开始");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务2结束");
- return 3;
- });
- CompletableFuture
completableFuture = cf1.thenCombine(cf2, (result1, result2) -> result1 * result2); - System.out.println("计算结果:" + completableFuture.get());
- }
- }
输出:
可以看到两个任务确实是同时执行的
当然,熟练了之后,直接使用链式操作,代码如下:
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case6 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - System.out.println("任务1开始");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务1结束");
- return 2;
- }).thenCombine(CompletableFuture.supplyAsync(() -> {
- System.out.println("任务2开始");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务2结束");
- return 3;
- }), (result1, result2) -> result1 * result2);
- System.out.println("计算结果:" + completableFuture.get());
- }
- }
- public CompletableFuture applyToEither(CompletionStage extends T> other, Function super T, U> fn);
- public CompletableFuture
acceptEither(CompletionStage extends T> other, Consumer super T> action); - public CompletableFuture
runAfterEither(CompletionStage> other,Runnable action); - public static CompletableFuture
applyToEither,最新执行完任务,将其结果执行Function操作,其中T是最先执行完的任务结果类型,U是最后输出的类型
acceptEither,最新执行完的任务,将其结果执行消费操作
runAfterEither,任意一个任务执行完成之后,执行Runnable操作
anyOf,多个任务中,返回最先执行完成的CompletableFuture
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case7 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { - System.out.println("任务1开始");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务1结束");
- return 2;
- }).acceptEither(CompletableFuture.supplyAsync(() -> {
- System.out.println("任务2开始");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务2结束");
- return 3;
- }), result -> System.out.println(result));
- //等待CompletableFuture返回,防止主线程退出
- completableFuture.join();
- }
- }
输出:
可以看得到,任务2结束后,直接不再执行任务1的剩余代码
- package com.qcy.testCompleteableFuture;
- import java.util.concurrent.CompletableFuture;
- /**
- * @author qcy
- * @create 2020/09/07 17:40:44
- */
- public class Case8 {
- public static void main(String[] args) throws Exception {
- CompletableFuture
cf1 = CompletableFuture.supplyAsync(() -> { - System.out.println("任务1开始");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务1结束");
- return 2;
- });
- CompletableFuture
cf2 = CompletableFuture.supplyAsync(() -> { - System.out.println("任务2开始");
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务2结束");
- return 3;
- });
- CompletableFuture
cf3 = CompletableFuture.supplyAsync(() -> { - System.out.println("任务3开始");
- try {
- Thread.sleep(4000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("任务3结束");
- return 4;
- });
- CompletableFuture
- System.out.println(firstCf.get());
- }
- }
输出:
文章标题:什么,你还不会用CompletableFuture?
本文来源:http://www.mswzjz.com/qtweb/news12/169612.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联