Skip to content

Commit

Permalink
Implemented a rough sketch of CoroutineStartInterceptor.
Browse files Browse the repository at this point in the history
The implementation is only for the JVM runtime.

To make entrance from a blocked thread work as expected for `ThreadContextElements`,
this draft chooses to modify `runBlocking {}` s.t. its contract is that the
`CoroutineContext` stack parameter is always used for interception.
  • Loading branch information
yorickhenning committed Jan 28, 2022
1 parent ab44090 commit 343d478
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 26 deletions.
15 changes: 13 additions & 2 deletions kotlinx-coroutines-core/common/src/CoroutineContext.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,19 @@ package kotlinx.coroutines
import kotlin.coroutines.*

/**
* Creates a context for the new coroutine. It installs [Dispatchers.Default] when no other dispatcher or
* [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on).
* Creates a new [CoroutineContext] for a new coroutine constructed in [this] [CoroutineScope].
*
* When a [CoroutineStartInterceptor] is present in [this] [CoroutineScope],]
* [newCoroutineContext] calls it to construct the new context.
*
* Otherwise, it uses `this.coroutineContext + context` to create the new context.
*
* Before returning the new context, [newCoroutineContext] adds [Dispatchers.Default] to if no
* [ContinuationInterceptor] was present in either [this] scope's [CoroutineContext] or in
* [context].
*
* [newCoroutineContext] also adds debugging facilities to the returned context when debug features
* are enabled.
*/
public expect fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext

Expand Down
89 changes: 89 additions & 0 deletions kotlinx-coroutines-core/common/src/CoroutineStartInterceptor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.*

/**
* Called to construct the `CoroutineContext` for a new coroutine.
*
* When a `CoroutineStartInterceptor` is present in a parent coroutine’s
* `CoroutineContext`, Coroutines will call it to construct a new child coroutine’s
* `CoroutineContext`.
*
* The `CoroutineStartInterceptor` can insert, remove, copy, or modify
* `CoroutineContext.Elements` passed to the new child coroutine.
*
* The “default implementation” of `interceptContext()` used by coroutine builders
* ([coroutineScope], [launch], [async]) when no [CoroutineStartInterceptor] is included, is
* `callingContext + addedContext`. This default folds the the coroutine builder’s `context`
* parameter left onto the parent coroutine's `coroutineContext`.
*
* This API is delicate and performance sensitive.
*
* Since `interceptContext()` is called each time a new coroutine is created, its
* implementation has a disproportionate impact on coroutine performance.
*
* Since `interceptContext` _replaces_ coroutine context inheritance, it can arbitrarily change how
* coroutines inherit their scope. In order for a child coroutine’s `CoroutineContext`
* to be inherited as described in documentation, an override of `interceptContext()`
* __must__ add `callingContext` and `addedContext` to form the return value, with
* `callingContext` to the left of `addedContext` in the sum.
*
* These example statements all preserve "normal" inheritance and modify a custom element:
*
* ```
* callingContext + addedContext
* callingContext + CustomContextElement() + addedContext
* callingContext + addedContext + CustomContextElement()
* ```
*
* These examples _break `Job` inheritance_, because they drop or reverse `callingContext` folding:
*
* ```
* addedContext + callingContext
* CustomContextElement() + addedContext
* ```
*/
@ExperimentalCoroutinesApi
@DelicateCoroutinesApi
public interface CoroutineStartInterceptor : CoroutineContext.Element {

public companion object Key : CoroutineContext.Key<CoroutineStartInterceptor>

/**
* Called to construct the `CoroutineContext` for a new coroutine.
*
* The `CoroutineContext` returned by `interceptContext()` will be the `CoroutineContext` used
* by the child coroutine.
*
* [callingContext] is the `CoroutineContext` of the coroutine constructing the coroutine. If
* the coroutine is getting constructed by `runBlocking {}` outside of a running coroutine,
* [callingContext] will be the `EmptyCoroutineContext`.
*
* [addedContext] is the `CoroutineContext` passed as a parameter to the coroutine builder. If
* no `CoroutineContext` was passed, [addedContext will be the `EmptyCoroutineContext`.
*
* Consider this example:
*
* ```
* runBlocking(CustomCoroutineStartInterceptor()) {
* async(CustomContextElement()) {
* }
* }
* ```
*
* In this arrangement, `CustomCoroutineStartInterceptor.interceptContext()` is called
* to construct the `CoroutineContext` for the `async` coroutine. When
* `interceptContext()` is called, `callingContext` will contain
* `CustomCoroutineStartInterceptor`. `addedContext` will contain the new
* `CustomContextElement`.
*/
public fun interceptContext(
callingContext: CoroutineContext,
addedContext: CoroutineContext
): CoroutineContext
}

55 changes: 39 additions & 16 deletions kotlinx-coroutines-core/jvm/src/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,44 @@

package kotlinx.coroutines

import java.util.concurrent.locks.*
import kotlin.contracts.*
import kotlin.coroutines.*

/**
* Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
* This function should not be used from a coroutine. It is designed to bridge regular blocking code
* to libraries that are written in suspending style, to be used in `main` functions and in tests.
* Creates a new coroutine and **blocks** the current thread _interruptibly_ to immediately execute
* it.
*
* The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations
* in this blocked thread until the completion of this coroutine.
* See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
* [runBlocking] allows regular blocking code to call libraries that are written using coroutines.
* [runBlocking] should be called by a test case or in a program's `main()` to "boostrap" into
* coroutines.
*
* [runBlocking] should never be called from _in_ a coroutine. Blocking a thread is unnecessary
* and inefficient. When a function is a `suspend` function, call [coroutineScope] rather than
* [runBlocking] in order to introduce parallelism.
*
* [runBlocking] uses its input [context] as though it were the [CoroutineContext] that
* constructed the blocking coroutine. Unlike a child coroutine built with [launch] or [async],
* a [runBlocking] coroutine gets its [CoroutineStartInterceptor] and [ContinuationInterceptor]
* from its parameter, rather than from the running context.
*
* When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
* the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
* then this invocation uses the outer event loop.
*
* If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
* this `runBlocking` invocation throws [InterruptedException].
* If [context] does not contain a [CoroutineDispatcher], [runBlocking] will include add an event
* loop [CoroutineDispatcher] to the [CoroutineContext], and execute continuations using the
* blocked thread until [block] returns.
*
* When a [CoroutineStartInterceptor] is explicitly specified in the [context], it intercepts the
* construction of _this_ coroutine. This is a special case that allows the thread calling
* `runBlocking` to intercept the coroutine start before it blocks the thread.
*
* If the blocked thread is interrupted (see [Thread.interrupt]), this coroutine's job will be
* cancelled. If the cancellation by interrupt succeeds, the running [runBlocking] function call
* will complete by throwing [InterruptedException].
*
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
* for a newly created coroutine.
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
* facilities that are available for a newly created coroutine.
*
* @param context the context of the coroutine. The default value is an event loop on the current thread.
* @param block the coroutine code.
Expand All @@ -40,19 +56,26 @@ public actual fun <T> runBlocking(context: CoroutineContext, block: suspend Coro
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val continuationInterceptor = context[ContinuationInterceptor]
val coroutineStartInterceptor = context[CoroutineStartInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
if (continuationInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
newContext = newCoroutineContext(
callingContext = context + eventLoop,
addedContext = EmptyCoroutineContext
)
} else {
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
eventLoop = (continuationInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
newContext = newCoroutineContext(
callingContext = context,
addedContext = EmptyCoroutineContext
)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
Expand Down
50 changes: 42 additions & 8 deletions kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,51 @@ import kotlin.coroutines.*
import kotlin.coroutines.jvm.internal.CoroutineStackFrame

/**
* Creates context for the new coroutine. It installs [Dispatchers.Default] when no other dispatcher nor
* [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on).
* Creates a new [CoroutineContext] for a new coroutine constructed in [this] [CoroutineScope].
*
* See [DEBUG_PROPERTY_NAME] for description of debugging facilities on JVM.
* When a [CoroutineStartInterceptor] is present in [this] [CoroutineScope],]
* [newCoroutineContext] calls it to construct the new context.
*
* Otherwise, it uses `this.coroutineContext + context` to create the new context.
*
* Before returning the new context, [newCoroutineContext] adds [Dispatchers.Default] to if no
* [ContinuationInterceptor] was present in either [this] scope's [CoroutineContext] or in
* [context].
*
* [newCoroutineContext] also adds debugging facilities to the returned context when debug features
* are enabled.
*/
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext =
newCoroutineContext(coroutineContext, context)

/**
* An overload of [CoroutineScope.newCoroutineContext] that accepts both the calling context and
* the context overlay as plain function parameters. This saves using `GlobalScope` or allocating
* a new anonymous `CoroutineScope` when `runBlocking {}` constructs a coroutine.
*/
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext.foldCopiesForChildCoroutine() + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
public fun newCoroutineContext(
callingContext: CoroutineContext,
addedContext: CoroutineContext
): CoroutineContext {
val interceptor = callingContext[CoroutineStartInterceptor]

val childContext: CoroutineContext =
if (interceptor != null) {
interceptor.interceptContext(
callingContext = callingContext.foldCopiesForChildCoroutine(),
addedContext = addedContext
)
} else {
// Default context inheritance: fold left.
callingContext.foldCopiesForChildCoroutine() + addedContext
}

val withDebug = if (DEBUG) childContext + CoroutineId(COROUTINE_ID.incrementAndGet()) else childContext

return if (childContext !== Dispatchers.Default && childContext[ContinuationInterceptor] == null)
withDebug + Dispatchers.Default else withDebug
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import org.junit.Test
import kotlin.coroutines.*
import kotlin.test.*

class CoroutineStartInterceptorSimpleTest: TestBase() {

/**
* A [CoroutineContext.Element] holding an integer, used to enumerate elements allocated
* during context construction.
*/
private class IntegerContextElement(
val number: Int
): CoroutineContext.Element {
public companion object Key : CoroutineContext.Key<IntegerContextElement>
override val key = Key
}

/**
* Inserts a new, unique [IntegerContextElement] into each new coroutine context constructed,
* overriding any present.
*/
class AddElementInterceptor : CoroutineStartInterceptor,
AbstractCoroutineContextElement(CoroutineStartInterceptor.Key) {
private var integerSource = 0

override fun interceptContext(
callingContext: CoroutineContext,
addedContext: CoroutineContext
): CoroutineContext {
integerSource += 1
return callingContext + addedContext + IntegerContextElement(integerSource)
}
}

@Test
fun testContextInterceptorOverridesContextElement() = runTest {
assertNull(coroutineContext[IntegerContextElement.Key])

runBlocking(AddElementInterceptor()) {

}
}

@Test
fun testAsyncDoesNotInterceptFromAddedContext() = runTest {
assertNull(coroutineContext[IntegerContextElement.Key])

async(AddElementInterceptor()) {
assertNull(coroutineContext[IntegerContextElement.Key])
}.join()
}

@Test
fun testLaunchDoesNotInterceptFromAddedContext() = runTest {
assertNull(coroutineContext[IntegerContextElement.Key])

launch(AddElementInterceptor()) {
assertNull(coroutineContext[IntegerContextElement.Key])
}.join()
}

@Test
fun testRunBlockingInterceptsFromAddedContext() = runTest {
assertNull(coroutineContext[IntegerContextElement.Key])

runBlocking(AddElementInterceptor()) {
assertEquals(
1,
coroutineContext[IntegerContextElement.Key]!!.number
)
}
}

@Test
fun testChildCoroutineContextInterceptedFromCallingContext() = runTest {
assertNull(coroutineContext[IntegerContextElement.Key])

launch(AddElementInterceptor()) {
launch {
assertEquals(
1,
coroutineContext[IntegerContextElement.Key]!!.number
)
}.join()
launch {
assertEquals(
2,
coroutineContext[IntegerContextElement.Key]!!.number
)
}.join()
}
}

@Test
fun testChildCoroutineContextInterceptedFromCallingContextNotAddedContext() = runTest {
assertNull(coroutineContext[IntegerContextElement.Key])

launch(AddElementInterceptor()) {
launch {}.join()

launch(AddElementInterceptor()) {// Count of this interceptor is 0.
assertEquals(
2,
coroutineContext[IntegerContextElement.Key]!!.number
)
launch {
assertEquals(
1, // Parent coroutine's context intercepted.
coroutineContext[IntegerContextElement.Key]!!.number
)
}.join()
}.join()
}
}
}

0 comments on commit 343d478

Please sign in to comment.