Skip to content

Commit

Permalink
Make BufferedChannelIterator#hasNext idempotent (#4065)
Browse files Browse the repository at this point in the history
The rationale in favor of the change includes:
* in the `1.6.x` series `Channel().iterator().hasNext()` was idempotent
* despite the lack of requirement for the `Iterator#hasNext` method implementations
 to be idempotent, it is generally safer to make them such. The same applies
 to `ChannelIterator#hasNext`
  • Loading branch information
gitpaxultek authored and AleksDanil committed May 10, 2024
1 parent 3b9ab7f commit 25d4311
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
11 changes: 7 additions & 4 deletions kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1580,8 +1580,8 @@ internal open class BufferedChannel<E>(
* [CancellableContinuation] and [SelectInstance].
*
* Roughly, [hasNext] is a [receive] sibling, while [next] simply
* returns the already retrieved element. From the implementation
* side, [receiveResult] stores the element retrieved by [hasNext]
* returns the already retrieved element and [hasNext] being idempotent.
* From the implementation side, [receiveResult] stores the element retrieved by [hasNext]
* (or a special [CHANNEL_CLOSED] token if the channel is closed).
*
* The [invoke] function is a [CancelHandler] implementation,
Expand Down Expand Up @@ -1614,8 +1614,10 @@ internal open class BufferedChannel<E>(
private var continuation: CancellableContinuationImpl<Boolean>? = null

// `hasNext()` is just a special receive operation.
override suspend fun hasNext(): Boolean =
receiveImpl( // <-- this is an inline function
override suspend fun hasNext(): Boolean {
return if (this.receiveResult !== NO_RECEIVE_RESULT && this.receiveResult !== CHANNEL_CLOSED) {
true
} else receiveImpl( // <-- this is an inline function
// Do not create a continuation until it is required;
// it is created later via [onNoWaiterSuspend], if needed.
waiter = null,
Expand All @@ -1636,6 +1638,7 @@ internal open class BufferedChannel<E>(
// The tail-call optimization is applied here.
onNoWaiterSuspend = { segm, i, r -> return hasNextOnNoWaiterSuspend(segm, i, r) }
)
}

private fun onClosedHasNext(): Boolean {
this.receiveResult = CHANNEL_CLOSED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,40 @@ import kotlinx.coroutines.*
import kotlin.test.*

class BufferedChannelTest : TestBase() {
@Test
fun testIteratorHasNextIsIdempotent() = runTest {
val q = Channel<Int>()
check(q.isEmpty)
val iter = q.iterator()
expect(1)
val sender = launch {
expect(4)
q.send(1) // sent
expect(10)
q.close()
expect(11)
}
expect(2)
val receiver = launch {
expect(5)
check(iter.hasNext())
expect(6)
check(iter.hasNext())
expect(7)
check(iter.hasNext())
expect(8)
check(iter.next() == 1)
expect(9)
check(!iter.hasNext())
expect(12)
}
expect(3)
sender.join()
receiver.join()
check(q.isClosedForReceive)
finish(13)
}

@Test
fun testSimple() = runTest {
val q = Channel<Int>(1)
Expand Down

0 comments on commit 25d4311

Please sign in to comment.