Skip to content

Commit 1b1acda

Browse files
1093: Launch side effects ATOMIC, await render complete
Solves #1093 Add headlessIntegrationTest to use for the new test case
1 parent b130a43 commit 1b1acda

File tree

10 files changed

+452
-18
lines changed

10 files changed

+452
-18
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ android.useAndroidX=true
88
systemProp.org.gradle.internal.publish.checksums.insecure=true
99

1010
GROUP=com.squareup.workflow1
11-
VERSION_NAME=1.11.0-beta04-SNAPSHOT
11+
VERSION_NAME=1.11.0-beta04-atomic-w-SNAPSHOT
1212

1313
POM_DESCRIPTION=Square Workflow
1414

workflow-core/src/commonMain/kotlin/com/squareup/workflow1/BaseRenderContext.kt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ import kotlin.reflect.typeOf
3030
* )
3131
* ```
3232
*
33-
* To create populate such functions from your `render` method, you first need to define a
33+
* To create such functions from your `render` method, you first need to define a
3434
* [WorkflowAction] to handle the event by changing state, emitting an output, or both. Then, just
3535
* pass a lambda to your rendering that instantiates the action and passes it to
3636
* [actionSink.send][Sink.send].
3737
*
3838
* ## Performing asynchronous work
3939
*
40-
* See [runningWorker].
40+
* See [runningSideEffect] and [runningWorker].
4141
*
4242
* ## Composing children
4343
*
@@ -92,8 +92,15 @@ public interface BaseRenderContext<out PropsT, StateT, in OutputT> {
9292
* [cancelled](https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html).
9393
*
9494
* The coroutine will run with the same [CoroutineContext][kotlin.coroutines.CoroutineContext]
95-
* that the workflow runtime is running in. The side effect coroutine will not be started until
96-
* _after_ the first render call than runs it returns.
95+
* that the workflow runtime is running in.
96+
* The coroutine is launched with [CoroutineStart.ATOMIC][kotlinx.coroutines.CoroutineStart.ATOMIC]
97+
* start mode, which means that it will _start_ even if the scope is cancelled before it has a
98+
* chance to dispatch. This is to guarantee that any time a [sideEffect] is declared running
99+
* in any render pass, it will at least be started. If the backing scope is cancelled - it is no
100+
* longer declared as running in a consecutive render pass, or the rendering [Workflow] is no
101+
* longer rendered - then it will be cancelled at the first suspension point within [sideEffect].
102+
*
103+
*
97104
*
98105
* @param key The string key that is used to distinguish between side effects.
99106
* @param sideEffect The suspend function that will be launched in a coroutine to perform the

workflow-core/src/commonMain/kotlin/com/squareup/workflow1/Worker.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,6 @@ public interface Worker<out OutputT> {
111111
* When the worker is torn down, the coroutine is cancelled.
112112
* This coroutine is launched in the same scope as the workflow runtime, with a few changes:
113113
*
114-
* - The dispatcher is always set to [Unconfined][kotlinx.coroutines.Dispatchers.Unconfined] to
115-
* minimize overhead for workers that don't care which thread they're executed on (e.g. logging
116-
* side effects, workers that wrap third-party reactive libraries, etc.). If your work cares
117-
* which thread it runs on, use [withContext][kotlinx.coroutines.withContext] or
118-
* [flowOn][kotlinx.coroutines.flow.flowOn] to specify a dispatcher.
119114
* - A [CoroutineName][kotlinx.coroutines.CoroutineName] that describes the `Worker` instance
120115
* (via `toString`) and the key specified by the workflow running the worker.
121116
*

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/RenderWorkflow.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import com.squareup.workflow1.internal.WorkflowRunner
66
import com.squareup.workflow1.internal.chained
77
import kotlinx.coroutines.CancellationException
88
import kotlinx.coroutines.CoroutineScope
9+
import kotlinx.coroutines.Dispatchers
910
import kotlinx.coroutines.Job
1011
import kotlinx.coroutines.flow.Flow
1112
import kotlinx.coroutines.flow.MutableStateFlow
1213
import kotlinx.coroutines.flow.StateFlow
1314
import kotlinx.coroutines.isActive
1415
import kotlinx.coroutines.launch
16+
import kotlin.coroutines.ContinuationInterceptor
1517

1618
/**
1719
* Launches the [workflow] in a new coroutine in [scope] and returns a [StateFlow] of its
@@ -200,7 +202,7 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
200202
}
201203
}
202204

203-
// Pass on to the UI.
205+
// Pass the rendering on to the UI.
204206
renderingsAndSnapshots.value = nextRenderAndSnapshot
205207
// And emit the Output.
206208
sendOutput(actionResult, onOutput)

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/SideEffectNode.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@ package com.squareup.workflow1.internal
22

33
import com.squareup.workflow1.internal.InlineLinkedList.InlineListNode
44
import kotlinx.coroutines.Job
5+
import kotlinx.coroutines.sync.Mutex
56

67
/**
78
* Holds a [Job] that represents a running [side effect][RealRenderContext.runningSideEffect], as
89
* well as the key used to identify that side effect.
10+
*
11+
* Lastly, holds the [renderComplete] that is unlocked when render() is complete (and so the sink
12+
* can be used).
913
*/
1014
internal class SideEffectNode(
1115
val key: String,
12-
val job: Job
16+
val job: Job,
17+
val renderComplete: Mutex
1318
) : InlineListNode<SideEffectNode> {
1419

1520
override var nextListNode: SideEffectNode? = null

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkflowNode.kt

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import com.squareup.workflow1.internal.RealRenderContext.SideEffectRunner
1919
import kotlinx.coroutines.CancellationException
2020
import kotlinx.coroutines.CoroutineName
2121
import kotlinx.coroutines.CoroutineScope
22-
import kotlinx.coroutines.CoroutineStart.LAZY
22+
import kotlinx.coroutines.CoroutineStart.ATOMIC
2323
import kotlinx.coroutines.DelicateCoroutinesApi
2424
import kotlinx.coroutines.ExperimentalCoroutinesApi
2525
import kotlinx.coroutines.Job
@@ -29,6 +29,8 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
2929
import kotlinx.coroutines.launch
3030
import kotlinx.coroutines.plus
3131
import kotlinx.coroutines.selects.SelectBuilder
32+
import kotlinx.coroutines.sync.Mutex
33+
import kotlinx.coroutines.sync.withLock
3234
import kotlin.coroutines.CoroutineContext
3335

3436
/**
@@ -40,7 +42,10 @@ import kotlin.coroutines.CoroutineContext
4042
* worker coroutines. This context will override anything from the workflow's scope and any other
4143
* hard-coded values added to worker contexts. It must not contain a [Job] element (it would violate
4244
* structured concurrency).
45+
*
46+
* The opt-in for [ExperimentalCoroutinesApi] is for using a [ATOMIC] on side effect Jobs.
4347
*/
48+
@OptIn(ExperimentalCoroutinesApi::class)
4449
internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
4550
val id: WorkflowNodeId,
4651
workflow: StatefulWorkflow<PropsT, StateT, OutputT, RenderingT>,
@@ -212,9 +217,9 @@ internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
212217

213218
// Tear down workflows and workers that are obsolete.
214219
subtreeManager.commitRenderedChildren()
215-
// Side effect jobs are launched lazily, since they can send actions to the sink, and can only
216-
// be started after context is frozen.
217-
sideEffects.forEachStaging { it.job.start() }
220+
// Let all staging side effects know that render is complete.
221+
sideEffects.forEachStaging { if (it.renderComplete.isLocked) it.renderComplete.unlock() }
222+
// Tear down side effects that are no longer declared running.
218223
sideEffects.commitStaging { it.job.cancel() }
219224

220225
return rendering
@@ -260,7 +265,18 @@ internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
260265
sideEffect: suspend CoroutineScope.() -> Unit
261266
): SideEffectNode {
262267
val scope = this + CoroutineName("sideEffect[$key] for $id")
263-
val job = scope.launch(start = LAZY, block = sideEffect)
264-
return SideEffectNode(key, job)
268+
val renderComplete = Mutex(locked = true)
269+
// Side effect jobs are ATOMIC because even if the side effect is run and then NOT run
270+
// in consecutive render passes before the side effect can be dispatched, we still want it to
271+
// start. Note that this means that side effects must be co-operative or they could
272+
// unnecessarily hog runtime dispatch. We could force them to be so by adding an
273+
// `if (!isActive) yield()`
274+
// at the start of the sideEffect block, but that also might mean that expected side effects
275+
// don't occur when the sideEffect is run at least once.
276+
val job = scope.launch(start = ATOMIC, block = {
277+
renderComplete.lock()
278+
sideEffect()
279+
})
280+
return SideEffectNode(key, job, renderComplete)
265281
}
266282
}

workflow-testing/api/workflow-testing.api

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
public final class com/squareup/workflow1/testing/HeadlessIntegrationTestKt {
2+
public static final fun headlessIntegrationTest (Lcom/squareup/workflow1/Workflow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;)V
3+
public static final fun headlessIntegrationTest (Lcom/squareup/workflow1/Workflow;Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;)V
4+
public static synthetic fun headlessIntegrationTest$default (Lcom/squareup/workflow1/Workflow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V
5+
public static synthetic fun headlessIntegrationTest$default (Lcom/squareup/workflow1/Workflow;Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V
6+
}
7+
18
public final class com/squareup/workflow1/testing/RenderIdempotencyChecker : com/squareup/workflow1/WorkflowInterceptor {
29
public static final field INSTANCE Lcom/squareup/workflow1/testing/RenderIdempotencyChecker;
310
public fun onInitialState (Ljava/lang/Object;Lcom/squareup/workflow1/Snapshot;Lkotlin/jvm/functions/Function2;Lcom/squareup/workflow1/WorkflowInterceptor$WorkflowSession;)Ljava/lang/Object;
@@ -155,3 +162,18 @@ public final class com/squareup/workflow1/testing/WorkflowTestRuntimeKt {
155162
public static synthetic fun launchForTestingWith$default (Lcom/squareup/workflow1/StatefulWorkflow;Ljava/lang/Object;Lcom/squareup/workflow1/testing/WorkflowTestParams;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Ljava/lang/Object;
156163
}
157164

165+
public final class com/squareup/workflow1/testing/WorkflowTurbine {
166+
public static final field Companion Lcom/squareup/workflow1/testing/WorkflowTurbine$Companion;
167+
public static final field WORKFLOW_TEST_DEFAULT_TIMEOUT_MS J
168+
public fun <init> (Ljava/lang/Object;Lapp/cash/turbine/ReceiveTurbine;)V
169+
public final fun awaitNext (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
170+
public static synthetic fun awaitNext$default (Lcom/squareup/workflow1/testing/WorkflowTurbine;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
171+
public final fun awaitNextRendering (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
172+
public final fun awaitNextRenderingSatisfying (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
173+
public final fun getFirstRendering ()Ljava/lang/Object;
174+
public final fun skipRenderings (ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
175+
}
176+
177+
public final class com/squareup/workflow1/testing/WorkflowTurbine$Companion {
178+
}
179+
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package com.squareup.workflow1.testing
2+
3+
import app.cash.turbine.ReceiveTurbine
4+
import app.cash.turbine.test
5+
import com.squareup.workflow1.RuntimeConfig
6+
import com.squareup.workflow1.RuntimeConfigOptions
7+
import com.squareup.workflow1.Workflow
8+
import com.squareup.workflow1.WorkflowInterceptor
9+
import com.squareup.workflow1.renderWorkflowIn
10+
import com.squareup.workflow1.testing.WorkflowTurbine.Companion.WORKFLOW_TEST_DEFAULT_TIMEOUT_MS
11+
import kotlinx.coroutines.CoroutineScope
12+
import kotlinx.coroutines.Dispatchers
13+
import kotlinx.coroutines.ExperimentalCoroutinesApi
14+
import kotlinx.coroutines.cancel
15+
import kotlinx.coroutines.flow.MutableStateFlow
16+
import kotlinx.coroutines.flow.StateFlow
17+
import kotlinx.coroutines.flow.asStateFlow
18+
import kotlinx.coroutines.flow.drop
19+
import kotlinx.coroutines.flow.map
20+
import kotlinx.coroutines.test.UnconfinedTestDispatcher
21+
import kotlinx.coroutines.test.runTest
22+
import kotlin.coroutines.CoroutineContext
23+
import kotlin.time.Duration.Companion.milliseconds
24+
25+
/**
26+
* This is a test harness to run integration tests for a Workflow tree. The parameters passed here are
27+
* the same as those to start a Workflow runtime with [renderWorkflowIn] except for ignoring
28+
* state persistence as that is not needed for this style of test.
29+
*
30+
* The [coroutineContext] rather than a [CoroutineScope] is passed so that this harness handles the
31+
* scope for the Workflow runtime for you but you can still specify context for it. It defaults to
32+
* [Dispatchers.Main.immediate]. If you wish to use a dispatcher that skips delays, use a
33+
* [StandardTestDispatcher][kotlinx.coroutines.test.StandardTestDispatcher], so that the dispatcher
34+
* will still guarantee ordering.
35+
*
36+
* A [testTimeout] may be specified to override the default [WORKFLOW_TEST_DEFAULT_TIMEOUT_MS] for
37+
* any particular test. This is the max amount of time the test could spend waiting on a rendering.
38+
*
39+
* This will start the Workflow runtime (with params as passed) rooted at whatever Workflow
40+
* it is called on and then create a [WorkflowTurbine] for its renderings and run [testCase] on that.
41+
* [testCase] can thus drive the test scenario and assert against renderings.
42+
*/
43+
public fun <PropsT, OutputT, RenderingT> Workflow<PropsT, OutputT, RenderingT>.headlessIntegrationTest(
44+
props: StateFlow<PropsT>,
45+
coroutineContext: CoroutineContext = Dispatchers.Main.immediate,
46+
interceptors: List<WorkflowInterceptor> = emptyList(),
47+
runtimeConfig: RuntimeConfig = RuntimeConfigOptions.DEFAULT_CONFIG,
48+
onOutput: suspend (OutputT) -> Unit = {},
49+
testTimeout: Long = WORKFLOW_TEST_DEFAULT_TIMEOUT_MS,
50+
testCase: suspend WorkflowTurbine<RenderingT>.() -> Unit
51+
) {
52+
val workflow = this
53+
54+
runTest(
55+
context = coroutineContext,
56+
timeout = testTimeout.milliseconds
57+
) {
58+
// We use a sub-scope so that we can cancel the Workflow runtime when we are done with it so that
59+
// tests don't all have to do that themselves.
60+
val workflowRuntimeScope = CoroutineScope(coroutineContext)
61+
val renderings = renderWorkflowIn(
62+
workflow = workflow,
63+
props = props,
64+
scope = workflowRuntimeScope,
65+
interceptors = interceptors,
66+
runtimeConfig = runtimeConfig,
67+
onOutput = onOutput
68+
)
69+
70+
val firstRendering = renderings.value.rendering
71+
72+
// Drop one as its provided separately via `firstRendering`.
73+
renderings.drop(1).map {
74+
it.rendering
75+
}.test {
76+
val workflowTurbine = WorkflowTurbine(
77+
firstRendering,
78+
this
79+
)
80+
workflowTurbine.testCase()
81+
cancelAndIgnoreRemainingEvents()
82+
}
83+
workflowRuntimeScope.cancel()
84+
}
85+
}
86+
87+
/**
88+
* Version of [headlessIntegrationTest] that does not require props. For Workflows that have [Unit]
89+
* props type.
90+
*/
91+
@OptIn(ExperimentalCoroutinesApi::class)
92+
public fun <OutputT, RenderingT> Workflow<Unit, OutputT, RenderingT>.headlessIntegrationTest(
93+
coroutineContext: CoroutineContext = UnconfinedTestDispatcher(),
94+
interceptors: List<WorkflowInterceptor> = emptyList(),
95+
runtimeConfig: RuntimeConfig = RuntimeConfigOptions.DEFAULT_CONFIG,
96+
onOutput: suspend (OutputT) -> Unit = {},
97+
testTimeout: Long = WORKFLOW_TEST_DEFAULT_TIMEOUT_MS,
98+
testCase: suspend WorkflowTurbine<RenderingT>.() -> Unit
99+
): Unit = headlessIntegrationTest(
100+
props = MutableStateFlow(Unit).asStateFlow(),
101+
coroutineContext = coroutineContext,
102+
interceptors = interceptors,
103+
runtimeConfig = runtimeConfig,
104+
onOutput = onOutput,
105+
testTimeout = testTimeout,
106+
testCase = testCase
107+
)
108+
109+
/**
110+
* Simple wrapper around a [ReceiveTurbine] of [RenderingT] to provide convenience helper methods specific
111+
* to Workflow renderings.
112+
*
113+
* @property firstRendering The first rendering of the Workflow runtime is made synchronously. This is
114+
* provided separately if any assertions or operations are needed from it.
115+
*/
116+
public class WorkflowTurbine<RenderingT>(
117+
public val firstRendering: RenderingT,
118+
private val receiveTurbine: ReceiveTurbine<RenderingT>
119+
) {
120+
private var usedFirst = false
121+
122+
/**
123+
* Suspend waiting for the next rendering to be produced by the Workflow runtime. Note this includes
124+
* the first (synchronously made) rendering.
125+
*
126+
* @return the rendering.
127+
*/
128+
public suspend fun awaitNextRendering(): RenderingT {
129+
if (!usedFirst) {
130+
usedFirst = true
131+
return firstRendering
132+
}
133+
return receiveTurbine.awaitItem()
134+
}
135+
136+
public suspend fun skipRenderings(count: Int) {
137+
val skippedCount = if (!usedFirst) {
138+
usedFirst = true
139+
count - 1
140+
} else {
141+
count
142+
}
143+
144+
if (skippedCount > 0) {
145+
receiveTurbine.skipItems(skippedCount)
146+
}
147+
}
148+
149+
/**
150+
* Suspend waiting for the next rendering to be produced by the Workflow runtime that satisfies the
151+
* [predicate].
152+
*
153+
* @return the rendering.
154+
*/
155+
public suspend fun awaitNextRenderingSatisfying(
156+
predicate: (RenderingT) -> Boolean
157+
): RenderingT {
158+
var rendering = awaitNextRendering()
159+
while (!predicate(rendering)) {
160+
rendering = awaitNextRendering()
161+
}
162+
return rendering
163+
}
164+
165+
/**
166+
* Suspend waiting for the next rendering which satisfies [precondition], can successfully be mapped
167+
* using [map] and satisfies the [satisfying] predicate when called on the [T] rendering after it
168+
* has been mapped.
169+
*
170+
* @return the mapped rendering as [T]
171+
*/
172+
public suspend fun <T> awaitNext(
173+
precondition: (RenderingT) -> Boolean = { true },
174+
map: (RenderingT) -> T,
175+
satisfying: T.() -> Boolean = { true }
176+
): T =
177+
map(
178+
awaitNextRenderingSatisfying {
179+
precondition(it) &&
180+
with(map(it)) {
181+
this.satisfying()
182+
}
183+
}
184+
)
185+
186+
public companion object {
187+
/**
188+
* Default timeout to use while waiting for renderings.
189+
*/
190+
public const val WORKFLOW_TEST_DEFAULT_TIMEOUT_MS: Long = 60_000L
191+
}
192+
}

0 commit comments

Comments
 (0)