File tree Expand file tree Collapse file tree 4 files changed +65
-7
lines changed Expand file tree Collapse file tree 4 files changed +65
-7
lines changed Original file line number Diff line number Diff line change @@ -331,12 +331,19 @@ internal suspend inline fun <T> suspendCancellableCoroutineReusable(
331331 crossinline block : (CancellableContinuationImpl <T >) -> Unit
332332): T = suspendCoroutineUninterceptedOrReturn { uCont ->
333333 val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
334- block(cancellable)
334+ try {
335+ block(cancellable)
336+ } catch (e: Throwable ) {
337+ // Here we catch any unexpected exception from user-supplied block (e.g. invariant violation)
338+ // and release claimed continuation in order to leave it in a reasonable state (see #3613)
339+ cancellable.releaseClaimedReusableContinuation()
340+ throw e
341+ }
335342 cancellable.getResult()
336343}
337344
338345internal fun <T > getOrCreateCancellableContinuation (delegate : Continuation <T >): CancellableContinuationImpl <T > {
339- // If used outside of our dispatcher
346+ // If used outside our dispatcher
340347 if (delegate !is DispatchedContinuation <T >) {
341348 return CancellableContinuationImpl (delegate, MODE_CANCELLABLE )
342349 }
Original file line number Diff line number Diff line change @@ -357,8 +357,8 @@ internal open class CancellableContinuationImpl<in T>(
357357 * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation,
358358 * in which case it detaches from the parent and cancels this continuation.
359359 */
360- private fun releaseClaimedReusableContinuation () {
361- // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
360+ internal fun releaseClaimedReusableContinuation () {
361+ // Cannot be cast if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
362362 val cancellationCause = (delegate as ? DispatchedContinuation <* >)?.tryReleaseClaimedContinuation(this ) ? : return
363363 detachChild()
364364 cancel(cancellationCause)
Original file line number Diff line number Diff line change @@ -241,10 +241,8 @@ internal open class BufferedChannel<E>(
241241 * In case of coroutine cancellation, the element may be undelivered --
242242 * the [onUndeliveredElement] feature is unsupported in this implementation.
243243 *
244- * Note that this implementation always invokes [suspendCancellableCoroutineReusable],
245- * as we do not care about broadcasts performance -- they are already deprecated.
246244 */
247- internal open suspend fun sendBroadcast (element : E ): Boolean = suspendCancellableCoroutineReusable { cont ->
245+ internal open suspend fun sendBroadcast (element : E ): Boolean = suspendCancellableCoroutine { cont ->
248246 check(onUndeliveredElement == null ) {
249247 " the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`"
250248 }
Original file line number Diff line number Diff line change 1+ /*
2+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+ */
4+
5+ package kotlinx.coroutines
6+
7+ import org.junit.Test
8+ import java.util.concurrent.CountDownLatch
9+ import java.util.concurrent.atomic.AtomicReference
10+ import kotlin.coroutines.*
11+
12+ // Stresses scenario from #3613
13+ class ReusableCancellableContinuationInvariantStressTest : TestBase () {
14+
15+ // Tests have a timeout 10 sec because the bug they catch leads to an infinite spin-loop
16+
17+ @Test(timeout = 10_000 )
18+ fun testExceptionFromSuspendReusable () = doTest { /* nothing */ }
19+
20+
21+ @Test(timeout = 10_000 )
22+ fun testExceptionFromCancelledSuspendReusable () = doTest { it.cancel() }
23+
24+
25+ @Suppress(" SuspendFunctionOnCoroutineScope" )
26+ private inline fun doTest (crossinline block : (Job ) -> Unit ) {
27+ runTest {
28+ repeat(10_000 ) {
29+ val latch = CountDownLatch (1 )
30+ val continuationToResume = AtomicReference <Continuation <Unit >? > (null )
31+ val j1 = launch(Dispatchers .Default ) {
32+ latch.await()
33+ suspendCancellableCoroutineReusable {
34+ continuationToResume.set(it)
35+ block(coroutineContext.job)
36+ throw CancellationException () // Don't let getResult() chance to execute
37+ }
38+ }
39+
40+ val j2 = launch(Dispatchers .Default ) {
41+ latch.await()
42+ while (continuationToResume.get() == null ) {
43+ // spin
44+ }
45+ continuationToResume.get()!! .resume(Unit )
46+ }
47+
48+ latch.countDown()
49+ joinAll(j1, j2)
50+ }
51+ }
52+ }
53+ }
You can’t perform that action at this time.
0 commit comments