
Explore leveraging Virtual Thread Coroutine dispatcher #33788

@MarcinMoskala


Currently, the default dispatcher used for suspending controller functions is Dispatchers.Unconfined, which is a dangerously poor choice. I believe it was chosen because of the common misconception that Dispatchers.Unconfined runs on the thread that started the coroutine. That is true only until the first suspension point; after that, the coroutine runs on whatever thread resumed it. This is dangerous because libraries are designed to use a minimal number of threads in their suspending APIs, and they make no guarantees about which thread they resume on, since they assume a dispatcher will switch threads anyway (of all the dispatchers, only Dispatchers.Unconfined does not).

Take a look at this example from my book Kotlin Coroutines: Deep Dive:

import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlinx.coroutines.*

fun main() {
    var continuation: Continuation<Unit>? = null

    thread(name = "Thread1") {
        CoroutineScope(Dispatchers.Unconfined).launch {
            println(Thread.currentThread().name) // Thread1

            suspendCancellableCoroutine {
                continuation = it
            }

            println(Thread.currentThread().name) // Thread2

            delay(1000)

            println(Thread.currentThread().name) // kotlinx.coroutines.DefaultExecutor
        }
    }

    thread(name = "Thread2") {
        Thread.sleep(1000)
        continuation?.resume(Unit)
    }

    Thread.sleep(3000)
}

As you can see, after suspension the coroutine runs on the thread that resumed it, and after delay it runs on DefaultExecutor. That poor thread is only meant to schedule coroutine resumptions, not to run coroutine bodies, and worst of all, there is only one of it for the whole application.

Consider this simplified controller:

@RestController
@RequestMapping
class PingController {
    @GetMapping("/ping")
    suspend fun ping(): ResponseEntity<Map<String, Boolean>> {
        delay(1000)
        Thread.sleep(1000)
        return ResponseEntity(mapOf("success" to true), HttpStatus.OK)
    }
}

If you make 1000 requests, handling them should take at least 1001 seconds: the delays can overlap, but the 1000 sleeps all run sequentially on DefaultExecutor (my experiments confirm this). That is not good. If we used Dispatchers.IO, it would take roughly 1000 / 64 + 1 ≈ 17 seconds, because of its 64-thread limit. Of course, in a real-life example we would have a database or network request instead of delay, and some processing instead of sleep, but the essential problem remains the same:

@RestController
@RequestMapping
class PingController {
    @GetMapping("/ping")
    suspend fun ping(): ResponseEntity<Map<String, Boolean>> {
        val data = fetchData()
        complexProcessing(data)
        return ResponseEntity(mapOf("success" to true), HttpStatus.OK)
    }
}
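For reference, the Dispatchers.IO comparison from the paragraph above can already be reproduced by dispatching explicitly inside the handler. The following is only a minimal sketch of that workaround, not the proposal itself; the controller name is hypothetical, the body mirrors the first simplified controller, and Dispatchers.IO is used purely because it is the alternative compared above:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping
class ExplicitDispatchPingController {
    @GetMapping("/ping-io")
    suspend fun ping(): ResponseEntity<Map<String, Boolean>> =
        // withContext moves the body off whatever thread resumed the coroutine
        // (for example DefaultExecutor) onto Dispatchers.IO, so the blocking
        // sleep no longer serializes all requests on a single thread.
        withContext(Dispatchers.IO) {
            delay(1000)
            Thread.sleep(1000)
            ResponseEntity(mapOf("success" to true), HttpStatus.OK)
        }
}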

Most suspending network clients are optimized to use a minimal number of threads. In Ktor Client, for instance, most engines use only one thread to resume coroutines, so delay mimics that behavior pretty well. Consider the following example. On my computer, it takes 30 seconds with Dispatchers.Unconfined, but only 5 seconds if Dispatchers.Default is used instead:

import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*

suspend fun main() = measureTimeMillis {
    withContext(Dispatchers.Unconfined) {
        repeat(1000) {
            launch {
                val data = fetchData()
                complexProcessing(data)
            }
        }
    }
}.let { println("Took $it") }

suspend fun fetchData(): Data {
    delay(1000)
    return Data()
}

class Data

private val list = List(200_000) { it }.shuffled()

fun complexProcessing(data: Data) {
    list.map { it % 10_000 }.sorted()
}

So which dispatcher should be used? In theory, if we never blocked threads, Dispatchers.Default would be the best option, but relying on that would be wishful thinking: there are too many blocking APIs on the backend, and Dispatchers.Default is a poor fit for blocking calls. Dispatchers.IO, which is what Ktor Server uses, would be a better option. It is still not perfect, though, because it has a single global limit of 64 threads. The danger is that Dispatchers.IO is typically used to wrap blocking calls, so if one process needs to make a lot of them (consider a job that sends a newsletter using the blocking SendGrid API), controller handlers might end up waiting in a queue for an available thread.
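To make that danger concrete, here is a minimal standalone sketch (not Spring code), assuming the default limit of 64 threads for Dispatchers.IO; the 5-second sleep and the task count are arbitrary values chosen to fill that limit:

import kotlinx.coroutines.*

fun main() = runBlocking {
    // Simulated batch job: 64 blocking calls wrapped in Dispatchers.IO occupy
    // every thread of its default 64-thread limit.
    repeat(64) {
        launch(Dispatchers.IO) { Thread.sleep(5_000) } // e.g. a blocking SendGrid call
    }

    delay(100) // let the batch job grab the threads first
    val handlerSubmitted = System.currentTimeMillis()

    // A "controller handler" arriving now has to queue for a free IO thread,
    // so it starts only once one of the blocking calls above completes.
    val handler = launch(Dispatchers.IO) {
        println("Handler waited ${System.currentTimeMillis() - handlerSubmitted} ms for a thread")
    }
    handler.join()
}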

I believe the perfect option would be to use:

  • A Loom-based dispatcher if virtual threads are available and configured (Executors.newVirtualThreadPerTaskExecutor().asCoroutineDispatcher()).
  • A dispatcher with an independent limit otherwise. It could be limited to 64 as well, but the point is to have a limit independent of Dispatchers.IO. In current versions of Kotlin Coroutines we can create it with Dispatchers.IO.limitedParallelism(50); in older versions the best we could do was to build a dispatcher from a fixed pool of threads with Executors.newFixedThreadPool(50).asCoroutineDispatcher(). See the sketch after this list.
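A minimal sketch of that selection, under the assumption that the boolean flag stands in for however Spring detects and configures virtual threads (the function name is hypothetical, and 50 is just the value used above):

import java.util.concurrent.Executors
import kotlinx.coroutines.*

fun resolveControllerDispatcher(virtualThreadsEnabled: Boolean): CoroutineDispatcher =
    if (virtualThreadsEnabled) {
        // One virtual thread per task (JDK 21+): blocking calls park the virtual
        // thread instead of exhausting a platform-thread pool.
        Executors.newVirtualThreadPerTaskExecutor().asCoroutineDispatcher()
    } else {
        // A view of Dispatchers.IO with its own parallelism limit, independent of
        // the shared 64-thread limit (kotlinx.coroutines 1.6+); in older versions,
        // Executors.newFixedThreadPool(50).asCoroutineDispatcher() would go here.
        Dispatchers.IO.limitedParallelism(50)
    }

Either branch gives suspending controller bodies a capacity that is not consumed by other code wrapping blocking calls in Dispatchers.IO, which is the independence described above.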
