Description
Currently, the default dispatcher used for suspending controller functions is Dispatchers.Unconfined
, which is a dangerous poor choice. I believe it was chosen due to the common misconception that Dispatchers.Unconfined
runs on the thread that was used to start it. That is true, but only until the first suspension point, after that it runs on the thread that was used to resume it, what is dangerous, because libraries are designed to use the minimal number of threads in their suspending API, and they do not design what thread is used to resume, as they assume a dispatcher will change it anyway (out of all dispatchers, only Dispatchers.Unconfined
is not changing it).
Take a look at this example from my book Kotlin Coroutines: Deep Dive:
fun main() {
var continuation: Continuation<Unit>? = null
thread(name = "Thread1") {
CoroutineScope(Dispatchers.Unconfined).launch {
println(Thread.currentThread().name) // Thread1
suspendCancellableCoroutine {
continuation = it
}
println(Thread.currentThread().name) // Thread2
delay(1000)
println(Thread.currentThread().name) // kotlinx.coroutines.DefaultExecutor
}
}
thread(name = "Thread2") {
Thread.sleep(1000)
continuation?.resume(Unit)
}
Thread.sleep(3000)
}
As you can see, after suspension, the coroutine runs on the thread that resumed it, and after delay
it runs on DefaultExecutor. This poor thread is only supposed to be used to schedule coroutines resuming, not to run their bodies. Above all, it is one for the whole application.
Consider this simplified controller:
@RestController
@RequestMapping
class PingController(){
@GetMapping("/ping")
suspend fun ping(): ResponseEntity<Map<String, Boolean>> {
delay(1000)
Thread.sleep(1000)
return ResponseEntity(mapOf("success" to true), HttpStatus.OK)
}
}
If you make 1000 requests, it should take at least 1001 seconds, as all sleeps will happen on DefaultExecutor (my experiments confirm that). That is no good. If we used Dispatchers.IO
, it would need 1000 / 64 + 1 = 17 seconds (due to IO
limit). Of course, in a real-life example we should have some db or network request instead of delay
, and some processing instead of sleep
, but the essential problem remains the same.
@RestController
@RequestMapping
class PingController(){
@GetMapping("/ping")
suspend fun ping(): ResponseEntity<Map<String, Boolean>> {
val data = fetchData()
complexProcessing(data)
return ResponseEntity(mapOf("success" to true), HttpStatus.OK)
}
}
Most suspending network clients optimize to use a minimal number of threads. In Ktor Client, for instance, most engines will use only one thread to resume coroutines, so delay is actually mimicking that pretty well. Consider the following example. On my computer, it takes 30 seconds with Dispatchers.Unconfined
, but only 5 seconds if we used Dispatchers.Default
instead:
suspend fun main() = measureTimeMillis {
withContext(Dispatchers.Unconfined) {
repeat(1000) {
launch {
val data = fetchData()
complexProcessing(data)
}
}
}
}.let { println("Took $it") }
suspend fun fetchData(): Data {
delay(1000)
return Data()
}
class Data()
private val list = List(200_000) { it }.shuffled()
fun complexProcessing(data: Data) {
list.map { it % 10_000 }.sorted()
}
So what dispatcher should be used? In theory, if we never block threads, Dispatchers.Default
is the best option, but using it would be a wishful thinking. There are too many blocking APIs on backend, and Dispatchers.Default
is not good if you have blocking calls. Dispatchers.IO
is what is used by Ktor Server, and it would be a better option. Though it is not perfect, as it has one global limit of 64 threads. The danger is that Dispatchers.IO
is used to wrap over blocking calls, and if one process needs to do a lot of blocking calls (consider a job that sends newsletter using blocking SendGrid API), then controller handlers might wait in queue for an available thread.
I believe the perfect option would be to use:
- LOOM dispatcher if available and configured (
Executors.newVirtualThreadPerTaskExecutor().asCoroutineDispatcher()
) - A dispatcher with an independent limit otherwise (it can be limited to 64 as well, but the point it so have an independent limit from
Dispatchers.IO
. In the current version on Kotlin Coroutines we create it withDispatchers.IO.limitedParallelism(50)
, and in older versions the best we could do it making a dispatcher from a fixed pool of threads with (Executors.newFixedThreadPool(50).asCoroutineDispatcher()
).