0%

CompletableFuture使用

  • 实现了异步非阻塞的效果,任务执行完毕会主动通知,不需要调用方主动获取。

  • 使用结构存储后续任务

  • 无法保证后续任务执行顺序,任务是异步执行的,后续任务在放入之前,前置任务如果执行完则不需要放入中,直接执行;否则弹栈执行

  • 在异步情况下,如果不指定线程池,异步任会交给ForkJoinPool执行

  • ForkJoinPool内部是守护线程,守护线程在主线程结束后就不干活了

一、生成异步任务

1.1、supplyAsync

1
2
3
4
5
6
CompletableFuture<Integer> resultCompletableFuture = CompletableFuture
//开启异步CompletableFuture任务
.supplyAsync(() -> {
System.out.println("CompletableFuture线程:" + Thread.currentThread().getName());
return 111;
});

1.2、runAsync

1
2
3
4
5
6
7
CompletableFuture<Void> voidCompletableFuture = CompletableFuture
.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("异步执行run");
}
});

二、前一个任务执行完,再执行后续操作

2.1、thenApply

获取上一个CompletableFuture任务的处理结果,处理后返回新结果

t异步使用thenAcceptAsync,有时前一个任务执行完线程空闲ForkJoinPool直接拿来复用,控制台打印的话体现为同一个线程,可以通过制定线程池让结果看起来更明显

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Integer result = CompletableFuture
.supplyAsync(() -> {
System.out.println("supplyAsync线程:" + Thread.currentThread().getName());
return 111;
})
//上一个CompletableFuture任务执行完再执行
.thenApply((preResult -> {
System.out.println("thenApply线程:" + Thread.currentThread().getName());
return preResult + 222;
}))
.get();
System.out.println("result:" + result);
//supplyAsync线程:ForkJoinPool.commonPool-worker-9
//thenApply线程:ForkJoinPool.commonPool-worker-9
//result:333

2.2、thenAccept

接受上一个任务结果进行处理,没有返回结果;异步使用thenAcceptAsync

1
2
3
4
CompletableFuture<Void> voidCompletableFuture = CompletableFuture
.supplyAsync(() -> 111)
//接受上一个任务结果进行处理,没有返回结果;异步使用thenAcceptAsync
.thenAccept(result -> System.out.println(result + 222));

2.3、thenRun

没有入参和反参,当之前completableFuture执行完后,运行runnable任务;异步使用thenRunAsync

1
2
3
4
5
6
7
8
9
CompletableFuture<Void> voidCompletableFuture = CompletableFuture
.supplyAsync(() -> {
System.out.println("supplyAsync" + Thread.currentThread().getName());//supplyAsyncForkJoinPool.commonPool-worker-9
return 111;
})
//没有入参和反参,当之前completableFuture执行完后,运行runnable任务;异步使用thenRunAsync
.thenRun(() -> {
System.out.println("thenRun:" + Thread.currentThread().getName());//thenRun:main
});

2.4、thenCombine

与thenApply类似,返回类型为CompletableFuture;异步使用thenComposeAsync

1
2
3
4
5
6
7
8
9
10
11
Integer join = CompletableFuture
.supplyAsync(() -> 111)
//与thenApply类似,前一个任务处理完的结果作为入参,返回一个CompletableFuture任务类型
.thenCompose(result ->
{
System.out.println("thenCompose:" + result);//thenCompose:111

return CompletableFuture.supplyAsync(() -> result + 222);
}
).join();
System.out.println(join);//333

三、两个任务执行完,再执行后续操作

3.1、thenCombine

两个任务一起执行,获取两个任务执行结果,处理后返回;异步使用thenCombineAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Integer result = CompletableFuture
.supplyAsync(() -> 111)
//两个任务一起执行,获取执行结果在一起处理
.thenCombine(
CompletableFuture.supplyAsync(() -> 222),
//BiFunction,两个CompletableFuture任务结果作为入参,处理返回
(preResult1, preResult2) ->
{
System.out.println("thenCombine BiFunction,thread:" + Thread.currentThread().getName() + ",preResult1:" + preResult1 + ",preResult2:" + preResult2);
//thenCombine BiFunction,preResult1:111,preResult2:222
return preResult1 + preResult2 + 333;
}
)
.join();
System.out.println("thenCombine result:" + result);
//thenCombine result:666

3.2、thenAcceptBoth

两个任务一起执行,获取两个任务执行结果处理,无返回结果;异步使用thenAcceptBothAsync

1
2
3
4
5
6
7
8
9
Void join = CompletableFuture.supplyAsync(() -> 111)
.thenAcceptBoth(
CompletableFuture.supplyAsync(() -> 222),
//BiConsumer,把两个CompletableFuture任务的结果作为入参,没有返回值
(preResult1, preResult2) -> {
System.out.println("thenCombine BiConsumer,thread:" + Thread.currentThread().getName() + ",preResult1:" + preResult1 + ",preResult2:" + preResult2);
//thenCombine BiConsumer,thread:main,preResult1:111,preResult2:222
}
).join();

3,3、runAfterBoth

两个任务一起执行完调用Runnable任务;异步使用runAfterBothAsync

1
2
3
4
5
6
7
8
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> 111)
//等待两个CompletableFuture执行完,运行runnable任务
.runAfterBoth(CompletableFuture.supplyAsync(() -> 222), new Runnable() {
@Override
public void run() {
System.out.println("run:" + Thread.currentThread().getName());
}
});

四、两个任务任一执行完,再执行后续操作

4.1、applyToEither

两个CompletableFuture一起执行,当有一个任务处理完,则获取结果值传入Function执行后续处理,返回处理结果;异步使用applyToEitherAsync

1
2
3
4
5
6
7
8
Integer join = CompletableFuture.supplyAsync(() -> 111)
.applyToEither(
CompletableFuture.supplyAsync(() -> 222),
(preResult) -> {
System.out.println("applyToEither preResult:" + preResult); //applyToEither preResult:111
return 333;
}
).join();

4.2、acceptEither

两个CompletableFuture一起执行,当有一个任务处理完,则获取结果值执行后续处理,无返回;异步使用acceptEitherAsync

1
2
3
4
5
6
7
CompletableFuture<Void> voidCompletableFuture = CompletableFuture
.supplyAsync(() -> 111)
.acceptEither(
CompletableFuture.supplyAsync(() -> 222),
(preResult) ->
System.out.println("acceptEither:" + preResult)
);

4.3、runAfterEither

两个CompletableFuture一起执行,当有任一任务处理完,则执行Runnable,无入参无返回

1
2
3
4
5
6
7
8
9
10
CompletableFuture<Void> voidCompletableFuture = CompletableFuture
.supplyAsync(() -> 111)
.runAfterEither(
CompletableFuture.supplyAsync(() -> 222),
new Runnable() {
@Override
public void run() {
System.out.println("run:" + Thread.currentThread().getName());
}
});

五、异常处理

exceptionally

1
2
3
4
5
6
7
8
9
10
11
12
Integer join = CompletableFuture
.supplyAsync(() -> {
System.out.println(1 / 0);
return 111;
})
//捕获前面任务的异常信息进行处理,返回异常默认值,不提供异步操作
.exceptionally(ex -> {
System.out.println(ex);//java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
return 222;
})
.join();
System.out.println("join:" + join);//join:222

六、获取结果

6.1、join

join抛出的是unchecked Exception 不会要求开发者try…catch

6.2、get

get需要捕获处理异常

七、结果处理

7.1、whenComplete

任务结果和异常处理,不影响最终结果的返回

1
2
3
4
5
6
7
Integer result = CompletableFuture
.supplyAsync(() -> 111)
//获取上一个任务已完成的结果(对此值操作不影响最终结果的返回)和异常
.whenComplete((preResult, ex) -> {
System.out.println("preResult:" + preResult + ",ex:" + ex);//preResult:111,ex:null
}).join();
System.out.println("result:" + result);//result:111

7.2、handle

任务结果和异常处理,带结果返回

1
2
3
4
5
6
7
Integer result = CompletableFuture
.supplyAsync(() -> 111)
.handle((preResult, ex) -> {
System.out.println("preResult:" + preResult + ",ex:" + ex);//preResult:111,ex:null
return preResult + 222;
}).join();
System.out.println("result:" + result);//result:333

八、多个任务执行

8.1、allOf

1
2
3
4
5
6
7
CompletableFuture<Void> allOf = CompletableFuture
.allOf(
CompletableFuture.supplyAsync(() -> 111),
CompletableFuture.supplyAsync(() -> 222)
);

System.out.println("allOf:" + allOf.join());

8,2、anyOf

等待任一任务完成获取结果

1
2
3
4
5
6
CompletableFuture<Object> anyOf = CompletableFuture
.anyOf(
CompletableFuture.supplyAsync(() -> 111),
CompletableFuture.supplyAsync(() -> 222)
);
System.out.println("anyOf:" + anyOf.join());//anyOf:111