Skip to content

Commit

Permalink
Introduce trySend and tryReceive channel operations as a future repla…
Browse files Browse the repository at this point in the history
…cement for error-prone offer, poll and receiveOrNull

Fixes #974
  • Loading branch information
qwwdfsad committed Mar 25, 2021
1 parent ed8e8d1 commit a67fd8e
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 57 deletions.
21 changes: 20 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx

public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V
public static fun poll (Lkotlinx/coroutines/channels/ActorScope;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel {
Expand All @@ -566,6 +567,7 @@ public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : k
public final class kotlinx/coroutines/channels/BroadcastChannel$DefaultImpls {
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static fun offer (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/lang/Object;)Z
}

public final class kotlinx/coroutines/channels/BroadcastChannelKt {
Expand Down Expand Up @@ -598,6 +600,8 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co

public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V
public static fun offer (Lkotlinx/coroutines/channels/Channel;Ljava/lang/Object;)Z
public static fun poll (Lkotlinx/coroutines/channels/Channel;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/Channel$Factory {
Expand Down Expand Up @@ -628,7 +632,7 @@ public final class kotlinx/coroutines/channels/ChannelKt {
public final class kotlinx/coroutines/channels/ChannelResult {
public static final field Companion Lkotlinx/coroutines/channels/ChannelResult$Companion;
public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ChannelResult;
public static synthetic fun constructor-impl (Ljava/lang/Object;Lkotlin/jvm/internal/DefaultConstructorMarker;)Ljava/lang/Object;
public static fun constructor-impl (Ljava/lang/Object;)Ljava/lang/Object;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
Expand All @@ -645,6 +649,12 @@ public final class kotlinx/coroutines/channels/ChannelResult {
public final synthetic fun unbox-impl ()Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/ChannelResult$Companion {
public final fun closed-JP2dKIU (Ljava/lang/Throwable;)Ljava/lang/Object;
public final fun failure-PtdJZtk ()Ljava/lang/Object;
public final fun success-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/ChannelsKt {
public static final fun all (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun any (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -789,6 +799,7 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
public fun offer (Ljava/lang/Object;)Z
public fun openSubscription ()Lkotlinx/coroutines/channels/ReceiveChannel;
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/ProduceKt {
Expand All @@ -804,6 +815,10 @@ public abstract interface class kotlinx/coroutines/channels/ProducerScope : kotl
public abstract fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
}

public final class kotlinx/coroutines/channels/ProducerScope$DefaultImpls {
public static fun offer (Lkotlinx/coroutines/channels/ProducerScope;Ljava/lang/Object;)Z
}

public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract synthetic fun cancel ()V
public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z
Expand All @@ -818,12 +833,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveCatching-JP2dKIU (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun tryReceive-PtdJZtk ()Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static fun poll (Lkotlinx/coroutines/channels/ReceiveChannel;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/channels/SendChannel {
Expand All @@ -834,10 +851,12 @@ public abstract interface class kotlinx/coroutines/channels/SendChannel {
public abstract fun isFull ()Z
public abstract fun offer (Ljava/lang/Object;)Z
public abstract fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/SendChannel$DefaultImpls {
public static synthetic fun close$default (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
public static fun offer (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Object;)Z
}

public final class kotlinx/coroutines/channels/TickerChannelsKt {
Expand Down
43 changes: 32 additions & 11 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,42 @@ internal abstract class AbstractSendChannel<E>(
return sendSuspend(element)
}

public final override fun offer(element: E): Boolean {
override fun offer(element: E): Boolean {
try {
return super.offer(element)
} catch (e: Throwable) {
onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
// If it crashes, add send exception as suppressed for better diagnostics
it.addSuppressed(e)
throw it
}
throw e
}
}

public final override fun trySend(element: E): ChannelResult<Unit> {
val result = offerInternal(element)
return when {
result === OFFER_SUCCESS -> true
result === OFFER_SUCCESS -> ChannelResult.success(Unit)
result === OFFER_FAILED -> {
// We should check for closed token on offer as well, otherwise offer won't be linearizable
// We should check for closed token on trySend as well, otherwise trySend won't be linearizable
// in the face of concurrent close()
// See https://github.com/Kotlin/kotlinx.coroutines/issues/359
throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false))
val closedForSend = closedForSend ?: return ChannelResult.failure()
ChannelResult.closed(helpCloseAndGetSendException(closedForSend))
}
result is Closed<*> -> {
throw recoverStackTrace(helpCloseAndGetSendException(element, result))
ChannelResult.closed(helpCloseAndGetSendException(result))
}
else -> error("offerInternal returned $result")
else -> error("trySend returned $result")
}
}

private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
helpClose(closed)
return closed.sendException
}

private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
// To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
Expand Down Expand Up @@ -632,9 +651,11 @@ internal abstract class AbstractChannel<E>(
}

@Suppress("UNCHECKED_CAST")
public final override fun poll(): E? {
public final override fun tryReceive(): ChannelResult<E> {
val result = pollInternal()
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
if (result === POLL_FAILED) return ChannelResult.failure()
if (result is Closed<*>) return ChannelResult.closed(result.closeCause)
return ChannelResult.success(result as E)
}

@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
Expand Down Expand Up @@ -905,7 +926,7 @@ internal abstract class AbstractChannel<E>(
@JvmField val receiveMode: Int
) : Receive<E>() {
fun resumeValue(value: E): Any? = when (receiveMode) {
RECEIVE_RESULT -> ChannelResult.value(value)
RECEIVE_RESULT -> ChannelResult.success(value)
else -> value
}

Expand Down Expand Up @@ -990,7 +1011,7 @@ internal abstract class AbstractChannel<E>(
@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(value: E) {
block.startCoroutineCancellable(
if (receiveMode == RECEIVE_RESULT) ChannelResult.value(value) else value,
if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
select.completion,
resumeOnCancellationFun(value)
)
Expand Down Expand Up @@ -1144,7 +1165,7 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose

@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <E> Any?.toResult(): ChannelResult<E> =
if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.value(this as E)
if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E)

@Suppress("NOTHING_TO_INLINE")
private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause)
82 changes: 58 additions & 24 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,22 @@ public interface SendChannel<in E> {
* then it calls `onUndeliveredElement` before throwing an exception.
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*/
public fun offer(element: E): Boolean
public fun offer(element: E): Boolean {
val result = trySend(element)
if (result.isSuccess) return true
throw recoverStackTrace(result.exceptionOrNull() ?: return false)
}

/**
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
* and returns the successful result. Otherwise, returns failed or closed result.
* This is synchronous variant of [send], which backs off in situations when `send` suspends or throws.
*
* When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and
* it does not call `onUndeliveredElement` that was installed for this channel.
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*/
public fun trySend(element: E): ChannelResult<Unit>

/**
* Closes this channel.
Expand Down Expand Up @@ -218,7 +233,7 @@ public interface ReceiveChannel<out E> {
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@LowPriorityInOverloadResolution
@Deprecated(
message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension",
message = "Deprecated in favor of receiveCatching and receiveOrNull extension",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull")
)
Expand All @@ -230,13 +245,13 @@ public interface ReceiveChannel<out E> {
* [closed for `receive`][isClosedForReceive] without a cause. The [select] invocation fails with
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* @suppress **Deprecated**: in favor of onReceiveOrClosed and onReceiveOrNull extension.
* @suppress **Deprecated**: in favor of receiveCatching and onReceiveOrNull extension.
*/
@ObsoleteCoroutinesApi
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@LowPriorityInOverloadResolution
@Deprecated(
message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension",
message = "Deprecated in favor of receiveCatching and onReceiveOrNull extension",
level = DeprecationLevel.WARNING,

This comment has been minimized.

Copy link
@stefandekanskilotus

stefandekanskilotus May 12, 2021

Is it really ok for this to 'only' be a 'WARNING' level? Maybe some people (mhm me) were relying on null as 'the channel is empty' state, so upgrading the library breaks my code since this throws if the channel is empty?

This comment has been minimized.

Copy link
@stefandekanskilotus

stefandekanskilotus May 12, 2021

I mean I can't just drop in upgrade the lib, so change to Error perhaps, or make it compatible with previous behavior? (return null when empty)?

This comment has been minimized.

Copy link
@qwwdfsad

qwwdfsad May 12, 2021

Author Collaborator

Did I get you right that onReceiveOrNull changed its behaviour between 1.4.3 and 1.5.0-RC?

This comment has been minimized.

Copy link
@stefandekanskilotus

stefandekanskilotus May 12, 2021

Ah sorry misplaced the comment, I ment to do it over poll

This comment has been minimized.

Copy link
@stefandekanskilotus

stefandekanskilotus May 12, 2021

And 'yes' for poll.

This comment has been minimized.

Copy link
@stefandekanskilotus

stefandekanskilotus May 12, 2021

The replacement code suggested is ok (tryReceive().getOrNull()) but poll behaviour is not equal to tryReceive().getOrNull().

replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull")
)
Expand All @@ -251,7 +266,7 @@ public interface ReceiveChannel<out E> {
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with a [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. The `receiveOrClosed` call can retrieve the element from the channel,
* suspended, it will not resume successfully. The `receiveCatching` call can retrieve the element from the channel,
* but then throw [CancellationException], thus failing to deliver the element.
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*
Expand All @@ -271,11 +286,22 @@ public interface ReceiveChannel<out E> {
public val onReceiveCatching: SelectClause1<ChannelResult<E>>

/**
* Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty
* Retrieves and removes an element from this channel if it's not empty or returns `null` if the channel is empty
* or is [is closed for `receive`][isClosedForReceive] without a cause.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public fun poll(): E?
public fun poll(): E? {
val result = tryReceive()
if (result.isSuccess) return result.getOrThrow()
throw recoverStackTrace(result.exceptionOrNull() ?: return null)
}

/**
* Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success]
* result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed]
* result if the channel is closed.
*/
public fun tryReceive(): ChannelResult<E>

/**
* Returns a new iterator to receive elements from this channel using a `for` loop.
Expand Down Expand Up @@ -315,35 +341,35 @@ public interface ReceiveChannel<out E> {

/**
* A discriminated union of channel operation result.
* It encapsulates successful or failed result of a channel operation, or a failed operation to a closed channel with
* It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with
* an optional cause.
*
* Successful result represents a successful operation with value of type [T], for example, result of [Channel.receiveCatching]
* operation or a successfully sent element as a result of [Channel.trySend].
* The successful result represents a successful operation with a value of type [T], for example,
* the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend].
*
* Failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
* The failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
* E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
*
* Closed result represents an operation attempt to a closed channel and also implies that the operation was failed.
* The closed result represents an operation attempt to a closed channel and also implies that the operation has failed.
*/
@Suppress("UNCHECKED_CAST")
public inline class ChannelResult<out T>
internal constructor(private val holder: Any?) {
@PublishedApi internal constructor(private val holder: Any?) {
/**
* Returns `true` if this instance represents a successful
* operation outcome.
*
* In this case [isFailure] and [isClosed] return false.
* In this case [isFailure] and [isClosed] return `false`.
*/
public val isSuccess: Boolean get() = holder !is Closed
public val isSuccess: Boolean get() = holder !is Failed

/**
* Returns true if this instance represents unsuccessful operation.
* Returns `true` if this instance represents unsuccessful operation.
*
* In this case [isSuccess] returns false, but it does not imply
* that the channel is failed or closed.
*
* Example of failed operation without an exception and channel being closed
* Example of a failed operation without an exception and channel being closed
* is [Channel.trySend] attempt to a channel that is full.
*/
public val isFailure: Boolean get() = holder is Failed
Expand All @@ -352,7 +378,7 @@ internal constructor(private val holder: Any?) {
* Returns `true` if this instance represents unsuccessful operation
* to a closed or cancelled channel.
*
* In this case [isSuccess] returns false, [isFailure] returns `true`, but it does not imply
* In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply
* that [exceptionOrNull] returns non-null value.
*
* It can happen if the channel was [closed][Channel.close] normally without an exception.
Expand All @@ -374,7 +400,7 @@ internal constructor(private val holder: Any?) {
}

/**
* Returns the encapsulated exception if this instance represents failure or null if it is success
* Returns the encapsulated exception if this instance represents failure or `null` if it is success
* or unsuccessful operation to closed channel.
*/
public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause
Expand All @@ -389,13 +415,21 @@ internal constructor(private val holder: Any?) {
override fun toString(): String = "Closed($cause)"
}

internal companion object {
@Suppress("NOTHING_TO_INLINE")
internal inline fun <E> value(value: E): ChannelResult<E> =
@Suppress("NOTHING_TO_INLINE")
@InternalCoroutinesApi
public companion object {
private val failed = Failed()

@InternalCoroutinesApi
public fun <E> success(value: E): ChannelResult<E> =
ChannelResult(value)

@Suppress("NOTHING_TO_INLINE")
internal inline fun <E> closed(cause: Throwable?): ChannelResult<E> =
@InternalCoroutinesApi
public fun <E> failure(): ChannelResult<E> =
ChannelResult(failed)

@InternalCoroutinesApi
public fun <E> closed(cause: Throwable?): ChannelResult<E> =
ChannelResult(Closed(cause))
}

Expand Down
Loading

0 comments on commit a67fd8e

Please sign in to comment.