project reactor切换线程导致内存溢出的坑
reactor会在很多地方使用到缓存,或者称为预取功能。但当上游生产的对象比较大想要减少预取的数量时可能会踩到坑,需要格外注意。
事例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) { Flux.range(0, 10000) .buffer(1) .parallel(5, 1) .runOn(Schedulers.boundedElastic()) .doOnNext(list -> { System.out.println("处理数据:" + list.size()); try { Thread.sleep(5000); } catch (InterruptedException e) { } }) .checkpoint() .then() .block(); }
|
这段代码中我们使用Flux.range模拟生产者,假设生产的对象大,比如说从db中查询数据,我们不希望一次性全部生产到内存中。
本意上我们使用parallel将处理分片为5分,通过runOn分配到5个线程中同时消费。而且 parallel(5, 1) 中的第二个参数控制了分片预取的数量为1,所以每次从上游最多取5个元素。
但是并没有达到预期内存直接爆了。下面是runOn的源码,runOn 操作符同样也有预取能力,我们不填第二个参数默认256个。而这个参数是每个通道的预取数量,也就是说不知不觉中预取了 5 * 256 = 1280个元素,与我们原先设想的预取5个相差甚远。
1 2 3 4 5 6 7
| public final ParallelFlux<T> runOn(Scheduler scheduler, int prefetch) { Objects.requireNonNull(scheduler, "scheduler"); return onAssembly(new ParallelRunOn<>(this, scheduler, prefetch, Queues.get(prefetch))); }
|
其他场景
同理再看一个publishOn的例子,我们只是期望切换消费者线程,但不知不觉中预取了 256个元素 (publishOn的第二个参数,默认从上游预取256个) ,从而可能与我们的预期相悖。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) { Flux.range(0, 10000) .buffer(1) .publishOn(Schedulers.boundedElastic()) .doOnNext(list -> { System.out.println("处理数据:" + list.size()); try { Thread.sleep(5000); } catch (InterruptedException e) { } }) .checkpoint() .then() .block(); }
|
总结
使用reactor的多线程能力一定要注意预取控制,有些情况还是挺反预期的,最好自己指定第二个参数人工控制预取数量,否则可能导致预取数量引起oom。
还有一种 flatMap + publishOn的写法 , 利用 flatMap 控制并发数量和预取数量,同时内部使用 publishOn切换线程 (此时使用publishOn就无需指定预取数量了,因为只有一个)。 这种用法不会预取数量过多。
flatMap第一个参数会返回一个流,事例代码中限制从上游取5个生成5个子流,只有前一个流完成后才会向上游再申请数据,所以不存在对上游的预取。
flatMap的第三个参数prefetch值表示是从第一个参数的返回的子流中预取的数量,而不是从flatMap的上游预取的数量,如果返回的Mono本身不会产生大量的数据,不写也可以。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static void main(String[] args) { Flux.range(0, 10000) .buffer(1) .flatMap(list -> Mono.fromRunnable(() -> { System.out.println("处理数据:" + list.size()); try { Thread.sleep(5000); } catch (InterruptedException e) { } }).publishOn(Schedulers.boundedElastic()) , 5 , 1) .blockLast(); }
|