Java并发 CompletableFuture异步编程的实现
前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化。
//以下两个方法都是耗时操作 doBizA(); doBizB(); //创建两个子线程去执行就可以了,两个操作已经被异步化了。 newThread(()->doBizA()) .start(); newThread(()->doBizB()) .start();
异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。Java在1.8版本提供了CompletableFuture来支持异步编程。
CompletableFuture的核心优势
为了领略CompletableFuture异步编程的优势,这里我们用CompletableFuture重新实现前面曾提及的烧水泡茶程序。首先还是需要先完成分工方案,在下面的程序中,我们分了3个任务:任务1负责洗水壶、烧开水,任务2负责洗茶壶、洗茶杯和拿茶叶,任务3负责泡茶。其中任务3要等待任务1和任务2都完成后才能开始。这个分工如下图所示。
//任务1:洗水壶->烧开水 CompletableFuturef1= CompletableFuture.runAsync(()->{ System.out.println("T1:洗水壶..."); sleep(1,TimeUnit.SECONDS); System.out.println("T1:烧开水..."); sleep(15,TimeUnit.SECONDS); }); //任务2:洗茶壶->洗茶杯->拿茶叶 CompletableFuture f2= CompletableFuture.supplyAsync(()->{ System.out.println("T2:洗茶壶..."); sleep(1,TimeUnit.SECONDS); System.out.println("T2:洗茶杯..."); sleep(2,TimeUnit.SECONDS); System.out.println("T2:拿茶叶..."); sleep(1,TimeUnit.SECONDS); return"龙井"; }); //任务3:任务1和任务2完成后执行:泡茶 CompletableFuture f3= f1.thenCombine(f2,(__,tf)->{ System.out.println("T1:拿到茶叶:"+tf); System.out.println("T1:泡茶..."); return"上茶:"+tf; }); //等待任务3执行结果 System.out.println(f3.join()); voidsleep(intt,TimeUnitu){ try{ u.sleep(t); }catch(InterruptedExceptione){} } //一次执行结果: T1:洗水壶... T2:洗茶壶... T1:烧开水... T2:洗茶杯... T2:拿茶叶... T1:拿到茶叶:龙井 T1:泡茶... 上茶:龙井
从整体上来看,我们会发现
- 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
- 语义更清晰,例如f3=f1.thenCombine(f2,()->{})能够清晰地表述“任务3要等待任务1和任务2都完成后才能开始”;
- 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
领略CompletableFuture异步编程的优势之后,下面我们详细介绍CompletableFuture的使用。
创建CompletableFuture对象
创建CompletableFuture对象主要靠下面代码中展示的这4个静态方法,我们先看前两个。在烧水泡茶的例子中,我们已经使用了runAsync(Runnablerunnable)和supplyAsync(Suppliersupplier),它们之间的区别是:Runnable接口的run()方法没有返回值,而Supplier接口的get()方法是有返回值的。
前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。
默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数(也可以通过JVMoption:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的I/O操作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
//使用默认线程池 staticCompletableFuturerunAsync(Runnablerunnable) staticCompletableFuture supplyAsync(Suppliersupplier) //可以指定线程池 staticCompletableFuture runAsync(Runnablerunnable,Executorexecutor) staticCompletableFuture supplyAsync(Suppliersupplier,Executorexecutor)
创建完CompletableFuture对象之后,会自动地异步执行runnable.run()方法或者supplier.get()方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为CompletableFuture类实现了Future接口,所以这两个问题你都可以通过Future接口来解决。另外,CompletableFuture类还实现了CompletionStage接口,这个接口内容实在是太丰富了,在1.8版本里有40个方法,这些方法我们该如何理解呢?
理解CompletionStage接口
可以站在分工的角度类比一下工作流。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。这样说可能有点抽象,这里还举前面烧水泡茶的例子,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。
CompletionStage接口可以清晰地描述任务之间的这种时序关系,例如前面提到的
f3=f1.thenCombine(f2,()->{})描述的就是一种汇聚关系。烧水泡茶程序中的汇聚关系是一种AND聚合关系,这里的AND指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。既然有AND聚合关系,那就一定还有OR聚合关系,所谓OR指的是依赖的任务只要有一个完成就可以执行当前任务。
最后就是异常,CompletionStage接口也可以方便地描述异常处理。
下面我们就来一一介绍,CompletionStage接口如何描述串行关系、AND聚合关系、OR聚合关系以及异常处理。
1.描述串行关系
CompletionStage接口里面描述串行关系,主要是thenApply、thenAccept、thenRun和thenCompose这四个系列的接口。
thenApply系列函数里参数fn的类型是接口Function
而thenAccept系列方法里参数consumer的类型是接口Consumer
thenRun系列方法里action的参数是Runnable,所以action既不能接收参数也不支持返回值,所以thenRun系列方法返回的也是CompletionStage
这些方法里面Async代表的是异步执行fn、consumer或者action。其中,需要你注意的是thenCompose系列方法,这个系列的方法会新创建出一个子流程,最终结果和thenApply系列是相同的。
CompletionStagethenApply(fn); CompletionStage thenApplyAsync(fn); CompletionStage thenAccept(consumer); CompletionStage thenAcceptAsync(consumer); CompletionStage thenRun(action); CompletionStage thenRunAsync(action); CompletionStage thenCompose(fn); CompletionStage thenComposeAsync(fn);
通过下面的示例代码,你可以看一下thenApply()方法是如何使用的。首先通过supplyAsync()启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。
CompletableFuturef0= CompletableFuture.supplyAsync( ()->"HelloWorld")//① .thenApply(s->s+"QQ")//② .thenApply(String::toUpperCase);//③ System.out.println(f0.join()); //输出结果 HELLOWORLDQQ
2.描述AND汇聚关系
CompletionStage接口里面描述AND汇聚关系,主要是thenCombine、thenAcceptBoth和runAfterBoth系列的接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。
CompletionStagethenCombine(other,fn); CompletionStage thenCombineAsync(other,fn); CompletionStage thenAcceptBoth(other,consumer); CompletionStage thenAcceptBothAsync(other,consumer); CompletionStage runAfterBoth(other,action); CompletionStage runAfterBothAsync(other,action);
3.描述OR汇聚关系
CompletionStage接口里面描述OR汇聚关系,主要是applyToEither、acceptEither和runAfterEither系列的接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。
CompletionStageapplyToEither(other,fn); CompletionStageapplyToEitherAsync(other,fn); CompletionStageacceptEither(other,consumer); CompletionStageacceptEitherAsync(other,consumer); CompletionStagerunAfterEither(other,action); CompletionStagerunAfterEitherAsync(other,action);
CompletableFuturef1= CompletableFuture.supplyAsync(()->{ intt=getRandom(5,10); sleep(t,TimeUnit.SECONDS); returnString.valueOf(t); }); CompletableFuture f2= CompletableFuture.supplyAsync(()->{ intt=getRandom(5,10); sleep(t,TimeUnit.SECONDS); returnString.valueOf(t); }); CompletableFuture f3= f1.applyToEither(f2,s->s); System.out.println(f3.join());
4.异常处理
虽然上面我们提到的fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行
CompletableFuturef0=CompletableFuture. .supplyAsync(()->(7/0)) .thenApply(r->r*10); System.out.println(f0.join());
CompletionStage接口给我们提供的方案非常简单,比try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。
CompletionStageexceptionally(fn); CompletionStagewhenComplete(consumer); CompletionStage whenCompleteAsync(consumer); CompletionStage handle(fn); CompletionStage handleAsync(fn);
下面的示例代码展示了如何使用exceptionally()方法来处理异常,exceptionally()的使用非常类似于try{}catch{}中的catch{},但是由于支持链式编程方式,所以相对更简单。
whenComplete()和handle()系列方法就类似于try{}finally{}中的finally{},无论是否发生异常都会执行whenComplete()中的回调函数consumer和handle()中的回调函数fn。
whenComplete()和handle()的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。
CompletableFuturef0=CompletableFuture .supplyAsync(()->7/0)) .thenApply(r->r*10) .exceptionally(e->0); System.out.println(f0.join());
总结
不过最近几年,伴随着ReactiveX的发展(Java语言的实现版本是RxJava),回调地狱已经被完美解决了,Java语言也开始官方支持异步编程:在1.8版本提供了CompletableFuture,在Java9版本则提供了更加完备的FlowAPI,异步编程目前已经完全工业化。
CompletableFuture已经能够满足简单的异步编程需求,如果你对异步编程感兴趣,可以重点关注RxJava这个项目,利用RxJava,即便在Java1.6版本也能享受异步编程的乐趣。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。