
Sliceable dispatchers: Provide alternative to newSingle/FixedThreadPoolContext via a shared pool of threads #261

Closed
elizarov opened this issue Feb 27, 2018 · 22 comments

Comments

@elizarov
Contributor

elizarov commented Feb 27, 2018

Background

newFixedThreadPoolContext is actively used in coroutines code as a concurrency-limiting mechanism. For example, to limit the number of concurrent requests to the database to 10, one typically defines:

val DB = newFixedThreadPoolContext(10, "DB")

and then wraps all DB invocation into withContext(DB) { ... } blocks.
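In plain JVM terms, the same limiting effect can be sketched with a standard fixed-size executor (a rough analogy only; the sleeping task is a hypothetical stand-in for a blocking DB request):

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// A fixed pool of 10 threads caps concurrent "DB calls" at 10;
// extra tasks wait in the pool's queue.
fun main() {
    val dbPool = Executors.newFixedThreadPool(10)
    val running = AtomicInteger(0)
    val maxRunning = AtomicInteger(0)

    repeat(50) {
        dbPool.submit {
            val now = running.incrementAndGet()
            maxRunning.accumulateAndGet(now) { a, b -> maxOf(a, b) }
            Thread.sleep(10) // hypothetical blocking DB request
            running.decrementAndGet()
        }
    }
    dbPool.shutdown()
    dbPool.awaitTermination(30, TimeUnit.SECONDS)
    check(maxRunning.get() <= 10) // the pool size is the concurrency limit
    println("max concurrent: ${maxRunning.get()}")
}
```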

This approach has the following problems:

  • The withContext(DB) invocation performs an actual switch to a different thread, which is extremely expensive.
  • The result of newFixedThreadPoolContext references the underlying threads and must be explicitly closed when no longer used. This is quite error-prone, as programmers may use newFixedThreadPoolContext in their code without realizing this fact, thus leaking threads.

Solution

The plan is to reimplement newFixedThreadPoolContext from scratch so that it does not create any threads. Instead, there will be one shared pool of threads that creates new threads only when they are needed. Thus, newFixedThreadPoolContext does not create its own threads, but acts only as a semaphore that limits the number of concurrent operations running in this context.
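The "semaphore, not threads" idea can be sketched with JDK primitives. LimitedView and all names below are hypothetical, and a real dispatcher would queue waiting tasks rather than block pool threads as this toy version does:

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// One shared pool owns all threads; a view onto it holds no threads of
// its own and only limits how many of its tasks may run at once.
class LimitedView(private val pool: ExecutorService, permits: Int) {
    private val semaphore = Semaphore(permits)
    fun submit(task: () -> Unit) {
        pool.submit {
            semaphore.acquire() // acts as the concurrency limit
            try { task() } finally { semaphore.release() }
        }
    }
}

fun main() {
    val sharedPool = Executors.newCachedThreadPool()
    val view = LimitedView(sharedPool, 3)
    val running = AtomicInteger(0)
    val maxRunning = AtomicInteger(0)
    repeat(20) {
        view.submit {
            val now = running.incrementAndGet()
            maxRunning.accumulateAndGet(now) { a, b -> maxOf(a, b) }
            Thread.sleep(10)
            running.decrementAndGet()
        }
    }
    sharedPool.shutdown()
    sharedPool.awaitTermination(30, TimeUnit.SECONDS)
    check(maxRunning.get() <= 3) // the view never exceeds its permits
    println("max concurrent in view: ${maxRunning.get()}")
}
```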

Moreover, DefaultContext, which is currently equal to CommonPool (backed by ForkJoinPool.commonPool), is going to be redefined in this way:

val DefaultContext = newFixedThreadPoolContext(defaultParallelism, "DefaultContext")

The current plan is to set defaultParallelism to nCPUs + 1 as a compromise value that ensures utilization of the underlying hardware even if one coroutine accidentally blocks, and that helps us avoid issue #198.

Now, with this redefinition of DefaultContext, code that used to define its own DB context continues to work as before (limiting the number of concurrent DB operations). However, both issues identified above are solved:

  • The withContext(DB) invocation no longer performs an actual thread context switch. It only switches the coroutine context, and separately keeps track of and limits the number of concurrently running coroutines in the DB context.
  • There is no need to close newFixedThreadPoolContext anymore; since it is not backed by any physical threads, there is no risk of leaking them.

This change also affects newSingleThreadContext as its implementation is:

fun newSingleThreadContext(name: String) = newFixedThreadPoolContext(1, name)

This might break some code (feedback is welcome!), as there could be code in the wild that assumed that everything running in newSingleThreadContext was indeed happening on a single Thread instance and used a ThreadLocal to store something, for example. The workaround for such code is to use Executors.newSingleThreadExecutor().asCoroutineDispatcher().
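The reason this workaround preserves the old guarantee can be shown with the standard JDK API alone: a dedicated single-thread executor runs every task on the same Thread instance, so ThreadLocal state carries over between tasks. A minimal sketch:

```kotlin
import java.util.concurrent.Executors

fun main() {
    val single = Executors.newSingleThreadExecutor()
    val counter = ThreadLocal.withInitial { 0 }
    val threads = mutableSetOf<String>()

    repeat(3) {
        single.submit {
            counter.set(counter.get() + 1)
            threads.add(Thread.currentThread().name) // safe: single thread
        }
    }
    // Runs last on the same thread, so it observes all three updates.
    val seen = single.submit<Int> { counter.get() }.get()
    single.shutdown()
    check(seen == 3)       // ThreadLocal state survived across tasks
    check(threads.size == 1) // every task ran on one thread
    println("counter on worker thread: $seen")
}
```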

This issue is related to the discussion of the IO dispatcher in #79. It is inefficient to use Executors.newCachedThreadPool().asCoroutineDispatcher() due to the thread context switches. The plan, as part of this issue, is to define the following constant:

val IO: CoroutineContext = ...

The name is to be discussed in #79.

Coroutines working in this context share the same thread pool as DefaultContext, so there is no cost of a thread switch when doing withContext(IO) { ... }, but there is no inherent limit on the number of such concurrently executing operations.

Note that we also avoid issue #216 with this rewrite.

Open questions

  • Shall we rename newFixedThreadPoolContext and newSingleThreadContext after this rewrite, or leave their names as is? Can we name them better?

  • Should we leave newSingleThreadContext defined as before (with all the context-switch cost) to avoid potentially breaking existing code? This would work especially well if newFixedThreadPoolContext were somehow renamed (with the old name deprecated) while newSingleThreadContext retained its old name.

UPDATE: Due to backward-compatibility requirements the actual design will likely be different. Stay tuned.

@fvasco
Contributor

fvasco commented Feb 27, 2018

newFixedThreadPoolContext is actively used in coroutines code as a concurrency-limiting mechanism

Why not define a JVM-like Semaphore?

fun Mutex(permits: Int = 1) : Mutex

It is also possible to consider introducing:

fun Mutex.asCoroutineDispatcher(delegated : CoroutineDispatcher = DefaultDispatcher): CoroutineDispatcher

The current plan is to set defaultParallelism to nCPUs + 1

Usually the GC pressure is enough to cover accidental blocks.
Setting defaultParallelism = max(nCPUs, 2) should work the same.

Creating a flexible thread pool and mixing blocking and non-blocking operations in it is, I think, a different question; are you considering benchmarking a prototype?
A context switch is costly, I agree, but this cost is less than that of an I/O operation.

Coroutines working in this context share the same common pool (so no cost of thread switch when doing withContext(IO) { ... })

ForkJoinPool uses a task queue for each thread; withContext(IO) doesn't force any context switch, but it blocks the thread, so all other tasks are forced to switch to another thread.
Switching a task to another thread on a multiprocessor (NUMA) server requires refreshing the local CPU caches (L1, L2); this is the greatest cost of a context switch.

@elizarov
Contributor Author

elizarov commented Feb 27, 2018

@fvasco Unfortunately, the Mutex interface (with suspending lock) is not an appropriate abstraction to efficiently implement a coroutine dispatcher on top of it. The "semaphore" has to be an internal data structure tightly coupled with the implementation of the corresponding dispatcher.

The appropriate value of defaultParallelism is questionable, but I personally like nCPUs+1 more than max(nCPUs, 2), because the former is more regular, too.

The questions of a flexible thread-pool and blocking IO are indeed different, but it looks that they can be solved with a single implementation effort. We'll definitely start implementation with benchmarks and experiment with different strategies.

The idea is that, similarly to FJP, this implementation is going to be "sticky" and will only move coroutines to another thread when absolutely necessary. We plan to be far lazier in this respect than FJP (it can be shown that FJP's work-stealing strategy actually has an adverse performance impact on typical CSP-style code).

With respect to blocking operations, it means that we'll give a blocking operation some time to complete before moving all other coroutines to another thread. This seems to be the most efficient strategy based on our study of other languages and libraries, but we'll see how it actually works out in practice.

@fvasco
Contributor

fvasco commented Feb 27, 2018

Hi @elizarov,
thank you for the quick response; I will stay tuned for updates.

Regarding using a Semaphore as a dispatcher, I realized your point too late; sorry for the oversight.

However, I consider using a Mutex to limit concurrent access to a resource a valid option, more appropriate than a custom CoroutineDispatcher.

@fvasco
Contributor

fvasco commented Mar 5, 2018

Should we leave newSingleThreadContext defined as before (with all the context switch cost) to avoid potentially breaking existing code?

I say yes; maybe something like:

fun newDedicatedThreadDispatcher(
        threadPoolSize: Int = 1,
        threadFactory: ThreadFactory = Executors.defaultThreadFactory()
): CoroutineDispatcher
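For comparison, such a dedicated-thread dispatcher can already be built today from the standard JDK API alone (the thread name below is purely illustrative); kotlinx.coroutines can wrap a pool like this via the Executor.asCoroutineDispatcher() extension:

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory

// A pool of dedicated threads built from a custom ThreadFactory.
fun main() {
    val factory = ThreadFactory { r ->
        Thread(r, "dedicated-worker").apply { isDaemon = true }
    }
    val pool = Executors.newFixedThreadPool(1, factory)
    // Every task runs on a thread produced by our factory.
    val name = pool.submit<String> { Thread.currentThread().name }.get()
    pool.shutdown()
    check(name == "dedicated-worker")
    println(name)
}
```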

elizarov added a commit that referenced this issue Oct 6, 2018
* They will be replaced by another mechanism in the future.
  See #261 for details.
* The proposed replacement is to use the standard java API:
  Executors.newSingleThreadExecutor/newFixedThreadPool
  and convert to dispatcher via asCoroutineDispatcher() extension.
@voddan

voddan commented Oct 6, 2018

Shall we rename newFixedThreadPoolContext and newSingleThreadContext after this rewrite or leave their names as is? Can we name it better?

I believe we shall, because we can. Something like Dispatchers.fixedPool is more in line with the current naming.

Moreover, I don't think we need a separate factory method to create a single-threaded pool. It seems like an obvious special case of a fixed-size pool, even if some optimization happens under the covers. Using Dispatchers.fixedPool(size = 1) should cover all the use cases.

@voddan

voddan commented Oct 6, 2018

What should I do if I need a guarantee that my thread pool is never blocked (for more than 50 ms)? For example, I use a coroutine as a timer:

launch {
    delay(1000)
    println("step 1")

    delay(1000)
    println("step 2")

    delay(1000)
    println("step 3")
}

This code fails if for some reason all the threads in the pool are blocked (doing CPU-bound work) and the scheduler is unable to switch this coroutine onto a thread in time. Will the new pool be able to handle that?

@qwwdfsad qwwdfsad added the design label Oct 8, 2018
qwwdfsad pushed a commit that referenced this issue Oct 8, 2018
@fvasco
Contributor

fvasco commented Apr 28, 2019

Regarding my consideration above, issue #1088 is now available.

@justjake

Hi, I saw the deprecation notice of https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-single-thread-context.html

Is there a recommended, current way to execute coroutines with FIFO semantics? Specifically, I need to sequence access to the SQLite database to occur from a single thread.

@fvasco
Contributor

fvasco commented Feb 22, 2020

@justjake

Is there a recommended, current way to execute coroutines with FIFO semantics?

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html

Specifically, I need to sequence access to the SQLite database to occur from a single thread.

You should not use coroutines for that.

@justjake

@fvasco thank you for your reply. However, I am now more confused: why are coroutines not the right tool here? My thought was that callers that require SQLite could await sub-operations dispatched to a SQLite dispatcher that uses only a single thread. Currently my application uses a concrete SQLiteJob with callbacks pushed into a queue, a Java thread pool, etc., which is cumbersome and already needs a suspend/resume to be used from coroutines.

@fvasco
Contributor

fvasco commented Feb 23, 2020

why is coroutines not the right tool here?

It is possible to run multiple coroutines concurrently on a single thread, but you want to "sequence access to the SQLite database to occur from a single thread".

which is cumbersome and already needs a suspend / resume to be used from coroutines

Put the logic in a function, for example:

suspend fun <T> sqlContext(block: (Sqlite) -> T): T =
    CompletableFuture.supplyAsync({ block(sqlite) }, executor).await()

@matejdro

Any reason why this executor would be better than a mutex?

@elizarov
Contributor Author

The reason is that it does a different thing than a mutex:

  • With a mutex (or semaphore) you can limit concurrency. That is, you can limit how many concurrent operations of a certain type your code is doing.
  • With this limited dispatcher you can limit parallelism. That is, you can limit how many things your code does in parallel. This is all about CPU-resource allocation: you can limit how many CPU cores/threads this or that kind of code can consume or block.
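With blocking JDK primitives the two limits look alike from the outside, but the mechanisms differ, which a small sketch can make concrete (all names are illustrative; with suspending coroutines, only the dispatcher variant would hold threads):

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// - Semaphore(2): limits CONCURRENCY, i.e. operations in flight,
//   independently of how many threads exist.
// - newFixedThreadPool(2): limits PARALLELISM, i.e. how many
//   threads/cores the work may occupy at once.
fun main() {
    val running = AtomicInteger(0)
    val max = AtomicInteger(0)
    fun work() {
        val now = running.incrementAndGet()
        max.accumulateAndGet(now) { a, b -> maxOf(a, b) }
        Thread.sleep(10)
        running.decrementAndGet()
    }

    // Concurrency limit: unbounded threads, two permits.
    val permits = Semaphore(2)
    val wide = Executors.newCachedThreadPool()
    repeat(12) {
        wide.submit {
            permits.acquire()
            try { work() } finally { permits.release() }
        }
    }
    wide.shutdown()
    wide.awaitTermination(30, TimeUnit.SECONDS)
    check(max.get() <= 2) // never more than 2 operations in flight
    println("max in flight: ${max.get()}")

    // Parallelism limit: the pool itself is the cap on threads.
    max.set(0)
    val narrow = Executors.newFixedThreadPool(2)
    repeat(12) { narrow.submit { work() } }
    narrow.shutdown()
    narrow.awaitTermination(30, TimeUnit.SECONDS)
    check(max.get() <= 2) // never more than 2 threads running work()
    println("max in parallel: ${max.get()}")
}
```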

@sschuberth

* There is no need to close `newFixedThreadPoolContext` anymore; since it is not backed by any physical threads, there is no risk of leaking them.

I'm really looking forward to this, @elizarov. Is there any ETA/milestone for this feature?

@elizarov
Contributor Author

elizarov commented Apr 2, 2020

Sorry, I cannot give any specific ETA at the moment.

@alamothe

alamothe commented Jun 5, 2020

Related: https://discuss.kotlinlang.org/t/coroutine-dispatcher-confined-to-a-single-thread/17978

I would need newSingleThreadContext implemented exactly this way, picking a thread from the default thread pool.

Is there anything I could use in the meantime?

@elizarov
Contributor Author

elizarov commented Jun 9, 2020

@alamothe In the forum thread you ask:

Is there such thing as a dispatcher which “picks one worker thread from a default pool and does not jump between threads”?

This is not what this proposal is supposed to do. In this proposal, even with a limit of 1 thread, it will still jump between threads, but it will ensure that at most one thread is used at any time. We don't have anything out of the box to cover your needs; you'll have to write your own dispatcher that supports it.
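That behavior, at most one thread at any time but not always the same thread, can be sketched over a shared JDK pool. SerialView and everything below is a hypothetical illustration, not the library's design:

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Tasks are drained one at a time on whatever shared-pool thread
// picks the queue up, so at most one task runs at any moment even
// though different pool threads may do the draining over time.
class SerialView(private val pool: ExecutorService) {
    private val queue = ConcurrentLinkedQueue<() -> Unit>()
    private val draining = AtomicBoolean(false)

    fun submit(task: () -> Unit) {
        queue.add(task)
        tryDrain()
    }

    private fun tryDrain() {
        if (draining.compareAndSet(false, true)) {
            pool.submit {
                while (true) {
                    val next = queue.poll() ?: break
                    next()
                }
                draining.set(false)
                // Re-check: a task may have been enqueued after the last
                // poll() but before draining was released.
                if (queue.isNotEmpty()) tryDrain()
            }
        }
    }
}

fun main() {
    val pool = Executors.newCachedThreadPool()
    val view = SerialView(pool)
    var counter = 0 // unsynchronized: safe only because tasks never overlap
    val done = CountDownLatch(1000)
    repeat(1000) {
        view.submit {
            counter++
            done.countDown()
        }
    }
    done.await()
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    check(counter == 1000) // no lost updates: execution was serialized
    println("counter = $counter")
}
```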

@alamothe

but it will ensure that at most 1 thread is used at any time.

This is actually what I need. The requirement is to prevent unintentional multithreading (because our code is not thread-safe). My original post didn't state this in the best possible way, but I later arrived at this conclusion.

Thanks for checking!

@Rosomack

Rosomack commented Sep 2, 2020

Is there any plan to support this for Kotlin/Native?

@elizarov elizarov changed the title Reimplement DefaultContext and newSingle/FixedThreadPoolContext via a shared pool of threads Sliceable dispatchers: Provide alternative to newSingle/FixedThreadPoolContext via a shared pool of threads Oct 14, 2020
@elizarov
Contributor Author

UPDATE: Due to backward-compatibility requirements the actual design will likely be different. Stay tuned.

@KotlinGeekDev

Hello. Please, are there any updates on the design?

@qwwdfsad
Collaborator

qwwdfsad commented Sep 6, 2021

Closing this in favour of #2919.

Let's continue our discussion there
