reactor中的线程池

reactor中的线程池

1.DelegateServiceScheduler

将任务委托给底层单个线程池,和直接使用线程池无本质区别。

2.SingleScheduler

单线程的scheduler,和java自带的一致。

3.ImmediateScheduler

在主线程执行的scheduler。

4.ParallelScheduler

此scheduler会将任务均匀分配给内部的单线程线程池执行,核心逻辑十分简单,就是均匀的分配,不对任务的执行时间进行区分。
运气不好时会将耗时长的任务放置到同一个线程内,导致该线程执行时间过长,所以理论上用来执行非堵塞的任务比较好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ScheduledExecutorService pick() {
ScheduledExecutorService[] a = executors;
if (a == null) {
start();
a = executors;
if (a == null) {
throw new IllegalStateException("executors uninitialized after implicit start()");
}
}
if (a != SHUTDOWN) {
// ignoring the race condition here, its already random who gets which executor
int idx = roundRobin;
if (idx == n) {
idx = 0;
roundRobin = 1;
} else {
roundRobin = idx + 1;
}
return a[idx];
}
return TERMINATED;
}

5.BoundedElasticScheduler

类似于ParallelScheduler,底层也是若干个单线程线程池,但是在任务分配上做了优化。底层会判断哪个线程池当前任务量最少,优先将任务分配给任务量最少的线程池。
判断哪个线程池当前任务量最少使用的是优先级队列,每次获取队列头的线程池。并且当子线程池没有任务时,将进入idleQueue中,一段时间不使用会进行回收。
适合执行堵塞的任务,不宜执行大量非堵塞的任务,因为非堵塞任务执行很快,使用下面复杂的pick过程反而更耗时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
BoundedState pick() {
for (;;) {
int a = get();
if (a == DISPOSED) {
return CREATING; //synonym for shutdown, since the underlying executor is shut down
}

if (!idleQueue.isEmpty()) {
//try to find an idle resource
BoundedState bs = idleQueue.pollLast();
if (bs != null && bs.markPicked()) {
busyQueue.add(bs);
return bs;
}
//else optimistically retry (implicit continue here)
}
else if (a < parent.maxThreads) {
//try to build a new resource
if (compareAndSet(a, a + 1)) {
ScheduledExecutorService s = Schedulers.decorateExecutorService(parent, parent.createBoundedExecutorService());
BoundedState newState = new BoundedState(this, s);
if (newState.markPicked()) {
busyQueue.add(newState);
return newState;
}
}
//else optimistically retry (implicit continue here)
}
else {
//pick the least busy one
BoundedState s = busyQueue.poll();
if (s != null && s.markPicked()) {
busyQueue.add(s); //put it back in the queue with updated priority
return s;
}
//else optimistically retry (implicit continue here)
}
}
}

还有一点需要注意的是,Schedulers.newBoundedElastic() 的第二个参数queuedTaskCap 表示底层的每个子线程池的最大容量,而不是BoundedElasticScheduler本身的最大容量。

SingleScheduler和ParallelScheduler是不允许执行堵塞任务的,这一点在reactor内部如执行block()操作符时就会报错。我们将堵塞任务放入这两个线程池内执行虽然不会报错,但也是很不规范的,可能会导致内部操作变慢。


reactor中的线程池
https://www.huangchaoyu.com/933416019.html/
作者
hcy
发布于
2022年1月17日
更新于
2024年8月17日
许可协议