reactor中的线程池

Posted by hcy on January 17, 2022

reactor中的线程池

1.DelegateServiceScheduler

将任务委托给底层单个线程池,和直接使用线程池无本质区别。

2.SingleScheduler

单线程的scheduler,和java自带的一致。 ​

3.ImmediateScheduler

在主线程执行的scheduler。

4.ParallelScheduler

此scheduler会将任务均匀分配给内部的单线程线程池执行,核心逻辑十分简单,就是均匀的分配,不对任务的执行时间进行区分。 运气不好时会将耗时长的任务放置到同一个线程内,导致该线程执行时间过长,所以理论上用来执行非堵塞的任务比较好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
    ScheduledExecutorService pick() {
        ScheduledExecutorService[] a = executors;
        if (a == null) {
            start();
            a = executors;
            if (a == null) {
                throw new IllegalStateException("executors uninitialized after implicit start()");
            }
        }
        if (a != SHUTDOWN) {
            // ignoring the race condition here, its already random who gets which executor
            int idx = roundRobin;
            if (idx == n) {
                idx = 0;
                roundRobin = 1;
            } else {
                roundRobin = idx + 1;
            }
            return a[idx];
        }
        return TERMINATED;
    }

5.BoundedElasticScheduler

类似于ParallelScheduler,底层也是若干个单线程线程池,但是在任务分配上做了优化。底层会判断哪个线程池当前任务量最少,优先将任务分配给任务量最少的线程池。 判断哪个线程池当前任务量最少使用的是优先级队列,每次获取队列头的线程池。并且当子线程池没有任务时,将进入idleQueue中,一段时间不使用会进行回收。 适合执行堵塞的任务,不宜执行大量非堵塞的任务,因为非堵塞任务执行很快,使用下面复杂的pick过程反而更耗时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
BoundedState pick() {
			for (;;) {
				int a = get();
				if (a == DISPOSED) {
					return CREATING; //synonym for shutdown, since the underlying executor is shut down
				}

				if (!idleQueue.isEmpty()) {
					//try to find an idle resource
					BoundedState bs = idleQueue.pollLast();
					if (bs != null && bs.markPicked()) {
						busyQueue.add(bs);
						return bs;
					}
					//else optimistically retry (implicit continue here)
				}
				else if (a < parent.maxThreads) {
					//try to build a new resource
					if (compareAndSet(a, a + 1)) {
						ScheduledExecutorService s = Schedulers.decorateExecutorService(parent, parent.createBoundedExecutorService());
						BoundedState newState = new BoundedState(this, s);
						if (newState.markPicked()) {
							busyQueue.add(newState);
							return newState;
						}
					}
					//else optimistically retry (implicit continue here)
				}
				else {
					//pick the least busy one
					BoundedState s = busyQueue.poll();
					if (s != null && s.markPicked()) {
						busyQueue.add(s); //put it back in the queue with updated priority
						return s;
					}
					//else optimistically retry (implicit continue here)
				}
			}
		}

还有一点需要注意的是,Schedulers.newBoundedElastic() 的第二个参数queuedTaskCap 表示底层的每个子线程池的最大容量,而不是BoundedElasticScheduler本身的最大容量。

SingleScheduler和ParallelScheduler是不允许执行堵塞任务的,这一点在reactor内部如执行block()操作符时就会报错。我们将堵塞任务放入这两个线程池内执行虽然不会报错,但也是很不规范的,可能会导致内部操作变慢。


转载请注明出处:https://www.huangchaoyu.com/2022/01/17/reactor中的线程池/