Skip to content

Commit

Permalink
Introduce broadcast coroutine builder;
Browse files Browse the repository at this point in the history
support `BroadcastChannel.cancel` method to drop the buffer;
Introduce ReceiveChannel.broadcast() extension.

Fixes #280
  • Loading branch information
elizarov committed May 16, 2018
1 parent c8af35c commit bd08693
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import kotlinx.coroutines.experimental.selects.*
* Sender suspends only when buffer is full due to one of the receives being slow to consume and
* receiver suspends only when buffer is empty.
*
* Note, that elements that are sent to the broadcast channel while there are no [openSubscription] subscribers are immediately
* lost.
* **Note**, that elements that are sent to this channel while there are no
* [openSubscription] subscribers are immediately lost.
*
* This channel is created by `BroadcastChannel(capacity)` factory function invocation.
*
Expand Down Expand Up @@ -68,17 +68,22 @@ class ArrayBroadcastChannel<E>(
override val isBufferAlwaysFull: Boolean get() = false
override val isBufferFull: Boolean get() = size >= capacity

override fun openSubscription(): ReceiveChannel<E> =
public override fun openSubscription(): ReceiveChannel<E> =
Subscriber(this).also {
updateHead(addSub = it)
}

override fun close(cause: Throwable?): Boolean {
public override fun close(cause: Throwable?): Boolean {
if (!super.close(cause)) return false
checkSubOffers()
return true
}

public override fun cancel(cause: Throwable?): Boolean =
close(cause).also {
for (sub in subs) sub.cancel(cause)
}

// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
override fun offerInternal(element: E): Any {
bufferLock.withLock {
Expand Down Expand Up @@ -210,8 +215,15 @@ class ArrayBroadcastChannel<E>(
override fun cancel(cause: Throwable?): Boolean =
close(cause).also { closed ->
if (closed) broadcastChannel.updateHead(removeSub = this)
clearBuffer()
}

private fun clearBuffer() {
subLock.withLock {
subHead = broadcastChannel.tail
}
}

// returns true if subHead was updated and broadcast channel's head must be checked
// this method is lock-free (it never waits on lock)
@Suppress("UNCHECKED_CAST")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.experimental.intrinsics.*
import kotlin.coroutines.experimental.*

/**
* Broadcasts all elements of the channel.
*
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
*/
fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
) : BroadcastChannel<E> =
broadcast(Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
for (e in this@broadcast) {
send(e)
}
}

/**
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
* and returns a reference to the coroutine as a [BroadcastChannel]. This resulting
* object can be used to [subscribe][BroadcastChannel.openSubscription] to elements produced by this coroutine.
*
* The scope of the coroutine contains [ProducerScope] interface, which implements
* both [CoroutineScope] and [SendChannel], so that coroutine can invoke
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
* when the coroutine completes.
*
* The [context] for the new coroutine can be explicitly specified.
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
* The [coroutineContext] of the parent coroutine may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
* The parent job may be also explicitly specified using [parent] parameter.
*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* * when `capacity` positive (1 by default), but less than [UNLIMITED] -- uses [ArrayBroadcastChannel]
* * when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
* * otherwise -- throws [IllegalArgumentException].
*
* **Note:** By default, the coroutine does not start until the first subscriber appears via [BroadcastChannel.openSubscription]
* as [start] parameter has a value of [CoroutineStart.LAZY] by default.
* This ensures that the first subscriber does not miss any sent elements.
* However, later subscribers may miss elements.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
public fun <E> broadcast(
context: CoroutineContext = DefaultDispatcher,
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY,
parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> {
val channel = BroadcastChannel<E>(capacity)
val newContext = newCoroutineContext(context, parent)
val coroutine = if (start.isLazy)
LazyBroadcastCoroutine(newContext, channel, block) else
BroadcastCoroutine(newContext, channel, active = true)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(start, coroutine, block)
return coroutine
}

private open class BroadcastCoroutine<E>(
parentContext: CoroutineContext,
protected val _channel: BroadcastChannel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
override val channel: SendChannel<E>
get() = this

@Suppress("RETURN_TYPE_MISMATCH_ON_OVERRIDE")
public override fun openSubscription(): ReceiveChannel<E> = _channel.openSubscription()

public override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)

override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
val cause = exceptionally?.cause
val processed = when (exceptionally) {
is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
else -> _channel.close(cause) // producer coroutine has completed -- close channel
}
if (!processed && cause != null)
handleCoroutineException(context, cause)
}
}

private class LazyBroadcastCoroutine<E>(
parentContext: CoroutineContext,
channel: BroadcastChannel<E>,
private val block: suspend ProducerScope<E>.() -> Unit
) : BroadcastCoroutine<E>(parentContext, channel, active = false) {
override fun openSubscription(): ReceiveChannel<E> {
// open subscription _first_
val subscription = _channel.openSubscription()
// then start coroutine
start()
return subscription
}

override fun onStart() {
block.startCoroutineCancellable(this, this)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,25 @@ public interface BroadcastChannel<E> : SendChannel<E> {
@Deprecated(message = "Renamed to `openSubscription`",
replaceWith = ReplaceWith("openSubscription()"))
public fun open(): SubscriptionReceiveChannel<E> = openSubscription() as SubscriptionReceiveChannel<E>

/**
* Cancels reception of remaining elements from this channel. This function closes the channel with
* the specified cause (unless it was already closed), removes all buffered sent elements from it,
* and [cancels][ReceiveChannel.cancel] all open subscriptions.
* This function returns `true` if the channel was not closed previously, or `false` otherwise.
*
* A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
* receive on a failed channel throw the specified [cause] exception.
*/
public fun cancel(cause: Throwable? = null): Boolean
}

/**
* Creates a broadcast channel with the specified buffer capacity.
*
* The resulting channel type depends on the specified [capacity] parameter:
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel];
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel]
* **Note:** this channel looses all items that are send to it until the first subscriber appears;
* * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
* * otherwise -- throws [IllegalArgumentException].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.experimental.selects.SelectClause1
import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.select
import kotlinx.coroutines.experimental.yield
import kotlinx.coroutines.experimental.selects.*

/**
* Sender's interface to [Channel].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}
}

override val isClosedForSend: Boolean get() = _state.value is Closed
override val isFull: Boolean get() = false
public override val isClosedForSend: Boolean get() = _state.value is Closed
public override val isFull: Boolean get() = false

@Suppress("UNCHECKED_CAST")
override fun openSubscription(): ReceiveChannel<E> {
public override fun openSubscription(): ReceiveChannel<E> {
val subscriber = Subscriber<E>(this)
_state.loop { state ->
when (state) {
Expand Down Expand Up @@ -150,7 +150,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}

private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
if (list == null) return Array<Subscriber<E>>(1) { subscriber }
if (list == null) return Array(1) { subscriber }
return list + subscriber
}

Expand All @@ -167,7 +167,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}

@Suppress("UNCHECKED_CAST")
override fun close(cause: Throwable?): Boolean {
public override fun close(cause: Throwable?): Boolean {
_state.loop { state ->
when (state) {
is Closed -> return false
Expand All @@ -183,12 +183,17 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}
}

/**
* Closes this broadcast channel. Same as [close].
*/
public override fun cancel(cause: Throwable?): Boolean = close(cause)

/**
* Sends the value to all subscribed receives and stores this value as the most recent state for
* future subscribers. This implementation never suspends.
* It throws exception if the channel [isClosedForSend] (see [close] for details).
*/
suspend override fun send(element: E) {
public override suspend fun send(element: E) {
offerInternal(element)?.let { throw it.sendException }
}

Expand All @@ -197,7 +202,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
* future subscribers. This implementation always returns `true`.
* It throws exception if the channel [isClosedForSend] (see [close] for details).
*/
override fun offer(element: E): Boolean {
public override fun offer(element: E): Boolean {
offerInternal(element)?.let { throw it.sendException }
return true
}
Expand Down Expand Up @@ -229,7 +234,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}
}

override val onSend: SelectClause2<E, SendChannel<E>>
public override val onSend: SelectClause2<E, SendChannel<E>>
get() = object : SelectClause2<E, SendChannel<E>> {
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
registerSelectSend(select, param, block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
import kotlin.coroutines.experimental.*

/**
Expand Down Expand Up @@ -65,6 +66,13 @@ interface ProducerJob<out E> : ReceiveChannel<E>, Job {
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* * when `capacity` is 0 (default) -- uses [RendezvousChannel] without a buffer;
* * when `capacity` is [Channel.UNLIMITED] -- uses [LinkedListChannel] with buffer of unlimited size;
* * when `capacity` is [Channel.CONFLATED] -- uses [ConflatedChannel] that conflates back-to-back sends;
* * when `capacity` is positive, but less than [UNLIMITED] -- uses [ArrayChannel] with a buffer of the specified `capacity`;
* * otherwise -- throws [IllegalArgumentException].
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,17 @@ class ArrayBroadcastChannelTest : TestBase() {
subscription.receiveOrNull()
}

@Test
fun testReceiveNoneAfterCancel() = runTest {
val channel = BroadcastChannel<Int>(10)
val sub = channel.openSubscription()
// generate into buffer & cancel
for (x in 1..5) channel.send(x)
channel.cancel()
assertTrue(channel.isClosedForSend)
assertTrue(sub.isClosedForReceive)
check(sub.receiveOrNull() == null)
}

private class TestException : Exception()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlin.coroutines.experimental.*
import kotlin.test.*

class BroadcastTest : TestBase() {
@Test
fun testBroadcastBasic() = runTest {
expect(1)
val b = broadcast(coroutineContext) {
expect(4)
send(1) // goes to receiver
expect(5)
send(2) // goes to buffer
expect(6)
send(3) // suspends, will not be consumes, but will not be cancelled either
expect(10)
}
yield() // has no effect, because default is lazy
expect(2)
b.consume {
expect(3)
assertEquals(1, receive()) // suspends
expect(7)
assertEquals(2, receive()) // suspends
expect(8)
}
expect(9)
yield() // to broadcast
finish(11)
}
}
Loading

0 comments on commit bd08693

Please sign in to comment.