Skip to content

Commit

Permalink
Reactor coroutine context propagation in more places
Browse files Browse the repository at this point in the history
* Propagation of the coroutine context of await calls into
  Mono/Flux builder
* Publisher.asFlow propagates coroutine context from `collect`
  call to the Publisher
* Flow.asFlux transform

Fixes #284
  • Loading branch information
SokolovaMaria authored and elizarov committed Jul 30, 2019
1 parent 1156e1c commit 4c069fc
Show file tree
Hide file tree
Showing 22 changed files with 347 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,29 @@ public final class kotlinx/coroutines/reactive/ChannelKt {
public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
}

public abstract interface class kotlinx/coroutines/reactive/ContextInjector {
public abstract fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/ConvertKt {
public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/FlowKt {
public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/FlowSubscription : org/reactivestreams/Subscription {
public final field flow Lkotlinx/coroutines/flow/Flow;
public final field subscriber Lorg/reactivestreams/Subscriber;
public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V
public fun cancel ()V
public fun request (J)V
}

public final class kotlinx/coroutines/reactive/PublishKt {
public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
Expand All @@ -44,12 +62,3 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ public final class kotlinx/coroutines/reactor/ConvertKt {
public static final fun asMono (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Mono;
}

public final class kotlinx/coroutines/reactor/FlowKt {
public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux;
}

public final class kotlinx/coroutines/reactor/FluxKt {
public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
Expand All @@ -28,6 +32,11 @@ public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines
public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
}

public final class kotlinx/coroutines/reactor/ReactorContextInjector : kotlinx/coroutines/reactive/ContextInjector {
public fun <init> ()V
public fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactor/ReactorContextKt {
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
}
Expand Down
13 changes: 12 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.util.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -81,6 +82,16 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)

// ------------------------ private ------------------------

// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
private val contextInjectors: Array<ContextInjector> =
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList().toTypedArray()

private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
contextInjectors.fold(this) { pub, contextInjector ->
contextInjector.injectCoroutineContext(pub, coroutineContext)
}

private enum class Mode(val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
Expand All @@ -93,7 +104,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
subscribe(object : Subscriber<T> {
injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
private lateinit var subscription: Subscription
private var value: T? = null
private var seenValue = false
Expand Down
14 changes: 14 additions & 0 deletions reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kotlinx.coroutines.reactive

import kotlinx.coroutines.InternalCoroutinesApi
import org.reactivestreams.Publisher
import kotlin.coroutines.CoroutineContext

/** @suppress */
@InternalCoroutinesApi
public interface ContextInjector {
/**
* Injects the coroutine context into the context of the publisher.
*/
public fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
}
109 changes: 109 additions & 0 deletions reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.reactivestreams.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*

/**
* Transforms the given flow to a spec-compliant [Publisher].
*/
@ExperimentalCoroutinesApi
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)

/**
* Adapter that transforms [Flow] into TCK-complaint [Publisher].
* [cancel] invocation cancels the original flow.
*/
@Suppress("PublisherImplementation")
private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
override fun subscribe(subscriber: Subscriber<in T>?) {
if (subscriber == null) throw NullPointerException()
subscriber.onSubscribe(FlowSubscription(flow, subscriber))
}
}

/** @suppress */
@InternalCoroutinesApi
public class FlowSubscription<T>(
@JvmField val flow: Flow<T>,
@JvmField val subscriber: Subscriber<in T>
) : Subscription {
@Volatile
private var canceled: Boolean = false
private val requested = AtomicLong(0L)
private val producer: AtomicReference<CancellableContinuation<Unit>?> = AtomicReference()

// This is actually optimizable
private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) {
try {
consumeFlow()
subscriber.onComplete()
} catch (e: Throwable) {
// Failed with real exception, not due to cancellation
if (!coroutineContext[Job]!!.isCancelled) {
subscriber.onError(e)
}
}
}

private suspend fun consumeFlow() {
flow.collect { value ->
if (!coroutineContext.isActive) {
subscriber.onComplete()
coroutineContext.ensureActive()
}

if (requested.get() == 0L) {
suspendCancellableCoroutine<Unit> {
producer.set(it)
if (requested.get() != 0L) it.resumeSafely()
}
}

requested.decrementAndGet()
subscriber.onNext(value)
}
}

override fun cancel() {
canceled = true
job.cancel()
}

override fun request(n: Long) {
if (n <= 0) {
return
}

if (canceled) return

job.start()
var snapshot: Long
var newValue: Long
do {
snapshot = requested.get()
newValue = snapshot + n
if (newValue <= 0L) newValue = Long.MAX_VALUE
} while (!requested.compareAndSet(snapshot, newValue))

val prev = producer.get()
if (prev == null || !producer.compareAndSet(prev, null)) return
prev.resumeSafely()
}

private fun CancellableContinuation<Unit>.resumeSafely() {
val token = tryResume(Unit)
if (token != null) {
completeResume(token)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.reactive.flow
@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import java.util.*
import kotlin.coroutines.*

/**
Expand All @@ -21,13 +24,11 @@ import kotlin.coroutines.*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
* are discarded.
*/
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
PublisherAsFlow(this, 1)

@FlowPreview
@JvmName("from")
@Deprecated(
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
level = DeprecationLevel.ERROR,
Expand Down Expand Up @@ -70,7 +71,7 @@ private class PublisherAsFlow<T : Any>(

override suspend fun collect(collector: FlowCollector<T>) {
val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
publisher.subscribe(subscriber)
publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber)
try {
var consumed = 0L
while (true) {
Expand Down Expand Up @@ -127,3 +128,11 @@ private class ReactiveSubscriber<T : Any>(
subscription.cancel()
}
}

// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
private val contextInjectors: List<ContextInjector> =
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList()

private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }
103 changes: 0 additions & 103 deletions reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

@file:Suppress("UNCHECKED_CAST")

package kotlinx.coroutines.reactive.flow
package kotlinx.coroutines.reactive

import kotlinx.coroutines.flow.*
import org.junit.*
Expand Down
Loading

0 comments on commit 4c069fc

Please sign in to comment.