Closed
Description
Currently ElasticScheduler uses a queue to manage the cache of executors. As far as I understand,
- When a task finishes, the freed executor is enqueued with a ttl.
- When a new task is scheduled, a cached executor is dequeued from the head of the queue, i.e. the one with the oldest ttl.
Therefore the executor closest to eviction is always the one taken out of the queue for reuse, and its ttl is reset when it is enqueued again. As a result, executors cannot be evicted as long as tasks are scheduled at intervals shorter than ttlSeconds.
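The effect described above can be reproduced without Reactor. The following is a minimal sketch using a virtual clock, not ElasticScheduler's actual code: the `Idle` class, the `simulate` function, and all parameter values are hypothetical and only model "enqueue with ttl, dequeue for reuse, evict when expired".

```kotlin
import java.util.ArrayDeque

// Hypothetical model of an idle executor: an id plus the virtual time (ms) at which it expires.
data class Idle(val id: Int, val expiresAt: Long)

// Starts with `executors` idle executors, then schedules one instantaneous task
// every `periodMs` and returns how many executors are still cached at the end.
fun simulate(
    lifo: Boolean,
    executors: Int = 50,
    ttlMs: Long = 1000,
    periodMs: Long = 10,
    totalMs: Long = 5000
): Int {
    val cache = ArrayDeque<Idle>()
    for (id in 0 until executors) cache.addLast(Idle(id, ttlMs))

    var now = periodMs
    while (now <= totalMs) {
        // Eviction: drop every idle executor whose ttl has elapsed.
        cache.removeIf { it.expiresAt <= now }
        // Reuse: FIFO takes the oldest entry, LIFO takes the most recently released one.
        val picked = if (lifo) cache.pollLast() else cache.pollFirst()
        val id = picked?.id ?: executors // cache empty: pretend a new executor is created
        // The task finishes immediately and the executor is cached again with a fresh ttl.
        cache.addLast(Idle(id, now + ttlMs))
        now += periodMs
    }
    return cache.size
}

fun main() {
    println("FIFO, idle executors remaining: ${simulate(lifo = false)}") // 50
    println("LIFO, idle executors remaining: ${simulate(lifo = true)}")  // 1
}
```

With FIFO reuse every executor is touched once per 500 ms in this model, so none of them ever reaches its 1000 ms ttl. With LIFO reuse only one executor is kept busy and the other 49 expire.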
The following code demonstrates the problem:
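import kotlin.concurrent.thread
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// Monitor thread count: print the number of threads created since startup, once per second
thread(start = true) {
    val baseThreads = Thread.activeCount()
    var t = 0
    while (true) {
        println("$t: ${Thread.activeCount() - baseThreads} threads")
        t++
        Thread.sleep(1000)
    }
}

// ElasticScheduler with ttlSeconds = 1
val scheduler = Schedulers.newElastic("elastic", 1)

// Cache executors: 100 concurrent blocking tasks force 100 executors to be created
for (i in 1..100) {
    Mono.fromCallable { Thread.sleep(100) }
        .subscribeOn(scheduler)
        .subscribe()
}

// Schedule tasks faster than ttlSeconds (one short task every 10 ms)
while (true) {
    Mono.just(1)
        .subscribeOn(scheduler)
        .subscribe()
    Thread.sleep(10)
}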
Output with the current queue (FIFO order):
0: 0 threads
1: 102 threads
2: 102 threads
3: 90 threads
4: 90 threads
5: 88 threads
6: 88 threads
7: 87 threads
8: 87 threads
9: 86 threads
10: 86 threads
11: 86 threads
12: 86 threads
13: 86 threads
14: 84 threads
15: 84 threads
16: 84 threads
17: 84 threads
18: 84 threads
19: 84 threads
20: 84 threads
...
If I change the queue to a deque and reuse executors in LIFO order, the idle executors are evicted after ttlSeconds:
0: 0 threads
1: 102 threads
2: 102 threads
3: 2 threads
4: 2 threads
5: 2 threads
6: 2 threads
7: 2 threads
8: 2 threads
9: 2 threads
10: 2 threads
...
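For illustration, a LIFO cache along these lines could look roughly like the sketch below. It is an assumption about the shape of such a change, not the actual patch: `Cached`, `LifoCache`, and their methods are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```kotlin
import java.util.concurrent.ConcurrentLinkedDeque

// Hypothetical cache entry; not Reactor's actual class.
class Cached<T>(val resource: T, val expiresAtMs: Long)

class LifoCache<T>(private val ttlMs: Long) {
    // Head = most recently released, tail = idle the longest.
    private val idle = ConcurrentLinkedDeque<Cached<T>>()

    // Called when a task finishes: cache the executor with a fresh ttl.
    fun release(resource: T, nowMs: Long) {
        idle.addFirst(Cached(resource, nowMs + ttlMs))
    }

    // Called when a task is scheduled: reuse the most recently released executor.
    fun acquire(): T? = idle.pollFirst()?.resource

    // Evict from the tail, where the entries closest to expiry accumulate.
    fun evictExpired(nowMs: Long): List<T> {
        val evicted = mutableListOf<T>()
        while (true) {
            val oldest = idle.peekLast() ?: break
            if (oldest.expiresAtMs > nowMs) break
            // The remove can fail if acquire() took the entry concurrently; then just retry.
            if (idle.removeLastOccurrence(oldest)) evicted += oldest.resource
        }
        return evicted
    }

    fun size(): Int = idle.size
}

fun main() {
    val cache = LifoCache<String>(ttlMs = 1000)
    cache.release("a", nowMs = 0)
    cache.release("b", nowMs = 0)
    // A steady stream of tasks keeps reusing the same executor, so the other one expires.
    for (now in 10L..2000L step 10L) {
        val executor = cache.acquire()!!
        cache.release(executor, now)
        cache.evictExpired(now)
    }
    println(cache.size()) // 1
}
```

Because entries are both released to and acquired from the head, the tail holds the executors that have been idle the longest, so eviction only needs to look at the tail.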