Skip to content

Commit

Permalink
Introduce IO dispatcher to offload blocking I/O-intensive tasks
Browse files Browse the repository at this point in the history
Fixes #79
  • Loading branch information
elizarov committed Aug 23, 2018
1 parent 4b0379f commit 2efbd3a
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ public final class kotlinx/coroutines/experimental/CoroutineContextKt {
public static final field DEBUG_PROPERTY_VALUE_AUTO Ljava/lang/String;
public static final field DEBUG_PROPERTY_VALUE_OFF Ljava/lang/String;
public static final field DEBUG_PROPERTY_VALUE_ON Ljava/lang/String;
public static final field IO_PARALLELISM_PROPERTY_NAME Ljava/lang/String;
public static final fun getDefaultDispatcher ()Lkotlinx/coroutines/experimental/CoroutineDispatcher;
public static final fun getIO ()Lkotlinx/coroutines/experimental/CoroutineDispatcher;
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;)Lkotlin/coroutines/experimental/CoroutineContext;
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;)Lkotlin/coroutines/experimental/CoroutineContext;
public static synthetic fun newCoroutineContext$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;ILjava/lang/Object;)Lkotlin/coroutines/experimental/CoroutineContext;
Expand Down
17 changes: 10 additions & 7 deletions common/kotlinx-coroutines-core-common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ import kotlin.coroutines.experimental.*
* Base class that shall be extended by all coroutine dispatcher implementations.
*
* The following standard implementations are provided by `kotlinx.coroutines`:
*
* * [DefaultDispatcher] -- is used by all standard builder if no dispatcher nor any other [ContinuationInterceptor]
* is specified in their context. It is currently equal to [CommonPool] (subject to change in the future).
* This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
* * [CommonPool] -- schedules coroutine execution to a common pool of shared background threads designed
* to be used for compute-intensive code.
* * [IO] -- uses a shared pool of on-demand created threads and is designed for offloading of IO-intensive _blocking_
* operations (like file I/O and blocking socket I/O).
* * [Unconfined] -- starts coroutine execution in the current call-frame until the first suspension.
* On first suspension the coroutine builder function returns.
* The coroutine will resume in whatever thread that is used by the
* The coroutine resumes in whatever thread that is used by the
* corresponding suspending function, without confining it to any specific thread or pool.
* This in an appropriate choice for IO-intensive coroutines that do not consume CPU resources.
* * [DefaultDispatcher] -- is used by all standard builder if no dispatcher nor any other [ContinuationInterceptor]
* is specified in their context. It is currently equal to [CommonPool] (subject to change).
* * [CommonPool] -- immediately returns from the coroutine builder and schedules coroutine execution to
* a common pool of shared background threads.
* This is an appropriate choice for compute-intensive coroutines that consume a lot of CPU resources.
* **Unconfined dispatcher should not be normally used in code**.
* * Private thread pools can be created with [newSingleThreadContext] and [newFixedThreadPoolContext].
* * An arbitrary [Executor][java.util.concurrent.Executor] can be converted to dispatcher with [asCoroutineDispatcher] extension function.
*
Expand Down
26 changes: 23 additions & 3 deletions core/kotlinx-coroutines-core/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.concurrent.atomic.*
import kotlin.coroutines.experimental.*

/**
* Name of the property that control coroutine debugging. See [newCoroutineContext].
* Name of the property that controls coroutine debugging. See [newCoroutineContext].
*/
public const val DEBUG_PROPERTY_NAME = "kotlinx.coroutines.debug"

Expand Down Expand Up @@ -56,14 +56,34 @@ internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_N
}

/**
* This is the default [CoroutineDispatcher] that is used by all standard builders like
* The default [CoroutineDispatcher] that is used by all standard builders like
* [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is currently equal to [CommonPool], but the value is subject to change in the future.
* You can set system property "`kotlinx.coroutines.scheduler`" (either no value or to the value of "`on`")
* to use an experimental coroutine dispatcher that shares threads with [IO] dispatcher and thus can switch to
* [IO] context without performing an actual thread context switch.
*/
@Suppress("PropertyName")
public actual val DefaultDispatcher: CoroutineDispatcher =
if (useCoroutinesScheduler) ExperimentalCoroutineDispatcher() else CommonPool
if (useCoroutinesScheduler) BackgroundDispatcher else CommonPool

/**
* Name of the property that defines the maximal number of threads that are used by [IO] coroutines dispatcher.
*/
public const val IO_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.io.parallelism"

/**
* The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
*
* Additional threads in this pool are created and are shutdown on demand.
* The number of threads used by this dispatcher is limited by the value of
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
*/
public val IO by lazy {
BackgroundDispatcher.blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
}

/**
* Creates context for the new coroutine. It installs [DefaultDispatcher] when no other dispatcher nor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ import kotlinx.coroutines.experimental.*
import java.util.concurrent.*
import kotlin.coroutines.experimental.*

/**
* Default instance of coroutine dispatcher for background coroutines (as opposed to UI coroutines).
*/
internal object BackgroundDispatcher : ExperimentalCoroutineDispatcher()

/**
* @suppress **This is unstable API and it is subject to change.**
*/
// TODO make internal (and rename) after complete integration
class ExperimentalCoroutineDispatcher(
open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long
Expand Down
23 changes: 23 additions & 0 deletions core/kotlinx-coroutines-core/test/IODispatcherTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

import kotlinx.coroutines.experimental.*
import org.junit.Test
import kotlin.test.*

class IODispatcherTest : TestBase() {
@Test
fun testWithIOContext() = runTest {
// just a very basic test that is dispatcher works and indeed uses background thread
val mainThread = Thread.currentThread()
expect(1)
withContext(IO) {
expect(2)
assertNotSame(mainThread, Thread.currentThread())
}
expect(3)
assertSame(mainThread, Thread.currentThread())
finish(4)
}
}
4 changes: 2 additions & 2 deletions core/kotlinx-coroutines-core/test/TestBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ public actual open class TestBase actual constructor() {

fun initPoolsBeforeTest() {
CommonPool.usePrivatePool()
if (useCoroutinesScheduler) (DefaultDispatcher as ExperimentalCoroutineDispatcher).usePrivateScheduler()
BackgroundDispatcher.usePrivateScheduler()
}

fun shutdownPoolsAfterTest() {
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
if (useCoroutinesScheduler) (DefaultDispatcher as ExperimentalCoroutineDispatcher).shutdown(SHUTDOWN_TIMEOUT)
BackgroundDispatcher.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
}

Expand Down

0 comments on commit 2efbd3a

Please sign in to comment.