0%

响应式编程-ProjectReactor

Project Reactor

响应式概念

响应式编程通常以面向对象的语言呈现,作为观察者设计模式的扩展。

通过编写异步、非阻塞的代码寻求更高的处理效率。

Callback异步处理,通过Future获取结果可能会出现回调地狱,CompletableFuture可以将多个Future进行编排,但使用上并不是那么容易,且不支持惰性计算、多值和处理的灵活性。

响应式库(例如 Reactor)旨在解决 JVM 上“经典”异步方法的这些缺点:

  • 可组合性和可读性
  • 数据作为使用丰富的运算符词汇操作的流
  • 在订阅之前什么都不会发生
  • 背压或消费者向生产者发出排放率过高信号的能力
  • 与并发无关的高级但高价值的抽象

reactort 概念

  • 对响应式流规范的一种实现
  • Spring WebFlux默认的响应式框架
  • 完全异步非阻塞,对背压支持
  • 提供两个异步序列API:Flux[N]、Mono[0|1]
  • 提供对响应式流的操作

依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.0</version>
</dependency>

接口定义

  • Publisher - 发布者

    1
    2
    3
    public interface Publisher<T>{
    public void subscribe(Subscriber<? super T> s);
    }
    • 发送onNext信号次数不得超过订阅者请求的元素个数
    • 如果进程失败,需要调用onError告知订阅者
    • Publisher.subscribe()内部需调用onSubscribe(Subscriber s)
    • 允许同时存在多个订阅者
  • Subscriber - 订阅者

    1
    2
    3
    4
    5
    6
    public interface Subscriber<T>{
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
    }
    • 需要调用Subscriptions.request(long n)来向发布者请求数据,n指请求多少数据量
    • onCompleteonError方法,终止方法
    • 只能存在一个活跃的订阅
    • 调用Subscription.cancel()之后仍有onNext信号发送
  • Subscription - 订阅关系

    1
    2
    3
    4
    public interface Subscription{
    public void request(long n);
    public void cancel();
    }
    • 需要在当前订阅者的上下文中被调用
    • Subscription.request(long n)最多请求2^63-1个元素
  • Processor - 处理器,reactor3.5后废弃

    1
    2
    3
    public interface Processor<T,R> extends Subscriber<T>,Publisher<R>{

    }

使用

创建

Mono

Mono最多可以指定一个元素

  • just - 指定一个元素

    1
    2
    3
    //指定元素,只能指定一个元素
    Mono<String> just = Mono.just("hello");
    just.subscribe(System.out::println);//hello
  • justOrEmpty - 指定可能为空的元素

    1
    2
    3
    4
    5
    6
    7
    8
    9
    //指定可能为空的元素
    Mono<Object> objectMono = Mono.justOrEmpty(null);
    objectMono.subscribe(System.out::println);//无输出

    Mono<Object> objectMono = Mono.justOrEmpty(Optional.empty());
    objectMono.subscribe(System.out::println);//无输出

    Mono<Object> objectMono = Mono.justOrEmpty(123);
    objectMono.subscribe(System.out::println);//123
  • fromCallable - 异步调用创建,并未试出异步效果?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
        public static String request() throws IOException {
    URL url = new URL("http://www.baidu.com");
    URLConnection urlConnection = url.openConnection();
    urlConnection.connect();
    InputStream inputStream = urlConnection.getInputStream();
    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
    StringBuilder sb = new StringBuilder();
    String temp = null;
    while ((temp = bufferedReader.readLine()) != null) {
    sb.append(temp).append("\r\n");
    }
    System.out.println(Thread.currentThread().getName());//打印出来还是main线程?
    return sb.toString();
    }

    public static void main(String[] args) throws IOException {
    // System.out.println(request());
    Mono.just(request()).subscribe(System.out::println);
    Mono.fromCallable(() -> request()).subscribe(System.out::println);
    Mono.fromCallable(AsyncMonoTest::request).subscribe(System.out::println);
    Mono.fromCallable(AsyncMonoTest::request)
    .subscribe(
    data -> {
    System.out.println(Thread.currentThread().getName());
    System.out.println(data);
    },
    ex -> System.out.println("出错:" + ex),
    () -> System.out.println("请求完成")
    );
    }

Flux

Flux可以指定多个元素创建响应式流

  • just - 列举创建

    1
    2
    Flux<String> just = Flux.just("hello", "every body");
    just.subscribe(System.out::println);

    hello
    every body

  • fromArray - 数组创建

    1
    2
    Flux<String> stringFlux = Flux.fromArray(new String[]{"hello", "world"});
    stringFlux.subscribe(System.out::println);

    hello
    every body

  • fromIterable - iterable接口创建

    1
    2
    Flux<Integer> iterableFlux = Flux.fromIterable(Arrays.asList(1, 2, 3, 4));
    iterableFlux.subscribe(System.out::println);

    1
    2
    3
    4

  • range - 递增序列创建

    1
    2
    3
    //第一个参数(即1000)为递增序列的初始值;第二个参数为递增的总数据量,包含初始值
    Flux<Integer> rangeFlux = Flux.range(1000, 5);
    rangeFlux.subscribe(System.out::println);

    1000
    1001
    1002
    1003
    1004

  • from - 基于publisher创建

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    Flux.from(new Publisher<String>() {
    /**
    * 请求Publisher开始流数据
    * 这是一个“工厂方法”,可以多次调用,每次启动一个新的Subscription。
    * 每个订阅(Subscription)只适用于单个订阅服务器(Subscriber)。
    * 订阅服务器(Subscriber)只能向单个发布服务器(Publisher)订阅一次。
    * 如果发布者(Publisher)拒绝订阅(Subscriber)尝试或以其他方式失败,它将通过Subscriber.onError发出错误信号。
    * @param subscriber
    */
    @Override
    public void subscribe(Subscriber<? super String> subscriber) {
    for (int i = 0; i < 5; i++) {
    subscriber.onNext("hello - " + i);
    }
    subscriber.onComplete();
    }
    }).subscribe(
    data -> System.out.println(data),
    ex -> System.out.println("出错:" + ex),
    () -> System.out.println("请求完成")
    );

    //lambda简化
    Flux.from((Publisher<String>) subscriber -> {
    for (int i = 0; i < 5; i++) {
    subscriber.onNext("hello - " + i);
    }
    subscriber.onComplete();
    }).subscribe(
    data -> System.out.println(data),
    ex -> System.out.println("出错:" + ex),
    () -> System.out.println("请求完成")
    );

    hello - 0
    hello - 1
    hello - 2
    hello - 3
    hello - 4
    请求完成

  • interval - 间隔创建

    指定时间间隔内,从0开始递增产生值

    public static Flux<Long> interval(Duration delay, Duration period, Scheduler timer)

    第一个参数:延迟开始时间、第二个参数:间隔时间周期、第三个参数:调度线程池

1
2
Flux.interval(Duration.ZERO, Duration.ofMillis(1000))
.subscribe(System.out::println);

编程方式创建

generate

逐一对信号进行加工输出创建Flux,通过BiFunction函数对状态加工输出,发送给下游订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
Flux<String> flux = Flux.generate(
//为每个订阅提供初始状态值
new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 0;
}
},
//处理状态值,为订阅者提供数据
//state:状态值;sink
new BiFunction<Integer, SynchronousSink<String>, Integer>() {
@Override
public Integer apply(Integer state, SynchronousSink<String> sink) {
sink.next("3 x " + state + " = " + 3 * state);//将数据输出给订阅者
if (state == 10) {
//当前订阅的流已结束
sink.complete();
}
//传递状态值,会继续调用apply方法
return state + 1;
}
},
//当第二个参数终止或者下游调用取消时,这里接收最后一个状态值
new Consumer<Integer>() {
@Override
public void accept(Integer state) {
System.out.println("state:" + state);
}
}
);
flux.subscribe(System.out::println);


//lambda简化
Flux<String> flux = Flux.generate(
() -> 0,
(state, sink) -> {
sink.next("3 x " + state + " = " + 3 * state);
if (state == 10) {
sink.complete();
}
return state + 1;
},
(state) -> System.out.println("state:" + state));

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
state:11

push

适用于单线程生产者,默认背压策略:OverflowStrategy.BUFFER

1
2
3
4
5
6
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter) {
return push(emitter, OverflowStrategy.BUFFER);
}
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_ONLY));
}
1
2
3
4
5
6
7
Flux.push(new Consumer<FluxSink<Integer>>() {
@Override
public void accept(FluxSink<Integer> fluxSink) {
IntStream.range(1, 10)
.forEach(item -> fluxSink.next(item));
}
}).subscribe(System.out::println);
create

使用上与push类似,适用于多线程生产者,默认背压策略:OverflowStrategy.BUFFER

1
2
3
4
5
6
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
return create(emitter, OverflowStrategy.BUFFER);
}
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
}

多线程使用上如何体现?

1
2
3
4
5
6
7
Flux.create(new Consumer<FluxSink<Integer>>() {
@Override
public void accept(FluxSink<Integer> fluxSink) {
IntStream.range(1, 10)
.forEach(item -> fluxSink.next(item));
}
}).subscribe(System.out::println);

背压策略

1
2
3
4
5
6
7
8
9
10
11
12
enum OverflowStrategy {
//完全忽略下游的背压请求。
IGNORE,
//当消费不过来有新元素下发时,报错。
ERROR,
//如果下游没有准备好接收信号,则丢弃传入信号。
DROP,
//只会保留最新的元素。
LATEST,
//下发元素时会在一个队列中进行缓存。
BUFFER
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1,1000)
.delayElements(Duration.ofMillis(10))
.onBackpressureError()
.delayElements(Duration.ofMillis(100))
.subscribe(
System.out::println,
ex ->{
System.out.println(ex);
latch.countDown();
},
() ->{
System.out.println("处理完毕");
latch.countDown();
}
);
latch.await();
System.out.println("main结束");

1
2
3
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue…)
main结束

订阅

当订阅时,Subscriber会创建一个对象链,沿着链向上寻找第一个发布者。

基于 Lambda 订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//订阅并触发序列
subscribe();
//对每个产生的值做一些事情
subscribe(Consumer<? super T> consumer);
//处理值但也对错误做出反应
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
//处理值和错误,但也在序列成功完成时运行一些代码
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
//处理值和错误并成功完成,但也对该调用Subscription产生的结果做一些事情
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);

所有这些subscribe()都有一个Disposable返回类型,可以通过调用其dispose()方法取消订阅。

对于Fluxor Mono,取消是一个信号,表明源应该停止生成元素。但是,它不能保证是立即的:某些源可能会非常快地生成元素,以至于它们甚至可以在收到取消指令之前完成

基于BaseSubscriber订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Flux<Integer> flux = Flux.range(1, 4);
flux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}

@Override
protected void hookOnNext(Integer value) {
System.out.println(value);
request(1);
}
});

Subscribed
1
2
3
4

using - 资源统一处理

public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)

  • resourceSupplier:在订阅时被调用,以生成资源的可调用对象

  • sourceSupplier:从resourceSupplier获取Publisher,数据源

  • resourceCleanup:在完成时,调用资源对象的清理回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
    static class Connection implements AutoCloseable {
private final Random rnd = new Random();

static Connection newConnection() {
System.out.println("创建Connection对象");
return new Connection();
}

public Iterable<String> getData() {
if (rnd.nextInt(10) < 3) {
throw new RuntimeException("通信异常");
}
return Arrays.asList("数据1", "数据2");
}

// close方法可以释放内部资源,并且应该始终被调用,即使在getData执行期间发生错误也是如此。
@Override
public void close() {
System.out.println("关闭Connection连接");
}
}

public static void main(String[] args) {
// try (Connection connection = Connection.newConnection()) {
// connection.getData().forEach(data -> System.out.println("接收的数据:" +
// data));
// } catch (Exception e) {
// System.err.println("错误信息:" + e);
// }

Flux.using(
Connection::newConnection,
connection -> Flux.fromIterable(connection.getData()),
Connection::close
).subscribe(
data -> System.out.println("onNext接收到数据:" + data),
ex -> System.err.println("onError接收到的异常信息:" + ex),
() -> System.out.println("处理完毕")
);
}

创建Connection对象
关闭Connection连接
onError接收到的异常信息:java.lang.RuntimeException: 通信异常

Schedulers

调度线程池

  • Schedulers.newSingle():单个可重复使用的线程。
  • Schedulers.elastic(): 无界弹性线程池,有隐藏背压问题并导致过多线程的倾向,3.5.0开始弃用。
  • Schedulers.boundedElastic():有界弹性线程池 ,会根据需要创建新的工作池并重用闲置的工作池;闲置时间过长(默认为 60 秒)的工作池也会被处理掉。与elastic()不同,它对其可以创建的支持线程数有上限(默认为 CPU 核心数 x 10)。达到上限后提交的最多 100 000 个任务将被排入队列,并在线程可用时重新安排(延迟调度时,延迟在线程可用时开始)。
  • Schedulers.parallel() :为并行工作调整的固定工作池,创建与CPU 内核一样多的worker。
  • Schedulers.newParallel() :指定线程数,线程工厂的自定义线程池。可以使用newXXX 方法自定义创建各种调度程序类型的新实例。

publishOn

影响调用publishOn()之后方法的执行上下文,任务被提交给Schedulers 的线程执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
.range(1, 2)
.map(i -> {
System.out.println(Thread.currentThread().getName());//main
return 10 + i;
}
)
.publishOn(s)//此方法之后的链路切换为Scheduler线程
.map(i -> {
System.out.println(Thread.currentThread().getName()); //parallel-scheduler-1
return "value" + i;
}
);

flux.subscribe(item -> System.out.println(Thread.currentThread().getName() + " " + item));//parallel-scheduler-1

main
main
parallel-scheduler-1
parallel-scheduler-1 value11
parallel-scheduler-1
parallel-scheduler-1 value12

subscribeOn

从订阅开始整条链路的执行上下文都交给Schedulers 的线程执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
.range(1, 2)
.map(i -> {
System.out.println(Thread.currentThread().getName());//parallel-scheduler-1
return 10 + i;
}
)
.subscribeOn(s) //从订阅时间开始切换整个序列的上下文交给Scheduler的线程执行
.map(i -> {
System.out.println(Thread.currentThread().getName()); //parallel-scheduler-1
return "value" + i;
}
);

flux.subscribe(item -> System.out.println(Thread.currentThread().getName() + " " + item));//parallel-scheduler-1

parallel-scheduler-1
parallel-scheduler-1
parallel-scheduler-1 value11
parallel-scheduler-1
parallel-scheduler-1
parallel-scheduler-1 value12

错误处理

doOnError:出错时触发

1
2
3
4
5
Flux<Integer> range = Flux
.range(1000, 3)
.concatWith(Flux.error(new RuntimeException("出错")))
.doOnError((ex) -> System.out.println(ex));
range.subscribe(System.out::println);

1000
1001
1002
java.lang.RuntimeException: 出错
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 出错
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 出错
Caused by: java.lang.RuntimeException: 出错
at com.yrl.reactor.ReactorDemo1.main(ReactorDemo1.java:18)

retry:重试

不指定重试次数则无限重试

1
2
3
4
5
6
Flux<Integer> range = Flux
.range(1000, 3)
.concatWith(Flux.error(new RuntimeException("出错")))
.doOnError((ex) -> System.out.println(ex))
.retry(1);
range.subscribe(System.out::println);

1000
1001
1002
java.lang.RuntimeException: 出错
1000
1001
1002
java.lang.RuntimeException: 出错
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 出错
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 出错
Caused by: java.lang.RuntimeException: 出错
at com.yrl.reactor.ReactorDemo1.main(ReactorDemo1.java:18)

Process finished with exit code 0

onErrorResume:出错使用备用流

根据错误返回对应的流

1
2
3
4
5
6
7
8
9
Flux<Integer> range = Flux
.range(1000, 3)
.concatWith(Flux.error(new RuntimeException("出错")))
.doOnError((ex) -> System.out.println(ex))
.onErrorResume(ex -> {
System.out.println("onErrorResume - ex:" + ex);
return Flux.just(11111);
});
range.subscribe(System.out::println);

1000
1001
1002
java.lang.RuntimeException: 出错
onErrorResume - ex:java.lang.RuntimeException: 出错
11111

Process finished with exit code 0

onErrorReturn - 存在错误则返回指定值

1
2
3
4
5
6
Flux<Integer> range = Flux
.range(1000, 3)
.concatWith(Flux.error(new RuntimeException("出错")))
.doOnError((ex) -> System.out.println(ex))
.onErrorReturn(11111);
range.subscribe(System.out::println);

1000
1001
1002
java.lang.RuntimeException: 出错
11111

onErrorMap - 对错误进行转换

1
2
3
4
5
6
Flux<Integer> range = Flux
.range(1000, 3)
.concatWith(Flux.error(new RuntimeException("出错")))
.doOnError((ex) -> System.out.println(ex))
.onErrorMap(ex -> new Exception(ex.getMessage()));
range.subscribe(System.out::println);

1000
1001
1002
java.lang.RuntimeException: 出错
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 出错
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 出错
Caused by: java.lang.Exception: 出错
at com.yrl.reactor.ReactorDemo1.lambda$main$1(ReactorDemo1.java:21)
at reactor.core.publisher.Flux.lambda$onErrorMap$27(Flux.java:6544)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2023)
at reactor.core.publisher.Operators.error(Operators.java:196)
at reactor.core.publisher.FluxError.subscribe(FluxError.java:43)
at reactor.core.publisher.Flux.subscribe(Flux.java:8095)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:208)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.Flux.subscribe(Flux.java:8095)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8268)
at reactor.core.publisher.Flux.subscribe(Flux.java:8065)
at reactor.core.publisher.Flux.subscribe(Flux.java:7989)
at reactor.core.publisher.Flux.subscribe(Flux.java:7932)
at com.yrl.reactor.ReactorDemo1.main(ReactorDemo1.java:22)

defaultIfEmpty - 为空时使用默认值

1
Flux.empty().defaultIfEmpty("hello").subscribe(System.out::println);

hello

defer - 延迟执行

public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)

延迟执行,有订阅者出现才会执行,且为每个订阅者生成所有数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static String data() {
String data = UUID.randomUUID().toString();
System.out.println("生成数据:" + data);
return data;
}

public static void test() {
Mono<String> mono = Mono.just(data());
Flux<String> flux = Flux.just(data());
//不订阅也会执行data()逻辑
//生成数据:3915beeb-d8c3-4201-a296-9c8bfaa5d90d
//生成数据:4d5fffff-1546-4b7b-accb-3fd501b78606
//=========

Mono<String> deferMono = Mono.defer(() -> Mono.just(data()));
Flux<String> deferFlux = Flux.defer(() -> Flux.just(data()));
System.out.println("=========");
deferMono.subscribe();//订阅才会执行data()逻辑
deferFlux.subscribe();//订阅才会执行data()逻辑
//=========
//生成数据:c7dbd697-bd1a-42af-b832-b39a48f8374e
//生成数据:20c38ced-1476-4624-af23-6835c05ba32b
}

doOnNext

public final Flux<T> doOnNext(Consumer<? super T> onNext)

通过该方法可以对 Flux 或 Mono 上的每个元素执行一些操作,回调Subscriber.onNext

1
2
3
Flux.range(100,3)
.doOnNext(System.out::println)
.subscribe();

100
101
102

doOnComplete

完成订阅处理后回调doOnComplete方法

public final Flux<T> doOnComplete(Runnable onComplete)

1
2
3
4
5
6
Flux<Integer> range = Flux
.range(1000, 3)
.doOnComplete(() -> {
System.out.println("完成");
});
range.subscribe(System.out::println);

1000
1001
1002
完成

doOnTerminate

无论什么原因终止都会被调用

log - 日志跟踪

观察所有活性流信号并使用Logger支持跟踪它们,默认使用info级别和java.util.logging(如果SLF4J可用则使用它)

1
2
3
4
Flux<Integer> range = Flux
.range(1000, 3)
.log();
range.subscribe(System.out::println);

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1000)
1000
[ INFO] (main) | onNext(1001)
1001
[ INFO] (main) | onNext(1002)
1002
[ INFO] (main) | onComplete()

map - 转换数据

使用Function函数转换入参数据,返回指定形式数据

public final <V> Flux<V> map(Function<? super T, ? extends V> mapper)

1
2
3
4
Flux.range(100,3)
.map(num -> "hello " + num)//传入Function类型的lambda表达式转换数据格式
.doOnNext(System.out::println)//打印数据流上的数据
.subscribe();//订阅数据

hello 100
hello 101
hello 102

扁平化数据

flatMap - 立即订阅,交错处理

同时订阅数据,数据交错处理

1
2
3
4
5
6
7
8
9
Random random = new Random();
Flux.just(Arrays.asList(1,2,3),Arrays.asList("a","b","c","d"),Arrays.asList(7,8,9))
.doOnNext(System.out::println)
.flatMap(item -> Flux.fromIterable(item)
.doOnSubscribe(subscription -> {
System.out.println("已经订阅");
})//增加一个延时,订阅后延时一段时间再发送
.delayElements(Duration.ofMillis(random.nextInt(100) + 100))
).subscribe(System.out::println);

[1, 2, 3]
已经订阅
[a, b, c, d]
已经订阅
[7, 8, 9]
已经订阅
1
a
7
2
b
3
c
8
d
9

concatMap- 顺序订阅,顺序处理

1
2
3
4
5
6
7
8
9
Random random = new Random();
Flux.just(Arrays.asList(1,2,3),Arrays.asList("a","b","c","d"),Arrays.asList(7,8,9))
.doOnNext(System.out::println)
.concatMap(item -> Flux.fromIterable(item)
.doOnSubscribe(subscription -> {
System.out.println("已经订阅");
})//增加一个延时,订阅后延时一段时间再发送
.delayElements(Duration.ofMillis(random.nextInt(100) + 100))
).subscribe(System.out::println);

[1, 2, 3]
已经订阅
[a, b, c, d]
[7, 8, 9]
1
2
3
已经订阅
a
b
c
d
已经订阅
7
8
9

flatMapSequential - 立即订阅,顺序处理

1
2
3
4
5
6
7
8
9
Random random = new Random();
Flux.just(Arrays.asList(1,2,3),Arrays.asList("a","b","c","d"),Arrays.asList(7,8,9))
.doOnNext(System.out::println)
.flatMapSequential(item -> Flux.fromIterable(item)
.doOnSubscribe(subscription -> {
System.out.println("已经订阅");
})// 我们增加一个延时,订阅后延时一段时间再发送
.delayElements(Duration.ofMillis(random.nextInt(100) + 100))
).subscribe(System.out::println);

[1, 2, 3]
已经订阅
[a, b, c, d]
已经订阅
[7, 8, 9]
已经订阅
1
2
3
a
b
c
d
7
8
9

filter - 过滤

public final Flux<T> filter(Predicate<? super T> p)

1
2
3
4
5
Flux.range(100,3)
.filter(num -> num % 3 == 0)//过滤数据,只有102符合
.map(num -> "hello " + num)//数据转换
.doOnNext(System.out::println)//
.subscribe();

hello 102

index - 增加索引值

public final Flux<Tuple2<Long, T>> index()

为数据增加索引,构建二元数据 Tuple2<(index, value)>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Flux.range(100, 3)
.map(item -> "hello-" + item)
.index()
.doOnNext(item -> {
System.out.println("Tuple2:" + item);
// 二元组第一个元素 ,编号0开始
Long t1 = item.getT1();
System.out.println("索引:" + t1);
// 二元组第二个元素,也就是具体值
String t2 = item.getT2();
System.out.println("值:" + t2);
System.out.println();
})
.subscribe();

Tuple2:[0,hello-100]
索引:0
值:hello-100

Tuple2:[1,hello-101]
索引:1
值:hello-101

Tuple2:[2,hello-102]
索引:2
值:hello-102

timestamp - 增加时间戳

public final Flux<Tuple2<Long, T>> timestamp()

为数据增加当前时间戳,构建二元数据 Tuple2<(timestamp , value)>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Flux.just(100,101,102)
.map(item -> "hello-" + item)
.timestamp()
.doOnNext(item -> {
System.out.println("Tuple2:" + item);
// 二元组第一个元素 ,当前时间戳
Long t1 = item.getT1();
System.out.println("时间戳:" + t1);
// 二元组第二个元素,也就是具体值
String t2 = item.getT2();
System.out.println("值:" + t2);
System.out.println();
})
.subscribe();

Tuple2:[1669364259980,hello-100]
时间戳:1669364259980
值:hello-100

Tuple2:[1669364259980,hello-101]
时间戳:1669364259980
值:hello-101

Tuple2:[1669364259980,hello-102]
时间戳:1669364259980
值:hello-102

skip - 跳过

跳过指定数量的元素

public final Flux<T> skip(long skipped)

1
2
3
4
Flux.just(100,101,102)
.skip(2)
.doOnNext(System.out::println)
.subscribe();

102

then

忽略来自该Flux的元素,并将其完成信号转换为所新提供Mono的发射和完成信号,订阅者只处理Mono数据

1
2
3
4
Flux.just(1,2,3,4,5,6)
.doOnNext(item -> System.out.println("doOnNext:" + item))
.then(Mono.just(7))
.subscribe(System.out::println);

doOnNext:1
doOnNext:2
doOnNext:3
doOnNext:4
doOnNext:5
doOnNext:6
7

thenMany

让这个Flux完成后使用新的Flux,订阅者只处理新Flux数据

1
2
3
4
Flux.just(1, 2, 3, 4, 5, 6)
.doOnNext(item -> System.out.println("doOnNext:" + item))
.thenMany(Flux.just(8, 9))
.subscribe(System.out::println);

doOnNext:1
doOnNext:2
doOnNext:3
doOnNext:4
doOnNext:5
doOnNext:6
8
9

next - 只获取一个数据

只获取一个数据到一个新的Mono。如果为空,则发出一个空的Mono。

1
2
3
4
Mono<Integer> next = Flux
.range(1000, 5)
.next();
next.subscribe(System.out::println);

1000

take - 获取前N个数据

public final Flux<T> take(long n)

获取前N个数据后取消订阅剩余数据

1
2
3
4
Flux.just(100,101,102)
.take(2)//只获取2条数据
.doOnNext(System.out::println)
.subscribe();

100
101

takeLast - 获取后N个数据

public final Flux<T> takeLast(int n)

1
2
3
4
Flux.just(100,101,102)
.takeLast(2)
.doOnNext(System.out::println)
.subscribe();

101
102

takeUntil - 传递值,直到条件满足则停止

public final Flux<T> takeUntil(Predicate<? super T> predicate)

1
2
3
4
Flux.just(100,101,102)
.takeUntil(num -> num == 101)//传递100、101,此时规则为true,停止传递数据
.doOnNext(System.out::println)
.subscribe();

100
101

elementAt - 根据索引获取数据

public final Mono<T> elementAt(int index)

1
2
3
4
Flux.just(100,101,102)
.elementAt(2)
.doOnNext(System.out::println)
.subscribe();

102

takeUntilOther - 获取数据直到另一数据流数据到来,则停止该流处理

public final Flux<T> takeUntilOther(Publisher<?> other)

skipUntilOther(Publisher) - 跳过数据直到另一数据流数据到来,才开始该流处理

public final Flux<T> skipUntilOther(Publisher<?> other)

takeUntilOther与skipUntilOther结合的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//创建一个Flux,它发出从0开始增长,并在全局计时器上按指定的时间间隔递增。第一个元素在等于周期的初始延迟之后发出
Flux.interval(Duration.ofMillis(1000))
.map(item -> "item " + item)
.doOnNext(System.out::println)
//跳过前面的数据,当第三秒mono数据量加入时触发下面的subscribe打印"onNext: " + item
.skipUntilOther(Mono.just("start").delayElement(Duration.ofSeconds(3)))
//处理数据,当第六秒时停止处理
.takeUntilOther(Mono.just("end").delayElement(Duration.ofSeconds(6)))
.subscribe(
item -> System.out.println("onNext: " + item),
ex -> System.err.println("onError: " + ex),
() -> System.out.println("onCompleted")
);
Thread.sleep(10*1000);

item 0
item 1
item 2
onNext: item 2
item 3
onNext: item 3
item 4
onNext: item 4
onCompleted

all - 全部符合为rue

public final Mono<Boolean> all(Predicate<? super T> predicate)

1
2
3
Flux.just(2,4,6)
.all(item -> item % 2 == 0)
.subscribe(System.out::println);

true

any - 任一符合为rue

public final Mono<Boolean> all(Predicate<? super T> predicate)

1
2
3
Flux.just(1,2,3,4,5,6)
.any(item -> item % 2 == 0)
.subscribe(System.out::println);

true

hasElement - 是否含有指定元素

操作符检查流中是否包含某个所需的元素。短路逻辑,在元素与值匹配时立即返回true。

1
2
3
Flux.just(1,2,3,4,5,6)
.hasElement(5)
.subscribe(System.out::println);

true

repeat - 重复操作

public final Flux<T> repeat()无限订阅

public final Flux<T> repeat(long numRepeat)再原有基础上再订阅numRepeat次

1
2
3
4
Flux.just(100)
.repeat(2)//再原有基础上再重复订阅2次
.doOnNext(System.out::println)
.subscribe();

100
100
100

去重

distinctUntilChanged - 局部去重

public final Flux<T> distinctUntilChanged()

1
2
3
Flux.just(1,1,1,2,2,2,3,3,3,1,1,1,2,2,2)
.distinctUntilChanged()//局部范围去重
.subscribe(System.out::println);

1

2

3

1

2

distinct - 全局去重

public final Flux<T> distinct()

1
2
3
Flux.just(1,1,1,2,2,2,3,3,3,1,1,1,2,2,2)
.distinct()//全局去重
.subscribe(System.out::println);

1

2

3

组合

reduce - 前一步结果与当前元素组合

public final <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator)

使用BiFunction会将前一步的结果(首次为初始值)与当前的元素组合在一起

1
2
3
4
5
6
7
8
Flux.range(1,5)
//0位初始值
.reduce(0,(item1,item2)->{
System.out.println("item1:" + item1);
System.out.println("item2:" + item2);
System.out.println();
return item1 + item2;
}).subscribe(System.out::println);

item1:0
item2:1

item1:1
item2:2

item1:3
item2:3

item1:6
item2:4

item1:10
item2:5

15

scan - 前一步结果与当前元素组合

与reduce类似,区别在于scan会将中间结果打印

public final <A> Flux<A> scan(A initial, BiFunction<A, ? super T, A> accumulator)

1
2
3
4
5
6
7
Flux.range(1,5)
.scan(/*初始值*/0,(/*前一步的结果(首次为初始值)*/num1,/*新一轮传递过来的值*/num2) ->{
System.out.println("num1:" + num1);
System.out.println("num2:" + num2);
return num1 + num2;
})
.subscribe(System.out::println);

0
num1:0
num2:1
1
num1:1
num2:2
3
num1:3
num2:3
6
num1:6
num2:4
10
num1:10
num2:5
15

concat - 组合多个流,流之间存在顺序

通过向下游转发接收的元素来连接所有数据源。当操作符连接两个流时,它首先消费并重新发送第一个流的所有元素,然后对第二个流执行相同的操作。

public static <T> Flux<T> concat(Publisher<? extends T>... sources)

1
2
3
4
5
6
Flux.concat(
Flux.range(10,5).delayElements(Duration.ofMillis(100))
.doOnSubscribe(subscription -> System.out.println("订阅第一个流")),
Flux.range(100,5).delayElements(Duration.ofMillis(100))
.doOnSubscribe(subscription -> System.out.println("订阅第二流"))
).subscribe(System.out::println);

订阅第一个流
10
11
12
13
14
订阅第二流
100
101
102
103
104

merge - 组合多个流,流之间无顺序

将来自上游序列的数据合并到一个下游序列中。与 concat 操作符不同,上游数据源是立即(同时)被订阅的。

public static <T> Flux<T> merge(Publisher<? extends T>... sources)

1
2
3
4
5
6
7
Flux.merge(
Flux.range(10,5).delayElements(Duration.ofMillis(100))
.doOnSubscribe(subscription -> System.out.println("订阅第一个流")),
Flux.range(100,5).delayElements(Duration.ofMillis(100))
.doOnSubscribe(subscription -> System.out.println("订阅第二流"))
).subscribe(System.out::println);
Thread.sleep(10*1000);

订阅第一个流
订阅第二流
100
10
101
11
102
12
103
13
104
14

zip - 压缩合并为二元数据

将两个源压缩到一起,也就是说,等待所有源发出一个元素,并将这些元素合并到Tuple2中。操作符将继续这样做,直到任何一个源完成。

public static <T1, T2> Flux<Tuple2<T1, T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2)

1
2
3
4
5
6
7
Flux.zip(
Flux.range(1,10)
.delayElements(Duration.ofMillis(10)),
Flux.range(100,10)
.delayElements(Duration.ofMillis(10))
).subscribe(System.out::println);
Thread.sleep(10*1000);

[1,100]
[2,101]
[3,102]
[4,103]
[5,104]
[6,105]
[7,106]
[8,107]
[9,108]
[10,109]

combineLatest - 压缩合并数据,以新值为准使用自定义函数

都是压缩合并数据,与zip不同的是,combineLatest 数据是由来自两个Publisher源的最新发布值的组合生成的。

public static <T1, T2, V> Flux<V> combineLatest(Publisher<? extends T1> source1,Publisher<? extends T2> source2,BiFunction<? super T1, ? super T2, ? extends V> combinator)

1
2
3
4
5
6
Flux.combineLatest(
Flux.range(1,10).delayElements(Duration.ofMillis(1000)),
Flux.range(100,10).delayElements(Duration.ofMillis(2000)),
((integer1, integer2) -> integer1 + "==" +integer2)//聚合函数,它将从每个上游接收最新的值并将该值返回给下游信号
).subscribe(System.out::println);
Thread.sleep(10*1000);

1==100
2==100
3==100
4==100
4==101
5==101
5==102
6==102
7==102
7==103
8==103
9==103

批处理

buffer - 达到缓冲区大小时,传递数据

将传入的值收集到多个List缓冲区中,这些缓冲区将在每次达到给定的最大大小或在此Flux完成时由返回的Flux发出。

public final Flux<List<T>> buffer(int maxSize)

1
2
3
Flux.range(1,100)
.buffer(10)
.subscribe(System.out::println);

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
[31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
[51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
[71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
[91, 92, 93, 94, 95, 96, 97, 98, 99, 100]

windowUntil - 满足窗口条件时传递数据

1
2
3
4
5
6
7
8
9
10
Flux.range(101, 20)
//第一个参数:为true时创建新的窗口
//第二个参数:为true时加入新的窗口,false加入旧窗口
.windowUntil(/**判断是否为质数*/ReactorDemo::isPrime, true)
.subscribe(
data -> data.collectList()
.subscribe(
item -> System.out.println("window:" + item)
)
);

第二个参数为true时:
window:[]
window:[101, 102]
window:[103, 104, 105, 106]
window:[107, 108]
window:[109, 110, 111, 112]
window:[113, 114, 115, 116, 117, 118, 119, 120]

第二个参数为false时:
window:[101]
window:[102, 103]
window:[104, 105, 106, 107]
window:[108, 109]
window:[110, 111, 112, 113]
window:[114, 115, 116, 117, 118, 119, 120]

groupBy

1
2
3
4
5
6
7
8
9
10
11
Flux.range(1, 7)
.groupBy(item -> item % 2 == 0 ? "偶数" : "奇数")
.subscribe(groupFlux ->
groupFlux.scan(
/*初始值*/new ArrayList<>(),
(/*前一步的结果(首次为初始值)*/list, /*新一轮传递过来的值*/element) -> {
list.add(element);
return list;
}).filter(list -> !list.isEmpty()).
subscribe(item -> System.out.println(groupFlux.key() + "======" + item))
);

奇数======[1]
偶数======[2]
奇数======[1, 3]
偶数======[2, 4]
奇数======[1, 3, 5]
偶数======[2, 4, 6]
奇数======[1, 3, 5, 7]

thenMany - 触发新流

1
2
3
4
Flux.just(1,2,3,4,5,6)
.doOnNext(item -> System.out.println(item))
.thenMany(Flux.just(7,8))
.subscribe(System.out::println);

1
2
3
4
5
6
7
8

收集

List

普通List

收集此Flux发出的所有元素到一个List中,该List在此序列完成时由结果Mono发出。

public final Mono<List<T>> collectList()

1
2
3
Flux.just(1,2,36,4,25,6,7)
.collectList()
.subscribe(System.out::println);

[1, 2, 36, 4, 25, 6, 7]

排序List

public final Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator)

1
2
3
4
Flux.just(1,2,36,4,25,6,7)
//CollectionSoredList 默认是升序
.collectSortedList(Comparator.reverseOrder())
.subscribe(System.out::println);

[36, 25, 7, 6, 4, 2, 1]
收集集合中的序列元素可能耗费资源,当序列具有许多元素时这种现象尤为突出。此外,尝试在无限流上收集数据可能消耗所有可用的内存。

Map

自定义key值

public final <K> Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor)

1
2
3
Flux.just(1, 2, 3, 4, 5, 6)
.collectMap(item -> "key" + item)
.subscribe(System.out::println);

{key1=1, key2=2, key5=5, key6=6, key3=3, key4=4}

自定义key、value值

public final <K, V> Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,Function<? super T, ? extends V> valueExtractor)

1
2
3
4
5
Flux.just(1, 2, 3, 4, 5, 6)
.collectMap(
item -> "key-" + item,
item -> "value-" + item
).subscribe(System.out::println);

{key-1=value-1, key-5=value-5, key-4=value-4, key-3=value-3, key-2=value-2, key-6=value-6}

自定义key、value值、map工厂

public final <K, V> Mono<Map<K, V>> collectMap(final Function<? super T, ? extends K> keyExtractor,final Function<? super T, ? extends V> valueExtractor,Supplier<Map<K, V>> mapSupplier)

1
2
3
4
5
6
7
8
9
10
11
12
13
Flux.just(1,2,3,4,5,6)
.collectMap(
integer -> "key-" + integer,
integer -> "value-" + integer,
()->{
Map<String,String> map = new HashMap<>();
for(int i =7 ;i <10;i++){
map.put("key:" + i ,"value:" + i);
}
return map;

}
).subscribe(System.out::println);

{key-1=value-1, key-5=value-5, key-4=value-4, key-3=value-3, key-2=value-2, key-6=value-6, key:9=value:9, key:8=value:8, key:7=value:7}

Multimap

多值map,自定义key、value

public final <K, V> Mono<Map<K, Collection<V>>> collectMultimap(Function<? super T, ? extends K> keyExtractor,Function<? super T, ? extends V> valueExtractor)

1
2
3
4
5
6
7
8
9
10
Flux.just(1, 2, 3, 4, 5)
.collectMultimap(
item -> {
if (item % 2 == 0) {
return "key-2";
}
return "key-1";
},
item -> "value-" + item
).subscribe(System.out::println);

{

​ key-1=[value-1, value-3, value-5],

​ key-2=[value-2, value-4]

}

多值map,自定义key、value、map工厂

public final <K, V> Mono<Map<K, Collection<V>>> collectMultimap(final Function<? super T, ? extends K> keyExtractor,final Function<? super T, ? extends V> valueExtractor,Supplier<Map<K, Collection<V>>> mapSupplier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Flux.just(1, 2, 3, 4, 5)
.collectMultimap(
item -> {
if (item % 2 == 0) {
return "key-2";
}
return "key-1";
},
item -> "value-" + item,
// 扩充
() -> {
Map map = new HashMap<String, List>();
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < i; j++) {
list.add("ele:" + j);
}
map.put(i + "-key", list);
}
return map;
}
).subscribe(System.out::println);

{

​ key-1=[value-1, value-3, value-5],

​ 2-key=[ele:0, ele:0, ele:1],

​ key-2=[value-2, value-4],

​ 0-key=[ele:0, ele:0, ele:1],

​ 1-key=[ele:0, ele:0, ele:1]

}

转换阻塞结构

Iterable

1
2
3
4
5
Iterable<Integer> integers = Flux.just(1, 2, 3, 4)
.toIterable();
for (Integer num : integers) {
System.out.println(num);
}

1
2
3
4

Stream

1
2
Stream<Integer> integerStream = Flux.just(1, 2, 3).toStream();
integerStream.forEach(System.out::println);

1
2
3

BlockFirst

阻塞了当前线程,直到上游发出第一个值或完成流为止

1
2
3
4
Integer integer = Flux.just(1, 2, 3)
.doOnNext(item -> System.out.println("onNext:" + item))
.blockFirst();
System.out.println(integer);

onNext:1
1

blockLast

阻塞当前线程,直到上游发出最后一个值或完成流为止。在 onError的情况下,它会在被阻塞的线程中抛出异常。

1
2
3
4
Integer integer2 = Flux.just(1, 2, 3)
.doOnNext(item -> System.out.println("onNext:" + item))
.blockLast();
System.out.println(integer2);

onNext:1
onNext:2
onNext:3
3

物化和非物化信号

将流中的元素封装为Signal对象进行处理。

正常调用

1
2
3
4
5
6
7
8
    Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.publishOn(Schedulers.parallel())
.concatWith(Flux.error(new Exception("手动异常")))
.doOnEach(item -> System.out.println("是否完成:" + item.isOnComplete()))
.subscribe(System.out::println);
Thread.sleep(10 * 1000);
}

是否完成:false
1
是否完成:false
2
是否完成:false
3
是否完成:false
[ERROR] (parallel-1) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 手动异常
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 手动异常
Caused by: java.lang.Exception: 手动异常
at com.yrl.reactor.ReactorDemo.main(ReactorDemo.java:12)

物化调用

1
2
3
4
5
6
7
8
Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.publishOn(Schedulers.parallel())
.concatWith(Flux.error(new Exception("手动异常")))
.materialize()
.doOnEach(item -> System.out.println("是否完成:" + item.isOnComplete()))
.subscribe(System.out::println);
Thread.sleep(10 * 1000);

是否完成:false
onNext(1)
是否完成:false
onNext(2)
是否完成:false
onNext(3)
是否完成:false
onError(java.lang.Exception: 手动异常)
是否完成:true

非物化调用

1
2
3
4
5
6
7
8
9
Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000))
.publishOn(Schedulers.parallel())
.concatWith(Flux.error(new Exception("手动异常")))
.materialize()
.doOnEach(item -> System.out.println("是否完成:" + item.isOnComplete()))
.dematerialize()//需要搭配materialize一起使用
.subscribe(System.out::println);
Thread.sleep(10 * 1000);

是否完成:false
1
是否完成:false
2
是否完成:false
3
是否完成:false
[ERROR] (parallel-1) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 手动异常
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: 手动异常
Caused by: java.lang.Exception: 手动异常
at com.yrl.reactor.ReactorDemo.main(ReactorDemo.java:12)

冷热数据

冷发布者:无论订阅者何时出现,都为该订阅者生成所有的序列数据,没有订阅者就不会生成数据。

热发布者:数据的生成不依赖于订阅者而存在,数据可能在第一个订阅者出现之间就开始产生了。

冷发布

1
2
3
4
5
6
7
8
Flux<String> coldPublisher = Flux.defer(()->{
System.out.println("生成数据");
return Flux.just(UUID.randomUUID().toString());
});
System.out.println("尚未生成数据");
coldPublisher.subscribe(e -> System.out.println("onNext:" + e));
coldPublisher.subscribe(e -> System.out.println("onNext:" + e));
System.out.println("为两次订阅生成两次数据");

尚未生成数据
生成数据
onNext:1d496bec-d1be-4274-b24a-74ece485e51a
生成数据
onNext:19ab947e-bfd7-4600-9ffe-0e3a0b91c58c
为两次订阅生成两次数据

冷发布同时订阅通知

使用publish()返回的 ConnectableFlux 可以向几个订阅者多播事件。

1
2
3
4
5
6
7
8
9
10
Flux<String> coldPublisher = Flux.defer(() -> {
System.out.println("生成数据");
return Flux.just(UUID.randomUUID().toString());
});
System.out.println("尚未生成数据");
ConnectableFlux<String> connectableFlux = coldPublisher.publish();
connectableFlux.subscribe(e -> System.out.println("onNext:" + e));
connectableFlux.subscribe(e -> System.out.println("onNext:" + e));
System.out.println("准备建立连接");
connectableFlux.connect();

尚未生成数据
准备建立连接
生成数据
onNext:ad8292c5-3d65-488e-9659-cdc7a7a2a559
onNext:ad8292c5-3d65-488e-9659-cdc7a7a2a559

1
2
3
4
5
6
7
8
Flux<Integer> source = Flux.range(0,3)
.doOnSubscribe(s -> System.out.println("对冷发布者的新订阅票据:" + s));
//准备一个ConnectableFlux,它共享此Flux序列,并以反压力感知的方式将值分发给订阅者
ConnectableFlux<Integer> conn = source.publish();
conn.subscribe(item -> System.out.println("[subscriber 1] onNext:" + item));
conn.subscribe(item -> System.out.println("[subscriber 2] onNext:" + item));
System.out.println("准备建立连接");
conn.connect();

准备建立连接
对冷发布者的新订阅票据:reactor.core.publisher.FluxRange$RangeSubscription@22a71081
[subscriber 1] onNext:0
[subscriber 2] onNext:0
[subscriber 1] onNext:1
[subscriber 2] onNext:1
[subscriber 1] onNext:2
[subscriber 2] onNext:2

缓存

1
2
3
4
5
6
7
Flux<Integer> source = Flux.range(0,5)
.doOnSubscribe(s -> System.out.println("冷发布者的新订阅数据"));
Flux<Integer> cacheSource = source.cache(Duration.ofMillis(1000));
cacheSource.subscribe(item -> System.out.println("[subscribe 1] on Next:" +item));
cacheSource.subscribe(item -> System.out.println("[subscribe 2] on Next:" +item));
Thread.sleep(1200);//缓存超时,重新订阅生成数据
cacheSource.subscribe(item -> System.out.println("[subscribe 3] on Next:" +item));

冷发布者的新订阅数据
[subscribe 1] on Next:0
[subscribe 1] on Next:1
[subscribe 1] on Next:2
[subscribe 1] on Next:3
[subscribe 1] on Next:4
[subscribe 2] on Next:0
[subscribe 2] on Next:1
[subscribe 2] on Next:2
[subscribe 2] on Next:3
[subscribe 2] on Next:4
冷发布者的新订阅数据
[subscribe 3] on Next:0
[subscribe 3] on Next:1
[subscribe 3] on Next:2
[subscribe 3] on Next:3
[subscribe 3] on Next:4

共享-热发布

使用publish()需要等待订阅者出现才能开始处理。

share可以将冷发布者转变为热发布者,会为每个新订阅者传播订阅者尚未错过的事件。

1
2
3
4
5
6
7
8
Flux<Integer> source = Flux.range(0, 5)
.delayElements(Duration.ofMillis(100))
.doOnSubscribe(s -> System.out.println("冷发布者新的订阅票据"));
Flux<Integer> shareSource = source.share();
shareSource.subscribe(item -> System.out.println("subscribe 1 onNext:" + item));
Thread.sleep(400);
shareSource.subscribe(item -> System.out.println("subscribe 2 onNext:" + item));
Thread.sleep(10*1000);

冷发布者新的订阅票据
subscribe 1 onNext:0
subscribe 1 onNext:1
subscribe 1 onNext:2
subscribe 1 onNext:3
subscribe 2 onNext:3
subscribe 1 onNext:4
subscribe 2 onNext:4