File tree Expand file tree Collapse file tree 4 files changed +64
-4
lines changed Expand file tree Collapse file tree 4 files changed +64
-4
lines changed Original file line number Diff line number Diff line change @@ -331,12 +331,19 @@ internal suspend inline fun <T> suspendCancellableCoroutineReusable(
331331 crossinline block : (CancellableContinuation <T >) -> Unit
332332): T = suspendCoroutineUninterceptedOrReturn { uCont ->
333333 val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
334- block(cancellable)
334+ try {
335+ block(cancellable)
336+ } catch (e: Throwable ) {
337+ // Here we catch any unexpected exception from user-supplied block (e.g. invariant violation)
338+ // and release claimed continuation in order to leave it in a reasonable state (see #3613)
339+ cancellable.releaseClaimedReusableContinuation()
340+ throw e
341+ }
335342 cancellable.getResult()
336343}
337344
338345internal fun <T > getOrCreateCancellableContinuation (delegate : Continuation <T >): CancellableContinuationImpl <T > {
339- // If used outside of our dispatcher
346+ // If used outside our dispatcher
340347 if (delegate !is DispatchedContinuation <T >) {
341348 return CancellableContinuationImpl (delegate, MODE_CANCELLABLE )
342349 }
Original file line number Diff line number Diff line change @@ -337,7 +337,7 @@ internal open class CancellableContinuationImpl<in T>(
337337 * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation,
338338 * in which case it detaches from the parent and cancels this continuation.
339339 */
340- private fun releaseClaimedReusableContinuation () {
340+ fun releaseClaimedReusableContinuation () {
341341 // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
342342 val cancellationCause = (delegate as ? DispatchedContinuation <* >)?.tryReleaseClaimedContinuation(this ) ? : return
343343 detachChild()
Original file line number Diff line number Diff line change @@ -248,7 +248,7 @@ internal open class BufferedChannel<E>(
248248 * Note that this implementation always invokes [suspendCancellableCoroutineReusable],
249249 * as we do not care about broadcasts performance -- they are already deprecated.
250250 */
251- internal open suspend fun sendBroadcast (element : E ): Boolean = suspendCancellableCoroutineReusable { cont ->
251+ internal open suspend fun sendBroadcast (element : E ): Boolean = suspendCancellableCoroutine { cont ->
252252 check(onUndeliveredElement == null ) {
253253 " the `onUndeliveredElement` feature is unsupported for `sendBroadcast(e)`"
254254 }
Original file line number Diff line number Diff line change 1+ /*
2+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+ */
4+
5+ package kotlinx.coroutines
6+
7+ import org.junit.Test
8+ import java.util.concurrent.CountDownLatch
9+ import java.util.concurrent.atomic.AtomicReference
10+ import kotlin.coroutines.*
11+
12+ // Stresses scenario from #3613
13+ class ReusableCancellableContinuationInvariantStressTest : TestBase () {
14+
15+ // Tests have a timeout 10 sec because the bug they catch leads to an infinite spin-loop
16+
17+ @Test(timeout = 10_000 )
18+ fun testExceptionFromSuspendReusable () = doTest { /* nothing */ }
19+
20+
21+ @Test(timeout = 10_000 )
22+ fun testExceptionFromCancelledSuspendReusable () = doTest { it.cancel() }
23+
24+
25+ @Suppress(" SuspendFunctionOnCoroutineScope" )
26+ private inline fun doTest (crossinline block : (Job ) -> Unit ) {
27+ runTest {
28+ repeat(10_000 ) {
29+ val latch = CountDownLatch (1 )
30+ val continuationToResume = AtomicReference <Continuation <Unit >? > (null )
31+ val j1 = launch(Dispatchers .Default ) {
32+ latch.await()
33+ suspendCancellableCoroutineReusable {
34+ continuationToResume.set(it)
35+ block(coroutineContext.job)
36+ throw CancellationException () // Don't let getResult() chance to execute
37+ }
38+ }
39+
40+ val j2 = launch(Dispatchers .Default ) {
41+ latch.await()
42+ while (continuationToResume.get() == null ) {
43+ // spin
44+ }
45+ continuationToResume.get()!! .resume(Unit )
46+ }
47+
48+ latch.countDown()
49+ joinAll(j1, j2)
50+ }
51+ }
52+ }
53+ }
You can’t perform that action at this time.
0 commit comments