Project Reactor
响应式概念
响应式编程通常以面向对象的语言呈现,作为观察者设计模式的扩展。
通过编写异步、非阻塞的代码寻求更高的处理效率。
Callback异步处理,通过Future获取结果可能会出现回调地狱,CompletableFuture可以将多个Future进行编排,但使用上并不是那么容易,且不支持惰性计算、多值和处理的灵活性。
响应式库(例如 Reactor)旨在解决 JVM 上“经典”异步方法的这些缺点:
- 可组合性和可读性
- 数据作为使用丰富的运算符词汇操作的流
- 在订阅之前什么都不会发生
- 背压或消费者向生产者发出排放率过高信号的能力
- 与并发无关的高级但高价值的抽象
reactort 概念
- 对响应式流规范的一种实现
- Spring WebFlux默认的响应式框架
- 完全异步非阻塞,对背压支持
- 提供两个异步序列API:Flux[N]、Mono[0|1]
- 提供对响应式流的操作
依赖
1 | <dependency> |
接口定义
Publisher - 发布者
1
2
3public interface Publisher<T>{
public void subscribe(Subscriber<? super T> s);
}- 发送
onNex
t信号次数不得超过订阅者请求的元素个数 - 如果进程失败,需要调用
onError
告知订阅者 Publisher.subscribe()
内部需调用onSubscribe(Subscriber s)
- 允许同时存在多个订阅者
- 发送
Subscriber - 订阅者
1
2
3
4
5
6public interface Subscriber<T>{
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}- 需要调用
Subscriptions.request(long n)
来向发布者请求数据,n指请求多少数据量 onComplete
或onError
方法,终止方法- 只能存在一个活跃的订阅
- 调用
Subscription.cancel()
之后仍有onNext
信号发送
- 需要调用
Subscription - 订阅关系
1
2
3
4public interface Subscription{
public void request(long n);
public void cancel();
}- 需要在当前订阅者的上下文中被调用
Subscription.request(long n)
最多请求2^63-1个元素
Processor - 处理器,reactor3.5后废弃
1
2
3public interface Processor<T,R> extends Subscriber<T>,Publisher<R>{
}
使用
创建
Mono
Mono最多可以指定一个元素
just - 指定一个元素
1
2
3//指定元素,只能指定一个元素
Mono<String> just = Mono.just("hello");
just.subscribe(System.out::println);//hellojustOrEmpty - 指定可能为空的元素
1
2
3
4
5
6
7
8
9//指定可能为空的元素
Mono<Object> objectMono = Mono.justOrEmpty(null);
objectMono.subscribe(System.out::println);//无输出
Mono<Object> objectMono = Mono.justOrEmpty(Optional.empty());
objectMono.subscribe(System.out::println);//无输出
Mono<Object> objectMono = Mono.justOrEmpty(123);
objectMono.subscribe(System.out::println);//123fromCallable - 异步调用创建,并未试出异步效果?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public static String request() throws IOException {
URL url = new URL("http://www.baidu.com");
URLConnection urlConnection = url.openConnection();
urlConnection.connect();
InputStream inputStream = urlConnection.getInputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
StringBuilder sb = new StringBuilder();
String temp = null;
while ((temp = bufferedReader.readLine()) != null) {
sb.append(temp).append("\r\n");
}
System.out.println(Thread.currentThread().getName());//打印出来还是main线程?
return sb.toString();
}
public static void main(String[] args) throws IOException {
// System.out.println(request());
Mono.just(request()).subscribe(System.out::println);
Mono.fromCallable(() -> request()).subscribe(System.out::println);
Mono.fromCallable(AsyncMonoTest::request).subscribe(System.out::println);
Mono.fromCallable(AsyncMonoTest::request)
.subscribe(
data -> {
System.out.println(Thread.currentThread().getName());
System.out.println(data);
},
ex -> System.out.println("出错:" + ex),
() -> System.out.println("请求完成")
);
}
Flux
Flux可以指定多个元素创建响应式流
just - 列举创建
1
2Flux<String> just = Flux.just("hello", "every body");
just.subscribe(System.out::println);hello
every bodyfromArray - 数组创建
1
2Flux<String> stringFlux = Flux.fromArray(new String[]{"hello", "world"});
stringFlux.subscribe(System.out::println);hello
every bodyfromIterable - iterable接口创建
1
2Flux<Integer> iterableFlux = Flux.fromIterable(Arrays.asList(1, 2, 3, 4));
iterableFlux.subscribe(System.out::println);1
2
3
4range - 递增序列创建
1
2
3//第一个参数(即1000)为递增序列的初始值;第二个参数为递增的总数据量,包含初始值
Flux<Integer> rangeFlux = Flux.range(1000, 5);
rangeFlux.subscribe(System.out::println);1000
1001
1002
1003
1004from - 基于publisher创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33Flux.from(new Publisher<String>() {
/**
* 请求Publisher开始流数据
* 这是一个“工厂方法”,可以多次调用,每次启动一个新的Subscription。
* 每个订阅(Subscription)只适用于单个订阅服务器(Subscriber)。
* 订阅服务器(Subscriber)只能向单个发布服务器(Publisher)订阅一次。
* 如果发布者(Publisher)拒绝订阅(Subscriber)尝试或以其他方式失败,它将通过Subscriber.onError发出错误信号。
* @param subscriber
*/
public void subscribe(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext("hello - " + i);
}
subscriber.onComplete();
}
}).subscribe(
data -> System.out.println(data),
ex -> System.out.println("出错:" + ex),
() -> System.out.println("请求完成")
);
//lambda简化
Flux.from((Publisher<String>) subscriber -> {
for (int i = 0; i < 5; i++) {
subscriber.onNext("hello - " + i);
}
subscriber.onComplete();
}).subscribe(
data -> System.out.println(data),
ex -> System.out.println("出错:" + ex),
() -> System.out.println("请求完成")
);hello - 0
hello - 1
hello - 2
hello - 3
hello - 4
请求完成interval - 间隔创建
指定时间间隔内,从0开始递增产生值
public static Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
第一个参数:延迟开始时间、第二个参数:间隔时间周期、第三个参数:调度线程池
1 | Flux.interval(Duration.ZERO, Duration.ofMillis(1000)) |
编程方式创建
generate
逐一对信号进行加工输出创建Flux,通过BiFunction函数对状态加工输出,发送给下游订阅者
1 | Flux<String> flux = Flux.generate( |
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
state:11
push
适用于单线程生产者,默认背压策略:OverflowStrategy.BUFFER
1 | public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter) { |
1 | Flux.push(new Consumer<FluxSink<Integer>>() { |
create
使用上与push类似,适用于多线程生产者,默认背压策略:OverflowStrategy.BUFFER
1 | public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) { |
多线程使用上如何体现?
1 | Flux.create(new Consumer<FluxSink<Integer>>() { |
背压策略
1 | enum OverflowStrategy { |
1 | CountDownLatch latch = new CountDownLatch(1); |
1
2
3
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue…)
main结束
订阅
当订阅时,Subscriber会创建一个对象链,沿着链向上寻找第一个发布者。
基于 Lambda 订阅
1 | //订阅并触发序列 |
所有这些subscribe()都有一个Disposable返回类型,可以通过调用其dispose()方法取消订阅。
对于
Flux
orMono
,取消是一个信号,表明源应该停止生成元素。但是,它不能保证是立即的:某些源可能会非常快地生成元素,以至于它们甚至可以在收到取消指令之前完成
基于BaseSubscriber订阅
1 | Flux<Integer> flux = Flux.range(1, 4); |
Subscribed
1
2
3
4
using - 资源统一处理
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
resourceSupplier:在订阅时被调用,以生成资源的可调用对象
sourceSupplier:从resourceSupplier获取Publisher,数据源
resourceCleanup:在完成时,调用资源对象的清理回调
1 | static class Connection implements AutoCloseable { |
创建Connection对象
关闭Connection连接
onError接收到的异常信息:java.lang.RuntimeException: 通信异常
Schedulers
调度线程池
- Schedulers.newSingle():单个可重复使用的线程。
- Schedulers.elastic(): 无界弹性线程池,有隐藏背压问题并导致过多线程的倾向,3.5.0开始弃用。
- Schedulers.boundedElastic():有界弹性线程池 ,会根据需要创建新的工作池并重用闲置的工作池;闲置时间过长(默认为 60 秒)的工作池也会被处理掉。与elastic()不同,它对其可以创建的支持线程数有上限(默认为 CPU 核心数 x 10)。达到上限后提交的最多 100 000 个任务将被排入队列,并在线程可用时重新安排(延迟调度时,延迟在线程可用时开始)。
- Schedulers.parallel() :为并行工作调整的固定工作池,创建与CPU 内核一样多的worker。
- Schedulers.newParallel() :指定线程数,线程工厂的自定义线程池。可以使用newXXX 方法自定义创建各种调度程序类型的新实例。
publishOn
影响调用publishOn()之后方法的执行上下文,任务被提交给Schedulers 的线程执行
1 | Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); |
main
main
parallel-scheduler-1
parallel-scheduler-1 value11
parallel-scheduler-1
parallel-scheduler-1 value12
subscribeOn
从订阅开始整条链路的执行上下文都交给Schedulers 的线程执行
1 | Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); |
parallel-scheduler-1
parallel-scheduler-1
parallel-scheduler-1 value11
parallel-scheduler-1
parallel-scheduler-1
parallel-scheduler-1 value12
错误处理
doOnError:出错时触发
1 | Flux<Integer> range = Flux |
1000
1001
1002
java.lang.RuntimeException: 出错
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 出错
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 出错
Caused by: java.lang.RuntimeException: 出错
at com.yrl.reactor.ReactorDemo1.main(ReactorDemo1.java:18)
retry:重试
不指定重试次数则无限重试
1 | Flux<Integer> range = Flux |
1000
1001
1002
java.lang.RuntimeException: 出错
1000
1001
1002
java.lang.RuntimeException: 出错
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 出错
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 出错
Caused by: java.lang.RuntimeException: 出错
at com.yrl.reactor.ReactorDemo1.main(ReactorDemo1.java:18)Process finished with exit code 0
onErrorResume:出错使用备用流
根据错误返回对应的流
1 | Flux<Integer> range = Flux |
1000
1001
1002
java.lang.RuntimeException: 出错
onErrorResume - ex:java.lang.RuntimeException: 出错
11111Process finished with exit code 0
onErrorReturn - 存在错误则返回指定值
1 | Flux<Integer> range = Flux |
1000
1001
1002
java.lang.RuntimeException: 出错
11111
onErrorMap - 对错误进行转换
1 | Flux<Integer> range = Flux |
1000
1001
1002
java.lang.RuntimeException: 出错
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 出错
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 出错
Caused by: java.lang.Exception: 出错
at com.yrl.reactor.ReactorDemo1.lambda$main$1(ReactorDemo1.java:21)
at reactor.core.publisher.Flux.lambda$onErrorMap$27(Flux.java:6544)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2023)
at reactor.core.publisher.Operators.error(Operators.java:196)
at reactor.core.publisher.FluxError.subscribe(FluxError.java:43)
at reactor.core.publisher.Flux.subscribe(Flux.java:8095)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:208)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Flux.subscribe(Flux.java:8095)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8268)
at reactor.core.publisher.Flux.subscribe(Flux.java:8065)
at reactor.core.publisher.Flux.subscribe(Flux.java:7989)
at reactor.core.publisher.Flux.subscribe(Flux.java:7932)
at com.yrl.reactor.ReactorDemo1.main(ReactorDemo1.java:22)
defaultIfEmpty - 为空时使用默认值
1 | Flux.empty().defaultIfEmpty("hello").subscribe(System.out::println); |
hello
defer - 延迟执行
public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)
延迟执行,有订阅者出现才会执行,且为每个订阅者生成所有数据
1 | public static String data() { |
doOnNext
public final Flux<T> doOnNext(Consumer<? super T> onNext)
通过该方法可以对 Flux 或 Mono 上的每个元素执行一些操作,回调Subscriber.onNext
1 | Flux.range(100,3) |
100
101
102
doOnComplete
完成订阅处理后回调doOnComplete方法
public final Flux<T> doOnComplete(Runnable onComplete)
1 | Flux<Integer> range = Flux |
1000
1001
1002
完成
doOnTerminate
无论什么原因终止都会被调用
log - 日志跟踪
观察所有活性流信号并使用Logger支持跟踪它们,默认使用info级别和java.util.logging(如果SLF4J可用则使用它)
1 | Flux<Integer> range = Flux |
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1000)
1000
[ INFO] (main) | onNext(1001)
1001
[ INFO] (main) | onNext(1002)
1002
[ INFO] (main) | onComplete()
map - 转换数据
使用Function函数转换入参数据,返回指定形式数据
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper)
1 | Flux.range(100,3) |
hello 100
hello 101
hello 102
扁平化数据
flatMap - 立即订阅,交错处理
同时订阅数据,数据交错处理
1 | Random random = new Random(); |
[1, 2, 3]
已经订阅
[a, b, c, d]
已经订阅
[7, 8, 9]
已经订阅
1
a
7
2
b
3
c
8
d
9
concatMap- 顺序订阅,顺序处理
1 | Random random = new Random(); |
[1, 2, 3]
已经订阅
[a, b, c, d]
[7, 8, 9]
1
2
3
已经订阅
a
b
c
d
已经订阅
7
8
9
flatMapSequential - 立即订阅,顺序处理
1 | Random random = new Random(); |
[1, 2, 3]
已经订阅
[a, b, c, d]
已经订阅
[7, 8, 9]
已经订阅
1
2
3
a
b
c
d
7
8
9
filter - 过滤
public final Flux<T> filter(Predicate<? super T> p)
1 | Flux.range(100,3) |
hello 102
index - 增加索引值
public final Flux<Tuple2<Long, T>> index()
为数据增加索引,构建二元数据 Tuple2<(index, value)>
1 | Flux.range(100, 3) |
Tuple2:[0,hello-100]
索引:0
值:hello-100Tuple2:[1,hello-101]
索引:1
值:hello-101Tuple2:[2,hello-102]
索引:2
值:hello-102
timestamp - 增加时间戳
public final Flux<Tuple2<Long, T>> timestamp()
为数据增加当前时间戳,构建二元数据 Tuple2<(timestamp , value)>
1 | Flux.just(100,101,102) |
Tuple2:[1669364259980,hello-100]
时间戳:1669364259980
值:hello-100Tuple2:[1669364259980,hello-101]
时间戳:1669364259980
值:hello-101Tuple2:[1669364259980,hello-102]
时间戳:1669364259980
值:hello-102
skip - 跳过
跳过指定数量的元素
public final Flux<T> skip(long skipped)
1 | Flux.just(100,101,102) |
102
then
忽略来自该Flux的元素,并将其完成信号转换为所新提供Mono的发射和完成信号,订阅者只处理Mono数据
1 | Flux.just(1,2,3,4,5,6) |
doOnNext:1
doOnNext:2
doOnNext:3
doOnNext:4
doOnNext:5
doOnNext:6
7
thenMany
让这个Flux完成后使用新的Flux,订阅者只处理新Flux数据
1 | Flux.just(1, 2, 3, 4, 5, 6) |
doOnNext:1
doOnNext:2
doOnNext:3
doOnNext:4
doOnNext:5
doOnNext:6
8
9
next - 只获取一个数据
只获取一个数据到一个新的Mono。如果为空,则发出一个空的Mono。
1 | Mono<Integer> next = Flux |
1000
take - 获取前N个数据
public final Flux<T> take(long n)
获取前N个数据后取消订阅剩余数据
1 | Flux.just(100,101,102) |
100
101
takeLast - 获取后N个数据
public final Flux<T> takeLast(int n)
1 | Flux.just(100,101,102) |
101
102
takeUntil - 传递值,直到条件满足则停止
public final Flux<T> takeUntil(Predicate<? super T> predicate)
1 | Flux.just(100,101,102) |
100
101
elementAt - 根据索引获取数据
public final Mono<T> elementAt(int index)
1 | Flux.just(100,101,102) |
102
takeUntilOther - 获取数据直到另一数据流数据到来,则停止该流处理
public final Flux<T> takeUntilOther(Publisher<?> other)
skipUntilOther(Publisher) - 跳过数据直到另一数据流数据到来,才开始该流处理
public final Flux<T> skipUntilOther(Publisher<?> other)
takeUntilOther与skipUntilOther结合的例子
1 | //创建一个Flux,它发出从0开始增长,并在全局计时器上按指定的时间间隔递增。第一个元素在等于周期的初始延迟之后发出 |
item 0
item 1
item 2
onNext: item 2
item 3
onNext: item 3
item 4
onNext: item 4
onCompleted
all - 全部符合为rue
public final Mono<Boolean> all(Predicate<? super T> predicate)
1 | Flux.just(2,4,6) |
true
any - 任一符合为rue
public final Mono<Boolean> all(Predicate<? super T> predicate)
1 | Flux.just(1,2,3,4,5,6) |
true
hasElement - 是否含有指定元素
操作符检查流中是否包含某个所需的元素。短路逻辑,在元素与值匹配时立即返回true。
1 | Flux.just(1,2,3,4,5,6) |
true
repeat - 重复操作
public final Flux<T> repeat()
无限订阅
public final Flux<T> repeat(long numRepeat)
再原有基础上再订阅numRepeat次
1 | Flux.just(100) |
100
100
100
去重
distinctUntilChanged - 局部去重
public final Flux<T> distinctUntilChanged()
1 | Flux.just(1,1,1,2,2,2,3,3,3,1,1,1,2,2,2) |
1
2
3
1
2
distinct - 全局去重
public final Flux<T> distinct()
1 | Flux.just(1,1,1,2,2,2,3,3,3,1,1,1,2,2,2) |
1
2
3
组合
reduce - 前一步结果与当前元素组合
public final <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator)
使用BiFunction会将前一步的结果(首次为初始值)与当前的元素组合在一起
1 | Flux.range(1,5) |
item1:0
item2:1item1:1
item2:2item1:3
item2:3item1:6
item2:4item1:10
item2:515
scan - 前一步结果与当前元素组合
与reduce类似,区别在于scan会将中间结果打印
public final <A> Flux<A> scan(A initial, BiFunction<A, ? super T, A> accumulator)
1 | Flux.range(1,5) |
0
num1:0
num2:1
1
num1:1
num2:2
3
num1:3
num2:3
6
num1:6
num2:4
10
num1:10
num2:5
15
concat - 组合多个流,流之间存在顺序
通过向下游转发接收的元素来连接所有数据源。当操作符连接两个流时,它首先消费并重新发送第一个流的所有元素,然后对第二个流执行相同的操作。
public static <T> Flux<T> concat(Publisher<? extends T>... sources)
1 | Flux.concat( |
订阅第一个流
10
11
12
13
14
订阅第二流
100
101
102
103
104
merge - 组合多个流,流之间无顺序
将来自上游序列的数据合并到一个下游序列中。与 concat 操作符不同,上游数据源是立即(同时)被订阅的。
public static <T> Flux<T> merge(Publisher<? extends T>... sources)
1 | Flux.merge( |
订阅第一个流
订阅第二流
100
10
101
11
102
12
103
13
104
14
zip - 压缩合并为二元数据
将两个源压缩到一起,也就是说,等待所有源发出一个元素,并将这些元素合并到Tuple2中。操作符将继续这样做,直到任何一个源完成。
public static <T1, T2> Flux<Tuple2<T1, T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2)
1 | Flux.zip( |
[1,100]
[2,101]
[3,102]
[4,103]
[5,104]
[6,105]
[7,106]
[8,107]
[9,108]
[10,109]
combineLatest - 压缩合并数据,以新值为准使用自定义函数
都是压缩合并数据,与zip不同的是,combineLatest 数据是由来自两个Publisher源的最新发布值的组合生成的。
public static <T1, T2, V> Flux<V> combineLatest(Publisher<? extends T1> source1,Publisher<? extends T2> source2,BiFunction<? super T1, ? super T2, ? extends V> combinator)
1 | Flux.combineLatest( |
1==100
2==100
3==100
4==100
4==101
5==101
5==102
6==102
7==102
7==103
8==103
9==103
批处理
buffer - 达到缓冲区大小时,传递数据
将传入的值收集到多个List缓冲区中,这些缓冲区将在每次达到给定的最大大小或在此Flux完成时由返回的Flux发出。
public final Flux<List<T>> buffer(int maxSize)
1 | Flux.range(1,100) |
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
[51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
[71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
windowUntil - 满足窗口条件时传递数据
1 | Flux.range(101, 20) |
第二个参数为true时:
window:[]
window:[101, 102]
window:[103, 104, 105, 106]
window:[107, 108]
window:[109, 110, 111, 112]
window:[113, 114, 115, 116, 117, 118, 119, 120]第二个参数为false时:
window:[101]
window:[102, 103]
window:[104, 105, 106, 107]
window:[108, 109]
window:[110, 111, 112, 113]
window:[114, 115, 116, 117, 118, 119, 120]
groupBy
1 | Flux.range(1, 7) |
奇数======[1]
偶数======[2]
奇数======[1, 3]
偶数======[2, 4]
奇数======[1, 3, 5]
偶数======[2, 4, 6]
奇数======[1, 3, 5, 7]
thenMany - 触发新流
1 | Flux.just(1,2,3,4,5,6) |
1
2
3
4
5
6
7
8
收集
List
普通List
收集此Flux发出的所有元素到一个List中,该List在此序列完成时由结果Mono发出。
public final Mono<List<T>> collectList()
1 | Flux.just(1,2,36,4,25,6,7) |
[1, 2, 36, 4, 25, 6, 7]
排序List
public final Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator)
1 | Flux.just(1,2,36,4,25,6,7) |
[36, 25, 7, 6, 4, 2, 1]
收集集合中的序列元素可能耗费资源,当序列具有许多元素时这种现象尤为突出。此外,尝试在无限流上收集数据可能消耗所有可用的内存。
Map
自定义key值
public final <K> Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor)
1 | Flux.just(1, 2, 3, 4, 5, 6) |
{key1=1, key2=2, key5=5, key6=6, key3=3, key4=4}
自定义key、value值
public final <K, V> Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,Function<? super T, ? extends V> valueExtractor)
1 | Flux.just(1, 2, 3, 4, 5, 6) |
{key-1=value-1, key-5=value-5, key-4=value-4, key-3=value-3, key-2=value-2, key-6=value-6}
自定义key、value值、map工厂
public final <K, V> Mono<Map<K, V>> collectMap(final Function<? super T, ? extends K> keyExtractor,final Function<? super T, ? extends V> valueExtractor,Supplier<Map<K, V>> mapSupplier)
1 | Flux.just(1,2,3,4,5,6) |
{key-1=value-1, key-5=value-5, key-4=value-4, key-3=value-3, key-2=value-2, key-6=value-6, key:9=value:9, key:8=value:8, key:7=value:7}
Multimap
多值map,自定义key、value
public final <K, V> Mono<Map<K, Collection<V>>> collectMultimap(Function<? super T, ? extends K> keyExtractor,Function<? super T, ? extends V> valueExtractor)
1 | Flux.just(1, 2, 3, 4, 5) |
{
key-1=[value-1, value-3, value-5],
key-2=[value-2, value-4]
}
多值map,自定义key、value、map工厂
public final <K, V> Mono<Map<K, Collection<V>>> collectMultimap(final Function<? super T, ? extends K> keyExtractor,final Function<? super T, ? extends V> valueExtractor,Supplier<Map<K, Collection<V>>> mapSupplier
1 | Flux.just(1, 2, 3, 4, 5) |
{
key-1=[value-1, value-3, value-5],
2-key=[ele:0, ele:0, ele:1],
key-2=[value-2, value-4],
0-key=[ele:0, ele:0, ele:1],
1-key=[ele:0, ele:0, ele:1]
}
转换阻塞结构
Iterable
1 | Iterable<Integer> integers = Flux.just(1, 2, 3, 4) |
1
2
3
4
Stream
1 | Stream<Integer> integerStream = Flux.just(1, 2, 3).toStream(); |
1
2
3
BlockFirst
阻塞了当前线程,直到上游发出第一个值或完成流为止
1 | Integer integer = Flux.just(1, 2, 3) |
onNext:1
1
blockLast
阻塞当前线程,直到上游发出最后一个值或完成流为止。在 onError的情况下,它会在被阻塞的线程中抛出异常。
1 | Integer integer2 = Flux.just(1, 2, 3) |
onNext:1
onNext:2
onNext:3
3
物化和非物化信号
将流中的元素封装为Signal对象进行处理。
正常调用
1 | Flux.just(1, 2, 3) |
是否完成:false
1
是否完成:false
2
是否完成:false
3
是否完成:false
[ERROR] (parallel-1) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 手动异常
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 手动异常
Caused by: java.lang.Exception: 手动异常
at com.yrl.reactor.ReactorDemo.main(ReactorDemo.java:12)
物化调用
1 | Flux.just(1, 2, 3) |
是否完成:false
onNext(1)
是否完成:false
onNext(2)
是否完成:false
onNext(3)
是否完成:false
onError(java.lang.Exception: 手动异常)
是否完成:true
非物化调用
1 | Flux.just(1, 2, 3) |
是否完成:false
1
是否完成:false
2
是否完成:false
3
是否完成:false
[ERROR] (parallel-1) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 手动异常
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 手动异常
Caused by: java.lang.Exception: 手动异常
at com.yrl.reactor.ReactorDemo.main(ReactorDemo.java:12)
冷热数据
冷发布者:无论订阅者何时出现,都为该订阅者生成所有的序列数据,没有订阅者就不会生成数据。
热发布者:数据的生成不依赖于订阅者而存在,数据可能在第一个订阅者出现之间就开始产生了。
冷发布
1 | Flux<String> coldPublisher = Flux.defer(()->{ |
尚未生成数据
生成数据
onNext:1d496bec-d1be-4274-b24a-74ece485e51a
生成数据
onNext:19ab947e-bfd7-4600-9ffe-0e3a0b91c58c
为两次订阅生成两次数据
冷发布同时订阅通知
使用publish()返回的 ConnectableFlux 可以向几个订阅者多播事件。
1 | Flux<String> coldPublisher = Flux.defer(() -> { |
尚未生成数据
准备建立连接
生成数据
onNext:ad8292c5-3d65-488e-9659-cdc7a7a2a559
onNext:ad8292c5-3d65-488e-9659-cdc7a7a2a559
1 | Flux<Integer> source = Flux.range(0,3) |
准备建立连接
对冷发布者的新订阅票据:reactor.core.publisher.FluxRange$RangeSubscription@22a71081
[subscriber 1] onNext:0
[subscriber 2] onNext:0
[subscriber 1] onNext:1
[subscriber 2] onNext:1
[subscriber 1] onNext:2
[subscriber 2] onNext:2
缓存
1 | Flux<Integer> source = Flux.range(0,5) |
冷发布者的新订阅数据
[subscribe 1] on Next:0
[subscribe 1] on Next:1
[subscribe 1] on Next:2
[subscribe 1] on Next:3
[subscribe 1] on Next:4
[subscribe 2] on Next:0
[subscribe 2] on Next:1
[subscribe 2] on Next:2
[subscribe 2] on Next:3
[subscribe 2] on Next:4
冷发布者的新订阅数据
[subscribe 3] on Next:0
[subscribe 3] on Next:1
[subscribe 3] on Next:2
[subscribe 3] on Next:3
[subscribe 3] on Next:4
共享-热发布
使用publish()需要等待订阅者出现才能开始处理。
share可以将冷发布者转变为热发布者,会为每个新订阅者传播订阅者尚未错过的事件。
1 | Flux<Integer> source = Flux.range(0, 5) |
冷发布者新的订阅票据
subscribe 1 onNext:0
subscribe 1 onNext:1
subscribe 1 onNext:2
subscribe 1 onNext:3
subscribe 2 onNext:3
subscribe 1 onNext:4
subscribe 2 onNext:4