Skip to content

Commit

Permalink
Introduce broadcast coroutine builder;
Browse files Browse the repository at this point in the history
support `BroadcastChannel.cancel` method to drop the buffer;
Introduce ReceiveChannel.broadcast() extension.

Fixes #280
  • Loading branch information
elizarov committed May 17, 2018
1 parent c8af35c commit e1358bc
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ public abstract interface class kotlinx/coroutines/experimental/channels/ActorSc

public final class kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/BroadcastChannel {
public fun <init> (I)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun close (Ljava/lang/Throwable;)Z
public final fun getCapacity ()I
public fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
Expand All @@ -564,12 +565,14 @@ public class kotlinx/coroutines/experimental/channels/ArrayChannel : kotlinx/cor

public abstract interface class kotlinx/coroutines/experimental/channels/BroadcastChannel : kotlinx/coroutines/experimental/channels/SendChannel {
public static final field Factory Lkotlinx/coroutines/experimental/channels/BroadcastChannel$Factory;
public abstract fun cancel (Ljava/lang/Throwable;)Z
public abstract fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
public abstract fun openSubscription ()Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
public abstract synthetic fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
}

public final class kotlinx/coroutines/experimental/channels/BroadcastChannel$DefaultImpls {
public static synthetic fun cancel$default (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
public static fun open (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
public static synthetic fun openSubscription (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
}
Expand All @@ -582,6 +585,13 @@ public final class kotlinx/coroutines/experimental/channels/BroadcastChannelKt {
public static final fun BroadcastChannel (I)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
}

public final class kotlinx/coroutines/experimental/channels/BroadcastKt {
public static final fun broadcast (Lkotlin/coroutines/experimental/CoroutineContext;ILkotlinx/coroutines/experimental/CoroutineStart;Lkotlinx/coroutines/experimental/Job;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
public static final fun broadcast (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;ILkotlinx/coroutines/experimental/CoroutineStart;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
public static synthetic fun broadcast$default (Lkotlin/coroutines/experimental/CoroutineContext;ILkotlinx/coroutines/experimental/CoroutineStart;Lkotlinx/coroutines/experimental/Job;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
public static synthetic fun broadcast$default (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;ILkotlinx/coroutines/experimental/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
}

public abstract interface class kotlinx/coroutines/experimental/channels/Channel : kotlinx/coroutines/experimental/channels/ReceiveChannel, kotlinx/coroutines/experimental/channels/SendChannel {
public static final field CONFLATED I
public static final field Factory Lkotlinx/coroutines/experimental/channels/Channel$Factory;
Expand Down Expand Up @@ -715,6 +725,7 @@ public final class kotlinx/coroutines/experimental/channels/ConflatedBroadcastCh
public static final field UNDEFINED Lkotlinx/coroutines/experimental/internal/Symbol;
public fun <init> ()V
public fun <init> (Ljava/lang/Object;)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun close (Ljava/lang/Throwable;)Z
public fun getOnSend ()Lkotlinx/coroutines/experimental/selects/SelectClause2;
public final fun getValue ()Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import kotlinx.coroutines.experimental.selects.*
* Sender suspends only when buffer is full due to one of the receives being slow to consume and
* receiver suspends only when buffer is empty.
*
* Note, that elements that are sent to the broadcast channel while there are no [openSubscription] subscribers are immediately
* lost.
* **Note**, that elements that are sent to this channel while there are no
* [openSubscription] subscribers are immediately lost.
*
* This channel is created by `BroadcastChannel(capacity)` factory function invocation.
*
Expand Down Expand Up @@ -68,17 +68,22 @@ class ArrayBroadcastChannel<E>(
override val isBufferAlwaysFull: Boolean get() = false
override val isBufferFull: Boolean get() = size >= capacity

override fun openSubscription(): ReceiveChannel<E> =
public override fun openSubscription(): ReceiveChannel<E> =
Subscriber(this).also {
updateHead(addSub = it)
}

override fun close(cause: Throwable?): Boolean {
public override fun close(cause: Throwable?): Boolean {
if (!super.close(cause)) return false
checkSubOffers()
return true
}

public override fun cancel(cause: Throwable?): Boolean =
close(cause).also {
for (sub in subs) sub.cancel(cause)
}

// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
override fun offerInternal(element: E): Any {
bufferLock.withLock {
Expand Down Expand Up @@ -210,8 +215,15 @@ class ArrayBroadcastChannel<E>(
override fun cancel(cause: Throwable?): Boolean =
close(cause).also { closed ->
if (closed) broadcastChannel.updateHead(removeSub = this)
clearBuffer()
}

private fun clearBuffer() {
subLock.withLock {
subHead = broadcastChannel.tail
}
}

// returns true if subHead was updated and broadcast channel's head must be checked
// this method is lock-free (it never waits on lock)
@Suppress("UNCHECKED_CAST")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.experimental.intrinsics.*
import kotlin.coroutines.experimental.*

/**
* Broadcasts all elements of the channel.
*
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
*/
fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
) : BroadcastChannel<E> =
broadcast(Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
for (e in this@broadcast) {
send(e)
}
}

/**
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
* and returns a reference to the coroutine as a [BroadcastChannel]. This resulting
* object can be used to [subscribe][BroadcastChannel.openSubscription] to elements produced by this coroutine.
*
* The scope of the coroutine contains [ProducerScope] interface, which implements
* both [CoroutineScope] and [SendChannel], so that coroutine can invoke
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
* when the coroutine completes.
*
* The [context] for the new coroutine can be explicitly specified.
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
* The [coroutineContext] of the parent coroutine may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
* The parent job may be also explicitly specified using [parent] parameter.
*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* * when `capacity` positive (1 by default), but less than [UNLIMITED] -- uses [ArrayBroadcastChannel]
* * when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
* * otherwise -- throws [IllegalArgumentException].
*
* **Note:** By default, the coroutine does not start until the first subscriber appears via [BroadcastChannel.openSubscription]
* as [start] parameter has a value of [CoroutineStart.LAZY] by default.
* This ensures that the first subscriber does not miss any sent elements.
* However, later subscribers may miss elements.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
public fun <E> broadcast(
context: CoroutineContext = DefaultDispatcher,
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY,
parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> {
val channel = BroadcastChannel<E>(capacity)
val newContext = newCoroutineContext(context, parent)
val coroutine = if (start.isLazy)
LazyBroadcastCoroutine(newContext, channel, block) else
BroadcastCoroutine(newContext, channel, active = true)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(start, coroutine, block)
return coroutine
}

private open class BroadcastCoroutine<E>(
parentContext: CoroutineContext,
protected val _channel: BroadcastChannel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
override val channel: SendChannel<E>
get() = this

@Suppress("RETURN_TYPE_MISMATCH_ON_OVERRIDE")
public override fun openSubscription(): ReceiveChannel<E> = _channel.openSubscription()

public override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)

override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
val cause = exceptionally?.cause
val processed = when (exceptionally) {
is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
else -> _channel.close(cause) // producer coroutine has completed -- close channel
}
if (!processed && cause != null)
handleCoroutineException(context, cause)
}
}

private class LazyBroadcastCoroutine<E>(
parentContext: CoroutineContext,
channel: BroadcastChannel<E>,
private val block: suspend ProducerScope<E>.() -> Unit
) : BroadcastCoroutine<E>(parentContext, channel, active = false) {
override fun openSubscription(): ReceiveChannel<E> {
// open subscription _first_
val subscription = _channel.openSubscription()
// then start coroutine
start()
return subscription
}

override fun onStart() {
block.startCoroutineCancellable(this, this)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,25 @@ public interface BroadcastChannel<E> : SendChannel<E> {
@Deprecated(message = "Renamed to `openSubscription`",
replaceWith = ReplaceWith("openSubscription()"))
public fun open(): SubscriptionReceiveChannel<E> = openSubscription() as SubscriptionReceiveChannel<E>

/**
* Cancels reception of remaining elements from this channel. This function closes the channel with
* the specified cause (unless it was already closed), removes all buffered sent elements from it,
* and [cancels][ReceiveChannel.cancel] all open subscriptions.
* This function returns `true` if the channel was not closed previously, or `false` otherwise.
*
* A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
* receive on a failed channel throw the specified [cause] exception.
*/
public fun cancel(cause: Throwable? = null): Boolean
}

/**
* Creates a broadcast channel with the specified buffer capacity.
*
* The resulting channel type depends on the specified [capacity] parameter:
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel];
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel]
* **Note:** this channel looses all items that are send to it until the first subscriber appears;
* * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
* * otherwise -- throws [IllegalArgumentException].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.experimental.selects.SelectClause1
import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.select
import kotlinx.coroutines.experimental.yield
import kotlinx.coroutines.experimental.selects.*

/**
* Sender's interface to [Channel].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}
}

override val isClosedForSend: Boolean get() = _state.value is Closed
override val isFull: Boolean get() = false
public override val isClosedForSend: Boolean get() = _state.value is Closed
public override val isFull: Boolean get() = false

@Suppress("UNCHECKED_CAST")
override fun openSubscription(): ReceiveChannel<E> {
public override fun openSubscription(): ReceiveChannel<E> {
val subscriber = Subscriber<E>(this)
_state.loop { state ->
when (state) {
Expand Down Expand Up @@ -150,7 +150,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}

private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
if (list == null) return Array<Subscriber<E>>(1) { subscriber }
if (list == null) return Array(1) { subscriber }
return list + subscriber
}

Expand All @@ -167,7 +167,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}

@Suppress("UNCHECKED_CAST")
override fun close(cause: Throwable?): Boolean {
public override fun close(cause: Throwable?): Boolean {
_state.loop { state ->
when (state) {
is Closed -> return false
Expand All @@ -183,12 +183,17 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}
}

/**
* Closes this broadcast channel. Same as [close].
*/
public override fun cancel(cause: Throwable?): Boolean = close(cause)

/**
* Sends the value to all subscribed receives and stores this value as the most recent state for
* future subscribers. This implementation never suspends.
* It throws exception if the channel [isClosedForSend] (see [close] for details).
*/
suspend override fun send(element: E) {
public override suspend fun send(element: E) {
offerInternal(element)?.let { throw it.sendException }
}

Expand All @@ -197,7 +202,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
* future subscribers. This implementation always returns `true`.
* It throws exception if the channel [isClosedForSend] (see [close] for details).
*/
override fun offer(element: E): Boolean {
public override fun offer(element: E): Boolean {
offerInternal(element)?.let { throw it.sendException }
return true
}
Expand Down Expand Up @@ -229,7 +234,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}
}

override val onSend: SelectClause2<E, SendChannel<E>>
public override val onSend: SelectClause2<E, SendChannel<E>>
get() = object : SelectClause2<E, SendChannel<E>> {
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
registerSelectSend(select, param, block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
import kotlin.coroutines.experimental.*

/**
Expand Down Expand Up @@ -65,6 +66,13 @@ interface ProducerJob<out E> : ReceiveChannel<E>, Job {
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* * when `capacity` is 0 (default) -- uses [RendezvousChannel] without a buffer;
* * when `capacity` is [Channel.UNLIMITED] -- uses [LinkedListChannel] with buffer of unlimited size;
* * when `capacity` is [Channel.CONFLATED] -- uses [ConflatedChannel] that conflates back-to-back sends;
* * when `capacity` is positive, but less than [UNLIMITED] -- uses [ArrayChannel] with a buffer of the specified `capacity`;
* * otherwise -- throws [IllegalArgumentException].
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,17 @@ class ArrayBroadcastChannelTest : TestBase() {
subscription.receiveOrNull()
}

@Test
fun testReceiveNoneAfterCancel() = runTest {
val channel = BroadcastChannel<Int>(10)
val sub = channel.openSubscription()
// generate into buffer & cancel
for (x in 1..5) channel.send(x)
channel.cancel()
assertTrue(channel.isClosedForSend)
assertTrue(sub.isClosedForReceive)
check(sub.receiveOrNull() == null)
}

private class TestException : Exception()
}
Loading

0 comments on commit e1358bc

Please sign in to comment.