Skip to content

1093: Launch Side Effects atomically so they are always started #1102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ android.useAndroidX=true
systemProp.org.gradle.internal.publish.checksums.insecure=true

GROUP=com.squareup.workflow1
VERSION_NAME=1.11.0-beta04-SNAPSHOT
VERSION_NAME=1.11.0-beta04-atomic-w-SNAPSHOT

POM_DESCRIPTION=Square Workflow

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import kotlin.reflect.typeOf
* )
* ```
*
* To create populate such functions from your `render` method, you first need to define a
* To create such functions from your `render` method, you first need to define a
* [WorkflowAction] to handle the event by changing state, emitting an output, or both. Then, just
* pass a lambda to your rendering that instantiates the action and passes it to
* [actionSink.send][Sink.send].
*
* ## Performing asynchronous work
*
* See [runningWorker].
* See [runningSideEffect] and [runningWorker].
*
* ## Composing children
*
Expand Down Expand Up @@ -92,8 +92,15 @@ public interface BaseRenderContext<out PropsT, StateT, in OutputT> {
* [cancelled](https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html).
*
* The coroutine will run with the same [CoroutineContext][kotlin.coroutines.CoroutineContext]
* that the workflow runtime is running in. The side effect coroutine will not be started until
* _after_ the first render call than runs it returns.
* that the workflow runtime is running in.
* The coroutine is launched with [CoroutineStart.ATOMIC][kotlinx.coroutines.CoroutineStart.ATOMIC]
* start mode, which means that it will _start_ even if the scope is cancelled before it has a
* chance to dispatch. This is to guarantee that any time a [sideEffect] is declared running
* in any render pass, it will at least be started. If the backing scope is cancelled - it is no
* longer declared as running in a consecutive render pass, or the rendering [Workflow] is no
* longer rendered - then it will be cancelled at the first suspension point within [sideEffect].
*
*
*
* @param key The string key that is used to distinguish between side effects.
* @param sideEffect The suspend function that will be launched in a coroutine to perform the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ public interface Worker<out OutputT> {
* When the worker is torn down, the coroutine is cancelled.
* This coroutine is launched in the same scope as the workflow runtime, with a few changes:
*
* - The dispatcher is always set to [Unconfined][kotlinx.coroutines.Dispatchers.Unconfined] to
* minimize overhead for workers that don't care which thread they're executed on (e.g. logging
* side effects, workers that wrap third-party reactive libraries, etc.). If your work cares
* which thread it runs on, use [withContext][kotlinx.coroutines.withContext] or
* [flowOn][kotlinx.coroutines.flow.flowOn] to specify a dispatcher.
Comment on lines -114 to -118
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just out-dated. Not true for some time (since GUWT and side effects I think), but we hadn't updated documentation.

* - A [CoroutineName][kotlinx.coroutines.CoroutineName] that describes the `Worker` instance
* (via `toString`) and the key specified by the workflow running the worker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
}
}

// Pass on to the UI.
// Pass the rendering on to the UI.
renderingsAndSnapshots.value = nextRenderAndSnapshot
// And emit the Output.
sendOutput(actionResult, onOutput)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ package com.squareup.workflow1.internal

import com.squareup.workflow1.internal.InlineLinkedList.InlineListNode
import kotlinx.coroutines.Job
import kotlinx.coroutines.sync.Mutex

/**
* Holds a [Job] that represents a running [side effect][RealRenderContext.runningSideEffect], as
* well as the key used to identify that side effect.
*
* Lastly, holds the [renderComplete] that is unlocked when render() is complete (and so the sink
* can be used).
*/
internal class SideEffectNode(
val key: String,
val job: Job
val job: Job,
val renderComplete: Mutex
) : InlineListNode<SideEffectNode> {

override var nextListNode: SideEffectNode? = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.squareup.workflow1.internal.RealRenderContext.SideEffectRunner
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.LAZY
import kotlinx.coroutines.CoroutineStart.ATOMIC
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
Expand All @@ -29,6 +29,7 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.sync.Mutex
import kotlin.coroutines.CoroutineContext

/**
Expand All @@ -40,7 +41,10 @@ import kotlin.coroutines.CoroutineContext
* worker coroutines. This context will override anything from the workflow's scope and any other
* hard-coded values added to worker contexts. It must not contain a [Job] element (it would violate
* structured concurrency).
*
* The opt-in for [ExperimentalCoroutinesApi] is for using a [ATOMIC] on side effect Jobs.
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
val id: WorkflowNodeId,
workflow: StatefulWorkflow<PropsT, StateT, OutputT, RenderingT>,
Expand Down Expand Up @@ -212,9 +216,9 @@ internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(

// Tear down workflows and workers that are obsolete.
subtreeManager.commitRenderedChildren()
// Side effect jobs are launched lazily, since they can send actions to the sink, and can only
// be started after context is frozen.
sideEffects.forEachStaging { it.job.start() }
// Let all staging side effects know that render is complete.
sideEffects.forEachStaging { if (it.renderComplete.isLocked) it.renderComplete.unlock() }
// Tear down side effects that are no longer declared running.
sideEffects.commitStaging { it.job.cancel() }

return rendering
Expand Down Expand Up @@ -260,7 +264,18 @@ internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
sideEffect: suspend CoroutineScope.() -> Unit
): SideEffectNode {
val scope = this + CoroutineName("sideEffect[$key] for $id")
val job = scope.launch(start = LAZY, block = sideEffect)
return SideEffectNode(key, job)
val renderComplete = Mutex(locked = true)
// Side effect jobs are ATOMIC because even if the side effect is run and then NOT run
// in consecutive render passes before the side effect can be dispatched, we still want it to
// start. Note that this means that side effects must be co-operative or they could
// unnecessarily hog runtime dispatch. We could force them to be so by adding an
// `if (!isActive) yield()`
// at the start of the sideEffect block, but that also might mean that expected side effects
// don't occur when the sideEffect is run at least once.
val job = scope.launch(start = ATOMIC, block = {
renderComplete.lock()
sideEffect()
})
return SideEffectNode(key, job, renderComplete)
}
}
22 changes: 22 additions & 0 deletions workflow-testing/api/workflow-testing.api
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
public final class com/squareup/workflow1/testing/HeadlessIntegrationTestKt {
public static final fun headlessIntegrationTest (Lcom/squareup/workflow1/Workflow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;)V
public static final fun headlessIntegrationTest (Lcom/squareup/workflow1/Workflow;Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;)V
public static synthetic fun headlessIntegrationTest$default (Lcom/squareup/workflow1/Workflow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V
public static synthetic fun headlessIntegrationTest$default (Lcom/squareup/workflow1/Workflow;Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V
}

public final class com/squareup/workflow1/testing/RenderIdempotencyChecker : com/squareup/workflow1/WorkflowInterceptor {
public static final field INSTANCE Lcom/squareup/workflow1/testing/RenderIdempotencyChecker;
public fun onInitialState (Ljava/lang/Object;Lcom/squareup/workflow1/Snapshot;Lkotlin/jvm/functions/Function2;Lcom/squareup/workflow1/WorkflowInterceptor$WorkflowSession;)Ljava/lang/Object;
Expand Down Expand Up @@ -155,3 +162,18 @@ public final class com/squareup/workflow1/testing/WorkflowTestRuntimeKt {
public static synthetic fun launchForTestingWith$default (Lcom/squareup/workflow1/StatefulWorkflow;Ljava/lang/Object;Lcom/squareup/workflow1/testing/WorkflowTestParams;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class com/squareup/workflow1/testing/WorkflowTurbine {
public static final field Companion Lcom/squareup/workflow1/testing/WorkflowTurbine$Companion;
public static final field WORKFLOW_TEST_DEFAULT_TIMEOUT_MS J
public fun <init> (Ljava/lang/Object;Lapp/cash/turbine/ReceiveTurbine;)V
public final fun awaitNext (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun awaitNext$default (Lcom/squareup/workflow1/testing/WorkflowTurbine;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public final fun awaitNextRendering (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun awaitNextRenderingSatisfying (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun getFirstRendering ()Ljava/lang/Object;
public final fun skipRenderings (ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class com/squareup/workflow1/testing/WorkflowTurbine$Companion {
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package com.squareup.workflow1.testing

import app.cash.turbine.ReceiveTurbine
import app.cash.turbine.test
import com.squareup.workflow1.RuntimeConfig
import com.squareup.workflow1.RuntimeConfigOptions
import com.squareup.workflow1.Workflow
import com.squareup.workflow1.WorkflowInterceptor
import com.squareup.workflow1.renderWorkflowIn
import com.squareup.workflow1.testing.WorkflowTurbine.Companion.WORKFLOW_TEST_DEFAULT_TIMEOUT_MS
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration.Companion.milliseconds

/**
* This is a test harness to run integration tests for a Workflow tree. The parameters passed here are
* the same as those to start a Workflow runtime with [renderWorkflowIn] except for ignoring
* state persistence as that is not needed for this style of test.
*
* The [coroutineContext] rather than a [CoroutineScope] is passed so that this harness handles the
* scope for the Workflow runtime for you but you can still specify context for it. It defaults to
* [Dispatchers.Main.immediate]. If you wish to use a dispatcher that skips delays, use a
* [StandardTestDispatcher][kotlinx.coroutines.test.StandardTestDispatcher], so that the dispatcher
* will still guarantee ordering.
*
* A [testTimeout] may be specified to override the default [WORKFLOW_TEST_DEFAULT_TIMEOUT_MS] for
* any particular test. This is the max amount of time the test could spend waiting on a rendering.
*
* This will start the Workflow runtime (with params as passed) rooted at whatever Workflow
* it is called on and then create a [WorkflowTurbine] for its renderings and run [testCase] on that.
* [testCase] can thus drive the test scenario and assert against renderings.
*/
public fun <PropsT, OutputT, RenderingT> Workflow<PropsT, OutputT, RenderingT>.headlessIntegrationTest(
props: StateFlow<PropsT>,
coroutineContext: CoroutineContext = Dispatchers.Main.immediate,
interceptors: List<WorkflowInterceptor> = emptyList(),
runtimeConfig: RuntimeConfig = RuntimeConfigOptions.DEFAULT_CONFIG,
onOutput: suspend (OutputT) -> Unit = {},
testTimeout: Long = WORKFLOW_TEST_DEFAULT_TIMEOUT_MS,
testCase: suspend WorkflowTurbine<RenderingT>.() -> Unit
) {
val workflow = this

runTest(
context = coroutineContext,
timeout = testTimeout.milliseconds
) {
// We use a sub-scope so that we can cancel the Workflow runtime when we are done with it so that
// tests don't all have to do that themselves.
val workflowRuntimeScope = CoroutineScope(coroutineContext)
val renderings = renderWorkflowIn(
workflow = workflow,
props = props,
scope = workflowRuntimeScope,
interceptors = interceptors,
runtimeConfig = runtimeConfig,
onOutput = onOutput
)

val firstRendering = renderings.value.rendering

// Drop one as its provided separately via `firstRendering`.
renderings.drop(1).map {
it.rendering
}.test {
val workflowTurbine = WorkflowTurbine(
firstRendering,
this
)
workflowTurbine.testCase()
cancelAndIgnoreRemainingEvents()
}
workflowRuntimeScope.cancel()
}
}

/**
* Version of [headlessIntegrationTest] that does not require props. For Workflows that have [Unit]
* props type.
*/
@OptIn(ExperimentalCoroutinesApi::class)
public fun <OutputT, RenderingT> Workflow<Unit, OutputT, RenderingT>.headlessIntegrationTest(
coroutineContext: CoroutineContext = UnconfinedTestDispatcher(),
interceptors: List<WorkflowInterceptor> = emptyList(),
runtimeConfig: RuntimeConfig = RuntimeConfigOptions.DEFAULT_CONFIG,
onOutput: suspend (OutputT) -> Unit = {},
testTimeout: Long = WORKFLOW_TEST_DEFAULT_TIMEOUT_MS,
testCase: suspend WorkflowTurbine<RenderingT>.() -> Unit
): Unit = headlessIntegrationTest(
props = MutableStateFlow(Unit).asStateFlow(),
coroutineContext = coroutineContext,
interceptors = interceptors,
runtimeConfig = runtimeConfig,
onOutput = onOutput,
testTimeout = testTimeout,
testCase = testCase
)

/**
* Simple wrapper around a [ReceiveTurbine] of [RenderingT] to provide convenience helper methods specific
* to Workflow renderings.
*
* @property firstRendering The first rendering of the Workflow runtime is made synchronously. This is
* provided separately if any assertions or operations are needed from it.
*/
public class WorkflowTurbine<RenderingT>(
public val firstRendering: RenderingT,
private val receiveTurbine: ReceiveTurbine<RenderingT>
) {
private var usedFirst = false

/**
* Suspend waiting for the next rendering to be produced by the Workflow runtime. Note this includes
* the first (synchronously made) rendering.
*
* @return the rendering.
*/
public suspend fun awaitNextRendering(): RenderingT {
if (!usedFirst) {
usedFirst = true
return firstRendering
}
return receiveTurbine.awaitItem()
}

public suspend fun skipRenderings(count: Int) {
val skippedCount = if (!usedFirst) {
usedFirst = true
count - 1
} else {
count
}

if (skippedCount > 0) {
receiveTurbine.skipItems(skippedCount)
}
}

/**
* Suspend waiting for the next rendering to be produced by the Workflow runtime that satisfies the
* [predicate].
*
* @return the rendering.
*/
public suspend fun awaitNextRenderingSatisfying(
predicate: (RenderingT) -> Boolean
): RenderingT {
var rendering = awaitNextRendering()
while (!predicate(rendering)) {
rendering = awaitNextRendering()
}
return rendering
}

/**
* Suspend waiting for the next rendering which satisfies [precondition], can successfully be mapped
* using [map] and satisfies the [satisfying] predicate when called on the [T] rendering after it
* has been mapped.
*
* @return the mapped rendering as [T]
*/
public suspend fun <T> awaitNext(
precondition: (RenderingT) -> Boolean = { true },
map: (RenderingT) -> T,
satisfying: T.() -> Boolean = { true }
): T =
map(
awaitNextRenderingSatisfying {
precondition(it) &&
with(map(it)) {
this.satisfying()
}
}
)

public companion object {
/**
* Default timeout to use while waiting for renderings.
*/
public const val WORKFLOW_TEST_DEFAULT_TIMEOUT_MS: Long = 60_000L
}
}
Loading