project reactor切换线程导致内存溢出的坑

project reactor切换线程导致内存溢出的坑

​ reactor会在很多地方使用到缓存,或者称为预取功能。但当上游生产的对象比较大想要减少预取的数量时可能会踩到坑,需要格外注意。

事例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
Flux.range(0, 10000)
.buffer(1)
// 问题在这里
.parallel(5, 1)
.runOn(Schedulers.boundedElastic())
.doOnNext(list -> {
System.out.println("处理数据:" + list.size());
//模拟耗时
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
})
.checkpoint()
.then()
.block();
}

这段代码中我们使用Flux.range模拟生产者,假设生产的对象大,比如说从db中查询数据,我们不希望一次性全部生产到内存中。

本意上我们使用parallel将处理分片为5分,通过runOn分配到5个线程中同时消费。而且 parallel(5, 1) 中的第二个参数控制了分片预取的数量为1,所以每次从上游最多取5个元素。

但是并没有达到预期内存直接爆了。下面是runOn的源码,runOn 操作符同样也有预取能力,我们不填第二个参数默认256个。而这个参数是每个通道的预取数量,也就是说不知不觉中预取了 5 * 256 = 1280个元素,与我们原先设想的预取5个相差甚远。

1
2
3
4
5
6
7
public final ParallelFlux<T> runOn(Scheduler scheduler, int prefetch) {
Objects.requireNonNull(scheduler, "scheduler");
return onAssembly(new ParallelRunOn<>(this,
scheduler,
prefetch,
Queues.get(prefetch)));
}

其他场景

同理再看一个publishOn的例子,我们只是期望切换消费者线程,但不知不觉中预取了 256个元素 (publishOn的第二个参数,默认从上游预取256个) ,从而可能与我们的预期相悖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
Flux.range(0, 10000)
.buffer(1)
//
.publishOn(Schedulers.boundedElastic())
.doOnNext(list -> {
System.out.println("处理数据:" + list.size());
//模拟耗时
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
})
.checkpoint()
.then()
.block();
}

总结

使用reactor的多线程能力一定要注意预取控制,有些情况还是挺反预期的,最好自己指定第二个参数人工控制预取数量,否则可能导致预取数量引起oom。

还有一种 flatMap + publishOn的写法 , 利用 flatMap 控制并发数量和预取数量,同时内部使用 publishOn切换线程 (此时使用publishOn就无需指定预取数量了,因为只有一个)。 这种用法不会预取数量过多。

flatMap第一个参数会返回一个流,事例代码中限制从上游取5个生成5个子流,只有前一个流完成后才会向上游再申请数据,所以不存在对上游的预取。

flatMap的第三个参数prefetch值表示是从第一个参数的返回的子流中预取的数量,而不是从flatMap的上游预取的数量,如果返回的Mono本身不会产生大量的数据,不写也可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
Flux.range(0, 10000)
.buffer(1)
.flatMap(list -> Mono.fromRunnable(() -> {
System.out.println("处理数据:" + list.size());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
}).publishOn(Schedulers.boundedElastic())
, 5
, 1)
.blockLast();
}

project reactor切换线程导致内存溢出的坑
https://www.huangchaoyu.com/2533120135.html
作者
hcy
发布于
2025年12月2日
更新于
2025年12月3日
许可协议