ElasticScheduler: improve cache eviction #1700

Closed
@dittos

Description

Currently ElasticScheduler uses a queue to manage the cache of executors. As far as I understand,

  • When a task finishes, the freed executor is enqueued with its TTL.
  • When a new task is scheduled, a cached executor is dequeued from the queue (the one with the oldest TTL, i.e. the one closest to eviction).

Therefore the executor with the highest chance of being evicted is exactly the one removed from the queue for reuse. As a result, executors can never be evicted as long as tasks keep arriving at a rate faster than once per ttlSeconds, because each executor's TTL is refreshed before it expires.
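A minimal simulation of this bookkeeping (a simplified model, not Reactor's actual implementation; the function and parameter names are made up) shows why FIFO reuse pins every executor's idle time at exactly the scheduling interval, while LIFO reuse lets all but the busiest executor age out:

```kotlin
import java.util.ArrayDeque

// Simplified model: each cached executor is represented only by the
// timestamp at which it was last parked. Reuse refreshes that timestamp;
// eviction removes entries idle for longer than ttlMillis.
fun simulate(lifo: Boolean, ttlMillis: Long = 1000, stepMillis: Long = 10, steps: Int = 500): Int {
    val idleSince = ArrayDeque<Long>()        // head = most recently parked
    var now = 0L
    repeat(100) { idleSince.addFirst(now) }   // 100 executors parked at t = 0
    repeat(steps) {
        now += stepMillis
        // Evict executors that have been idle longer than the TTL (oldest at tail).
        while (idleSince.isNotEmpty() && now - idleSince.peekLast() > ttlMillis) {
            idleSince.pollLast()
        }
        // Schedule one task: reuse a cached executor, then park it again
        // with a fresh idle timestamp.
        if (idleSince.isNotEmpty()) {
            if (lifo) idleSince.pollFirst() else idleSince.pollLast()
            idleSince.addFirst(now)
        }
    }
    return idleSince.size
}
```

With FIFO reuse, each of the 100 entries is refreshed every 100 × 10 ms = 1000 ms, so no idle time ever exceeds the TTL and all 100 survive indefinitely; with LIFO reuse, only the head entry keeps being refreshed and the other 99 expire.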

The following code demonstrates the problem:

import kotlin.concurrent.thread
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// Monitor the thread count (relative to the baseline) every second
thread(start = true) {
    val baseThreads = Thread.activeCount()
    var t = 0
    while (true) {
        println("$t: ${Thread.activeCount() - baseThreads} threads")
        t++
        Thread.sleep(1000)
    }
}

// ElasticScheduler with ttlSeconds = 1
val scheduler = Schedulers.newElastic("elastic", 1)

// Cache executors
for (i in 1..100) {
    Mono.fromCallable { Thread.sleep(100) }
        .subscribeOn(scheduler)
        .subscribe()
}

// Schedule tasks faster than ttlSeconds
while (true) {
    Mono.just(1)
        .subscribeOn(scheduler)
        .subscribe()
    Thread.sleep(10)
}
Output:

0: 0 threads
1: 102 threads
2: 102 threads
3: 90 threads
4: 90 threads
5: 88 threads
6: 88 threads
7: 87 threads
8: 87 threads
9: 86 threads
10: 86 threads
11: 86 threads
12: 86 threads
13: 86 threads
14: 84 threads
15: 84 threads
16: 84 threads
17: 84 threads
18: 84 threads
19: 84 threads
20: 84 threads
...

If I change the queue to a deque and reuse executors in LIFO order, the cached executors are evicted as expected:

0: 0 threads
1: 102 threads
2: 102 threads
3: 2 threads
4: 2 threads
5: 2 threads
6: 2 threads
7: 2 threads
8: 2 threads
9: 2 threads
10: 2 threads
...
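A sketch of what the deque-based change could look like (hypothetical; the real scheduler internals differ and the class and method names here are invented): reuse takes the most recently parked executor from the head of the deque, so the entries at the tail keep aging, and the periodic eviction pass shuts them down once they outlive the TTL:

```kotlin
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService

// Hypothetical sketch of the proposed fix, not the actual reactor-core patch.
class ExecutorCache(private val ttlMillis: Long) {
    private data class Entry(val executor: ScheduledExecutorService, val parkedAt: Long)
    private val idle = ConcurrentLinkedDeque<Entry>()

    // Reuse the most recently parked executor (head) so older ones keep aging.
    fun acquire(): ScheduledExecutorService =
        idle.pollFirst()?.executor ?: Executors.newSingleThreadScheduledExecutor()

    // Park a freed executor at the head with a fresh idle timestamp.
    fun release(executor: ScheduledExecutorService) {
        idle.addFirst(Entry(executor, System.currentTimeMillis()))
    }

    // Called periodically: shut down executors idle longer than the TTL.
    // The oldest entries sit at the tail, so the scan can stop early.
    fun evict(now: Long = System.currentTimeMillis()) {
        while (true) {
            val last = idle.peekLast() ?: return
            if (now - last.parkedAt <= ttlMillis) return
            if (idle.removeLastOccurrence(last)) last.executor.shutdown()
        }
    }

    fun size() = idle.size
}
```

The key point is that acquire() and evict() operate on opposite ends of the deque, whereas with a single-ended queue they compete for the same (oldest) element.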
