CompletableFuture 使用案例详解
CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
1、 runAsync
和 supplyAsync
方法
一、创建异步任务
- supplyAsync
supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法
// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
常用AP
thnCombine
-合并两个线程任务的结果,并进一步处理。applyToEither
-两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。acceptEither
-两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。runAfterEither
-两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。runAfterBoth-
两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。anyOf-anyOf
方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。allOf-allOf
方法用来实现多 CompletableFuture 的同时返回。
先准备一个模拟线程的工具类
/**
* <p>
* Description: m
* </p>
*
* @author songzixian
*/
public class ThreadTool {
/**
* description 模拟线程办法
*
* @param: [millis]
* @return
* @Date 2022/8/20
*/
public static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* description 输出当前线程相关信息
*
* @param: tag
* @return
* @Date 2022/8/20
*/
public static void printTimeAndThread(String tag) {
String result = new StringJoiner("\t|\t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
}
先来个简单的demo体验
小A进入咖啡馆,点了一杯咖啡,服务员开始制作咖啡,在制作的过程小A也不闲,同时刷了一会抖音后,咖啡制作完成,于是小A走咖啡厅去公司上班
/**
* @author songzixian
*/
public class Demo2 {
public static void main(String[] args) {
ThreadTool.printTimeAndThread("小A进入咖啡厅");
ThreadTool.printTimeAndThread("小A在咖啡厅和服务员点了一杯咖啡");
// 创建一个带返回值的线程
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadTool.printTimeAndThread("服务员开始制作咖啡");
ThreadTool.sleepMillis(100);
return "咖啡制作好了";
// money表示上面的 咖啡制作好了
}).thenApply(result -> {
ThreadTool.printTimeAndThread(result + "->服务员开始打包");
ThreadTool.sleepMillis(100);
return "咖啡打包好了";
});
ThreadTool.printTimeAndThread("小A正在刷抖音");
ThreadTool.printTimeAndThread(String.format("小A接到服务员%s,接着去公司上班", completableFuture.join()));
}
控制台打印
办法A执行完再开始执行B
第二个例子:办法A执行完再开始执行B
/**
* @author songzixian
*/
public class Demo2 {
public static void main(String[] args) {
long time = System.currentTimeMillis();
ThreadTool.printTimeAndThread("小A进入咖啡店");
ThreadTool.printTimeAndThread("小A点了2杯咖啡");
//CompletableFuture<String>返回结果代表String类型
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadTool.printTimeAndThread("服务员A开始制作第1杯咖啡");
ThreadTool.sleepMillis(200);
//返回执行结果
return "服务员制作好了第一杯咖啡";
//传入dist代表返货到上面的返回结果 ->"服务员制作好咖啡机了"
}).thenCompose(dish -> CompletableFuture.supplyAsync(() -> {
ThreadTool.printTimeAndThread("服务员A开始制作第2杯咖啡");
ThreadTool.sleepMillis(100);
return dish + "服务员制作好了第二杯咖啡";
}));
ThreadTool.printTimeAndThread("小A正在刷抖音");
//completableFuture示例的jion方法,这个join方法的返回类型就是上面的CompletableFuture的泛型
//join方法会等待任务结束,然后返回任务的结果
ThreadTool.printTimeAndThread(String.format("%s ,小A开始喝咖啡", completableFuture.join()));
long ms = System.currentTimeMillis() - time;
System.out.println("办法总消耗的时间:" + ms + " ms");
}
}
控制台打印
当方法A与B同时执行完,再执行C
第三个例子,当方法A与B同时执行完,再执行C
/**
* @author songzixian
*/
public class Demo3 {
public static void main(String[] args) {
long time = System.currentTimeMillis();
ThreadTool.printTimeAndThread("小A进入咖啡店");
ThreadTool.printTimeAndThread("小A点了2杯咖啡");
//CompletableFuture<String>返回结果代表String类型
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// 业务一
ThreadTool.printTimeAndThread("服务员A开始制作第1杯咖啡");
ThreadTool.sleepMillis(200);
//返回执行结果
return "服务员制作好了第一杯咖啡";
//传入dist代表返货到上面的返回结果 ->"服务员制作好咖啡机了"
//thenCombine作用表示第一个任务和第二个个任务同时执行
}).thenCombine(CompletableFuture.supplyAsync(() -> {
// 业务二
ThreadTool.printTimeAndThread("服务员B开始制作第2杯咖啡");
ThreadTool.sleepMillis(100);
return "服务员制作好了第二杯咖啡";
// dist是第一个业务执行结果,rece是第二个业务执行结果
}), (dish, rece) -> {
//当第一个业务和第二个业务同时执行完毕后,第三个业务开始执行
ThreadTool.printTimeAndThread("服务员C开始开始打包第一杯咖啡喝第二杯咖啡");
ThreadTool.sleepMillis(100);
return String.format("%s +%s 好了", dish, rece);
});
ThreadTool.printTimeAndThread("小A正在刷抖音");
//completableFuture示例的jion方法,这个join方法的返回类型就是上面的CompletableFuture的泛型
//join方法会等待任务结束,然后返回任务的结果
ThreadTool.printTimeAndThread(String.format("%s ,小A开始拿到咖啡喝", completableFuture.join()));
long ms = System.currentTimeMillis() - time;
System.out.println("办法总消耗的时间:" + ms + " ms");
}
}
控制台打印
当业务A与业务B同时执行,哪个优先执行完,则返回结果
第四个案例:当业务A与业务B同时执行,哪个优先执行完,则返回结果
/**
* <p>
* Description: 小A等公交车,200路公交和500公交可以回家,哪个公交优先到坐哪个,如果出现车坏, 则叫出租车回家
* </p>
*
*/
public class Demo3 {
public static void main(String[] args) {
ThreadTool.printTimeAndThread("小A在公交站等车");
ThreadTool.printTimeAndThread("小A 回家可以坐200路和205路的公交车回家");
// 创建一个带返回值的线程
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadTool.printTimeAndThread("200公交正在开来");
int a=10/0;
ThreadTool.sleepMillis(100);
return "等到200路公交";
//applyToEither 表示,上个任务和下个任务同时执行,哪个优先执行完,就把结果返回给CompletableFuture(只有一个结果)
}).applyToEither(CompletableFuture.supplyAsync(() -> {
ThreadTool.printTimeAndThread("205公交正在开来");
ThreadTool.sleepMillis(100);
return "等到250路公交";
}),firstComBus -> {
if (firstComBus.startsWith("200公交车坏了")){
ThreadTool.printTimeAndThread("小A打网约车");
}
return firstComBus;
}).exceptionally(e ->{
//如果上面代码出现异常,则执行这里
ThreadTool.printTimeAndThread(e.getMessage());
ThreadTool.printTimeAndThread("小A打网约车");
return "网约车到达";
});
ThreadTool.printTimeAndThread(String.format("小A坐%s公交车回家", completableFuture.join()));
}
}
控制台打印