feat(durably): add @effectionx/durably — durable execution for Effection#171
feat(durably): add @effectionx/durably — durable execution for Effection#171
Conversation
…r Effection Extract DurableReducer from the Effection fork (taras/effection, branch durable-internals) into a standalone @effectionx/durably package. This proves durable execution is completely additive to Effection — no fork required, just a peer dependency on effection plus a community package. The durably() function wraps any Effection operation to record every effect resolution to a DurableStream. On resume with existing events, stored results are replayed without re-executing effects, enabling mid-workflow resume after restarts. - 99 tests passing across 6 test suites (0 failures) - Depends on Effection PR 1127 for experimental reducer exports - Supports recording, replay, mid-workflow resume, divergence detection - Handles all Effection primitives: action, sleep, spawn, resource, all, race, each, context, abort signal, withResolvers, signals
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds a new durably package implementing durable recording/replay for Effection workflows: JSON-safe types, in-memory and HTTP-backed streams, a DurableReducer with ReplayIndex and toJson/serialization helpers, durably() integration, test helpers, extensive tests, and README documenting semantics and usage. (50 words) Changes
Sequence Diagram(s)sequenceDiagram
participant User as User Code
participant Scope as Effection Scope
participant Reducer as DurableReducer
participant Index as ReplayIndex
participant Stream as DurableStream
User->>Scope: durably(operation, {stream})
Scope->>Reducer: inject reducer via context
Reducer->>Stream: read(initial entries)
Reducer->>Index: build replay index
rect rgba(100,150,200,0.5)
Note over User,Stream: Recording / Replay decision
User->>Scope: yield effect
Scope->>Reducer: reduce(instruction)
alt recorded event exists
Reducer->>Index: consume recorded event
Index-->>Reducer: resolution (no re-execute)
Reducer->>Scope: supply recorded result
else no recorded event
Reducer->>Stream: append effect:yielded
Reducer->>Scope: execute live
Scope->>Reducer: resolved/errored
Reducer->>Stream: append resolved/errored
end
end
rect rgba(150,100,200,0.5)
Note over Scope,Stream: Scope completion
Scope->>Reducer: scope settles
Reducer->>Stream: append workflow:return, scope:destroyed
end
sequenceDiagram
participant App as Application
participant HTTP as useDurableStream
participant Remote as Remote Server
participant HttpStream as HttpDurableStream
participant Producer as IdempotentProducer
App->>HTTP: useDurableStream(url)
HTTP->>Remote: connectOrCreate(url)
Remote-->>HTTP: RemoteStream + initial entries
HTTP->>HttpStream: new HttpDurableStream(remote, initialEntries)
HttpStream->>Producer: create producer
App->>HttpStream: append(event)
HttpStream->>HttpStream: append to local buffer
HttpStream->>Producer: send(serialized event) async
Producer-->>Remote: replicate
HttpStream-->>App: return offset
App->>HTTP: cleanup
HTTP->>Producer: flushAndDetach()
Producer->>Remote: flush pending
HTTP-->>App: cleanup complete
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Suggested reviewers
Important Pre-merge checks failedPlease resolve all errors before merging. Addressing warnings is optional. ❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (2 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
commit: |
There was a problem hiding this comment.
Actionable comments posted: 17
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@durably/durable-all-race.test.ts`:
- Around line 7-9: The helper function allEvents (used in
durable-all-race.test.ts and durable-scope.test.ts) is duplicated; extract it
into a shared test utilities module (e.g., create test-helpers.ts exporting
function allEvents(stream: InMemoryDurableStream): DurableEvent[]) and update
the tests to import { allEvents } from that module instead of declaring it
locally; ensure the signature and types match the existing usage and remove the
local duplicates in all test files that previously defined allEvents.
In `@durably/durable-errors.test.ts`:
- Around line 805-847: Replace the two non-deterministic yield* sleep(10) calls
by adding explicit, deterministic handshake signals around the suspend point:
emit/wait for a "reached-suspend-point" effect or action just before (in the
first task) and after (in the resumedTask) the suspend() call so the test can
wait on that handshake instead of timing; then call task.halt() /
resumedTask.halt() only after receiving the corresponding handshake. Locate the
generator bodies passed to durably (the functions using useAbortSignal, action,
suspend) and the usages of task.halt and resumedTask.halt and replace the
sleep-based synchronization with a yielded action/effect that resolves when the
suspend point is reached, using the same InMemoryDurableStream/recordStream to
observe and gate the test flow.
In `@durably/durable-reducer.ts`:
- Around line 463-466: The code assumes thrown values are Error instances when
building outcome (ok: false) and calling serializeError; normalize unknown
throwables first by converting non-Error values into an Error (e.g., wrap
strings/objects into a new Error with useful detail) before calling
serializeError or forwarding Err so serialization and replay diagnostics remain
consistent; update both occurrences that use outcome = { ok: false, error:
serializeError(error as Error) } (and the similar usage around the 631-632 area)
to normalize the thrown value into a real Error object first and then pass that
normalized Error into serializeError/Err.
- Line 662: Currently the code silently substitutes "unknown" when
this.scopeIds.get(routine.scope) is undefined, which merges unrelated routines;
change this to fail fast by checking the result of
this.scopeIds.get(routine.scope) (the scopeId) and throwing a clear error
(including routine.scope and any identifying routine info available, e.g.,
routine.name or routine.id) if it's missing instead of assigning "unknown", so
the persistence path never runs with a fallback scope ID.
- Around line 23-27: Replace the module-scoped globalEffectCounter/nextEffectId
pair (globalEffectCounter, nextEffectId) with an instance-local counter that is
seeded from the stream (e.g., stream id or stream offset/epoch) when the reducer
instance is created; ensure nextEffectId becomes a method/closure that uses that
per-instance counter (and optional instance-specific prefix) so IDs are unique
across process restarts and cannot collide with previously issued effect-* IDs
for the same stream.
In `@durably/durable-scope.test.ts`:
- Around line 18-36: The helper function userFacingEvents is defined but never
used; remove the dead function to keep the test file focused (delete the
function declaration named userFacingEvents and any related unused imports), or
if it was intended to be used, refactor tests to call userFacingEvents on
InMemoryDurableStream instances instead of duplicating its logic; locate the
function by its name userFacingEvents and either remove it or wire it into the
relevant tests (e.g., replace inline read/map/filter logic with calls to
userFacingEvents).
In `@durably/durably.ts`:
- Around line 70-110: The rejection handler attached to task.then assumes the
thrown value is an Error by using a cast (let err = error as Error); instead,
narrow the unknown by checking error instanceof Error and if not, wrap it (e.g.
new Error(String(error))) before accessing name/message/stack so SerializedError
fields are valid; update the handler in the task.then rejection branch (where
reducer.isReplayingRoot() is checked and stream.append is called with
result.error) to use this instanceof-based narrowing and then populate name,
message, and stack from the narrowed/wrapped Error.
In `@durably/mod.ts`:
- Line 3: The public entrypoint mod.ts currently re-exports DurableReducer and
toJson which unnecessarily expands the stable public API; remove the named
exports "DurableReducer" and "toJson" from mod.ts so those internals remain
private (keep their definitions in durable-reducer.ts but stop re-exporting them
from mod.ts), update any internal imports to reference "./durable-reducer.ts"
directly if needed, and run/adjust any dependent tests or consumers to import
from the internal module rather than the public mod.ts.
In `@durably/package.json`:
- Around line 30-34: The devDependency "effection" in durably/package.json
currently points to a PR preview URL
(https://pkg.pr.new/thefrontside/effection@1127); add a TODO to track replacing
this temporary spec with the official published version once Effection PR `#1127`
is merged: create a short issue or add an entry to the repo TODO (referencing
durably/package.json and PR `#1127`) and update the devDependencies "effection"
value to the proper semver version (e.g., "effection": "<published-version>")
when the PR is merged and released.
- Around line 8-13: The exports map currently only has "development" and
"default" conditions; add "types" and "import" conditions for both the
development and default branches so TypeScript and ESM consumers can resolve
correctly: update the exports object (the "." entry that currently references
"./mod.ts" for "development" and "./dist/mod.js" for "default") to include
"types" pointing to the corresponding declaration files (e.g. "./mod.d.ts" for
development and "./dist/mod.d.ts" for default) and "import" pointing to the ESM
entry (e.g. "./mod.ts" for development and an ESM build like "./dist/mod.mjs" or
the appropriate dist ESM file) alongside the existing keys.
- Around line 1-35: Add a "files" field to package.json to restrict published
files (include "dist", "mod.ts", "dist/mod.d.ts", "README.md", "LICENSE") and
extend the existing "exports" object for the "." entry to include explicit
"types" and "import" conditions alongside the existing "development" and
"default" keys (e.g., add "types": "./dist/mod.d.ts" and "import":
"./dist/mod.js") so TypeScript and ESM consumers resolve correctly; update the
"exports" entry for "." (and any other export entries if present) and ensure the
"files" array covers build outputs and public package files only.
In `@durably/PLAN.md`:
- Line 170: The fenced directory-tree code block labeled as plain text is
missing a language identifier; update the opening fence from ``` to ```text so
the directory tree in PLAN.md is treated as plain text (i.e., change the code
fence for the directory tree block to ```text).
- Around line 473-478: The PLAN.md includes developer-specific local filesystem
references in the "Design Specification" and "Consumer Project README" bullet
items (the lines that start with those labels); remove those
~/Repositories/cowboyd/... paths and replace them with either
repository-relative links, published URLs, or a short note indicating the
documents are available in the repo (e.g., "see docs/design-spec.md" or "see
README.md in the coroutine-transport-protocol repo"); ensure the two bullet
entries no longer contain absolute home-directory paths and instead point to
accessible locations or are removed if redundant.
In `@durably/README.md`:
- Around line 148-152: Clarify the hard dependency on Effection PR 1127 by
updating the Requirements text to explicitly state whether consumers must wait
for an official Effection release that includes the `effection/experimental`
reducer exports or can use a preview build; mention the package/version spec
(`Effection ^4`) and PR number (PR 1127) and add a concise installation
suggestion such as using the published version once available or a temporary
preview URL/ tarball or Git ref (e.g., install from the effection repo commit or
branch) so readers know how to obtain a compatible effection build.
- Around line 136-141: The DurableStream interface currently defines synchronous
methods which break for persistent backends; update the DurableStream interface
so append(event: DurableEvent) returns Promise<void> (or Operation<void> if you
use that type) and read() returns Promise<StreamEntry[]> (or
Operation<StreamEntry[]>) so implementations can use async I/O, and update any
referencing code (e.g., implementations of DurableStream and callers of
append/read) to await the promises; alternatively, if you intentionally want
sync only for this release, add a short note in the README next to the
DurableStream definition explaining the synchronous design choice and that
future releases will provide async variants for durable storage.
In `@durably/stream.ts`:
- Around line 35-37: The read method currently calls
this.entries.slice(fromOffset) without validating fromOffset; add input
validation in the read(fromOffset = 0) method to ensure fromOffset is a
non-negative integer (e.g., typeof === 'number', Number.isInteger, and >= 0) and
throw a clear RangeError/TypeError if invalid, then proceed to return
this.entries.slice(fromOffset) as before; update the validation logic inside the
read function (referencing read and this.entries.slice) so negative or
non-integer offsets cannot be passed to slice.
In `@durably/types.ts`:
- Around line 11-15: The SerializedError type currently lacks a cause field
which flattens chained errors; update the SerializedError interface to include
an optional cause property (e.g., cause?: SerializedError | unknown) so nested
error context is preserved during serialization and replay; modify any
serialization/deserialization logic that constructs or consumes SerializedError
(refer to the SerializedError type and any functions that serialize errors) to
propagate the original error's cause into this new field.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (17)
durably/PLAN.mddurably/README.mddurably/durable-all-race.test.tsdurably/durable-each.test.tsdurably/durable-errors.test.tsdurably/durable-reducer.test.tsdurably/durable-reducer.tsdurably/durable-resource.test.tsdurably/durable-scope.test.tsdurably/durably.tsdurably/mod.tsdurably/package.jsondurably/stream.tsdurably/tsconfig.jsondurably/types.tspnpm-workspace.yamltsconfig.json
| let task = durably( | ||
| function* () { | ||
| yield* useAbortSignal(); | ||
| yield* action<void>((resolve) => { | ||
| resolve(); | ||
| return () => {}; | ||
| }, "before-suspend"); | ||
| yield* suspend(); | ||
| }, | ||
| { stream: recordStream }, | ||
| ); | ||
|
|
||
| yield* sleep(10); | ||
| yield* task.halt(); | ||
|
|
||
| let events = recordStream.read().map((e) => e.event); | ||
| let suspendIdx = events.findIndex( | ||
| (e) => e.type === "effect:yielded" && e.description === "suspend", | ||
| ); | ||
|
|
||
| let partialEvents = suspendIdx > 0 ? events.slice(0, suspendIdx) : events; | ||
| let partialStream = InMemoryDurableStream.from(partialEvents); | ||
|
|
||
| let signalRef: AbortSignal | null = null; | ||
|
|
||
| let resumedTask = durably( | ||
| function* () { | ||
| let signal = yield* useAbortSignal(); | ||
| signalRef = signal; | ||
| yield* action<void>((resolve) => { | ||
| resolve(); | ||
| return () => {}; | ||
| }, "before-suspend"); | ||
| yield* suspend(); | ||
| }, | ||
| { stream: partialStream }, | ||
| ); | ||
|
|
||
| yield* sleep(10); | ||
|
|
||
| expect(signalRef!.aborted).toEqual(false); | ||
|
|
||
| yield* resumedTask.halt(); |
There was a problem hiding this comment.
Replace sleep(10) synchronization with deterministic start handshakes.
These waits make the test timing-sensitive. Gate halt() on an explicit “reached suspend point” signal instead.
💡 Proposed fix
- let task = durably(
+ let { operation: started, resolve: markStarted } =
+ withResolvers<void>("started");
+ let task = durably(
function* () {
yield* useAbortSignal();
yield* action<void>((resolve) => {
resolve();
return () => {};
}, "before-suspend");
+ markStarted();
yield* suspend();
},
{ stream: recordStream },
);
-
- yield* sleep(10);
+ yield* started;
yield* task.halt();
@@
- let resumedTask = durably(
+ let { operation: resumedStarted, resolve: markResumedStarted } =
+ withResolvers<void>("resumed-started");
+ let resumedTask = durably(
function* () {
let signal = yield* useAbortSignal();
signalRef = signal;
yield* action<void>((resolve) => {
resolve();
return () => {};
}, "before-suspend");
+ markResumedStarted();
yield* suspend();
},
{ stream: partialStream },
);
-
- yield* sleep(10);
+ yield* resumedStarted;As per coding guidelines, .policies/no-sleep-test-sync.md - Tests must not use sleep() for waiting; use deterministic helpers.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let task = durably( | |
| function* () { | |
| yield* useAbortSignal(); | |
| yield* action<void>((resolve) => { | |
| resolve(); | |
| return () => {}; | |
| }, "before-suspend"); | |
| yield* suspend(); | |
| }, | |
| { stream: recordStream }, | |
| ); | |
| yield* sleep(10); | |
| yield* task.halt(); | |
| let events = recordStream.read().map((e) => e.event); | |
| let suspendIdx = events.findIndex( | |
| (e) => e.type === "effect:yielded" && e.description === "suspend", | |
| ); | |
| let partialEvents = suspendIdx > 0 ? events.slice(0, suspendIdx) : events; | |
| let partialStream = InMemoryDurableStream.from(partialEvents); | |
| let signalRef: AbortSignal | null = null; | |
| let resumedTask = durably( | |
| function* () { | |
| let signal = yield* useAbortSignal(); | |
| signalRef = signal; | |
| yield* action<void>((resolve) => { | |
| resolve(); | |
| return () => {}; | |
| }, "before-suspend"); | |
| yield* suspend(); | |
| }, | |
| { stream: partialStream }, | |
| ); | |
| yield* sleep(10); | |
| expect(signalRef!.aborted).toEqual(false); | |
| yield* resumedTask.halt(); | |
| let { operation: started, resolve: markStarted } = | |
| withResolvers<void>("started"); | |
| let task = durably( | |
| function* () { | |
| yield* useAbortSignal(); | |
| yield* action<void>((resolve) => { | |
| resolve(); | |
| return () => {}; | |
| }, "before-suspend"); | |
| markStarted(); | |
| yield* suspend(); | |
| }, | |
| { stream: recordStream }, | |
| ); | |
| yield* started; | |
| yield* task.halt(); | |
| let events = recordStream.read().map((e) => e.event); | |
| let suspendIdx = events.findIndex( | |
| (e) => e.type === "effect:yielded" && e.description === "suspend", | |
| ); | |
| let partialEvents = suspendIdx > 0 ? events.slice(0, suspendIdx) : events; | |
| let partialStream = InMemoryDurableStream.from(partialEvents); | |
| let signalRef: AbortSignal | null = null; | |
| let { operation: resumedStarted, resolve: markResumedStarted } = | |
| withResolvers<void>("resumed-started"); | |
| let resumedTask = durably( | |
| function* () { | |
| let signal = yield* useAbortSignal(); | |
| signalRef = signal; | |
| yield* action<void>((resolve) => { | |
| resolve(); | |
| return () => {}; | |
| }, "before-suspend"); | |
| markResumedStarted(); | |
| yield* suspend(); | |
| }, | |
| { stream: partialStream }, | |
| ); | |
| yield* resumedStarted; | |
| expect(signalRef!.aborted).toEqual(false); | |
| yield* resumedTask.halt(); |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/durable-errors.test.ts` around lines 805 - 847, Replace the two
non-deterministic yield* sleep(10) calls by adding explicit, deterministic
handshake signals around the suspend point: emit/wait for a
"reached-suspend-point" effect or action just before (in the first task) and
after (in the resumedTask) the suspend() call so the test can wait on that
handshake instead of timing; then call task.halt() / resumedTask.halt() only
after receiving the corresponding handshake. Locate the generator bodies passed
to durably (the functions using useAbortSignal, action, suspend) and the usages
of task.halt and resumedTask.halt and replace the sleep-based synchronization
with a yielded action/effect that resolves when the suspend point is reached,
using the same InMemoryDurableStream/recordStream to observe and gate the test
flow.
| ```ts | ||
| interface DurableStream { | ||
| append(event: DurableEvent): void; | ||
| read(): StreamEntry[]; | ||
| } | ||
| ``` |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
DurableStream interface shows synchronous append and read, which may limit persistent storage implementations.
The interface as documented is synchronous. Real persistent backends (databases, file systems) typically need async I/O. Consider whether append should return Operation<void> (or at minimum void | Promise<void>) and read should return Operation<StreamEntry[]> to support durable storage beyond in-memory. If this is intentional for the initial release, a brief note in the docs explaining the synchronous design choice would help consumers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/README.md` around lines 136 - 141, The DurableStream interface
currently defines synchronous methods which break for persistent backends;
update the DurableStream interface so append(event: DurableEvent) returns
Promise<void> (or Operation<void> if you use that type) and read() returns
Promise<StreamEntry[]> (or Operation<StreamEntry[]>) so implementations can use
async I/O, and update any referencing code (e.g., implementations of
DurableStream and callers of append/read) to await the promises; alternatively,
if you intentionally want sync only for this release, add a short note in the
README next to the DurableStream definition explaining the synchronous design
choice and that future releases will provide async variants for durable storage.
…nt streams Add @effectionx/durably/http sub-entrypoint that bridges @durable-streams/client to the DurableStream interface. The useDurableStream(url) resource connects to a Durable Streams server, pre-fetches existing events for replay, and provides an HttpDurableStream that buffers locally while replicating to the server. - HttpDurableStream: adapter implementing sync DurableStream interface with async replication via IdempotentProducer - useDurableStream: Effection resource with structured cleanup (flush + detach) - @durable-streams/client is an optional peer dependency - Updated README with realistic examples using main() wrapper and durable-streams for persistence
… code Drop async functions and call() in favor of until() to convert promises to operations. connectOrCreate is now a generator function yielding operations instead of an async function returning promises.
…rors Replace spawn+each+createSignal error forwarding with a single action whose reject callback is bound as the producer error handler. Also fix three biome lint errors in durable-reducer.ts: unnecessary continue, optional chain, and arguments usage.
Add 3 tests confirming two durably() calls can coexist independently: - Sequential: two durably() calls with separate streams, no cross-contamination - Concurrent: two spawned durably() tasks with independent streams - Mixed: replay one stream while another runs live simultaneously Also fix divergence detection README example to use InMemoryDurableStream.from() for the replay stream, matching the actual tested pattern.
There was a problem hiding this comment.
Actionable comments posted: 4
♻️ Duplicate comments (4)
durably/package.json (1)
1-50:⚠️ Potential issue | 🟡 MinorMissing
filesfield — test files and config will be published to npm.Without a
filesfield, npm will publish everything not in.npmignore/.gitignore, including test files (*.test.ts),tsconfig.json, and other non-distribution files. Other packages in this repo include afilesfield.Proposed fix
"sideEffects": false, + "files": [ + "dist", + "mod.ts", + "http.ts", + "durably.ts", + "durable-reducer.ts", + "http-durable-stream.ts", + "stream.ts", + "types.ts", + "README.md" + ], "devDependencies": {Based on learnings: "package.json must include
filesfield containingdist,mod.ts, and source files."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/package.json` around lines 1 - 50, The package.json is missing a "files" field which causes tests, configs, and other non-distribution files to be published; add a "files" array to package.json that explicitly lists the distribution and published entrypoints (e.g. "dist", "mod.ts", and any source files needed at runtime such as "http.ts" or other top-level .ts files) so only those artifacts are included in the npm package and test/config files (*.test.ts, tsconfig.json, etc.) are excluded.durably/durable-reducer.ts (3)
461-466:⚠️ Potential issue | 🟠 MajorNon-Error throwables still cast unsafely to
Error.
error as Erroron line 464 (and similarly line 629) will produce malformed serialization if a non-Error value is thrown (e.g., a string or plain object). Normalize with a helper as previously suggested.
655-666:⚠️ Potential issue | 🟠 MajorFallback scope ID
"unknown"on line 660 can corrupt the replay stream.If
routine.scopeis not registered, events are persisted under"unknown", merging unrelated scopes. This was flagged previously — fail fast with a throw instead.Additionally,
nextEffectId()on line 657 calls the module-scoped counter (root cause addressed above).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-reducer.ts` around lines 655 - 666, In handleEffect, don't fall back to the string "unknown" for a missing scope (scopeIds map lookup); instead fail fast by throwing a descriptive error when routine.scope isn't found in this.scopeIds to avoid corrupting replay streams; also don't call nextEffectId() before validating the scope—either move the nextEffectId() invocation after the scopeId check or switch to a scope-aware ID generator (i.e., avoid the module-scoped counter) so effectId is only created once a valid scopeId is established.
23-27:⚠️ Potential issue | 🔴 CriticalModule-scoped
globalEffectCountercauses effect ID collisions across runs.The counter resets to 0 on process restart. During replay,
nextEffectId()is still called (line 657) before the replay check, but the generated ID is discarded when overwritten by the replay event's ID (line 681). After replay ends, new live effects get IDs that may overlap with unconsumed replay-stream effect IDs. On a subsequent replay of the merged stream, theReplayIndex.resolutionsmap (keyed byeffectId) will silently overwrite the older resolution with the newer one, corrupting replay.Make this an instance-local counter seeded from the stream's max effect ordinal, as previously suggested.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-reducer.ts` around lines 23 - 27, Replace the module-scoped globalEffectCounter and its nextEffectId() function with an instance-local counter on the reducer/replay owner so effect IDs are seeded from the stream's maximum effect ordinal; e.g., remove globalEffectCounter, add this.effectCounter (or similar) to the reducer/ReplayIndex instance, initialize it when the stream is opened/loaded using the stream's max effect ordinal, and change nextEffectId() to read/increment that instance field (so calls to nextEffectId() during replay use the seeded counter and subsequent live effect IDs cannot collide with existing ReplayIndex.resolutions keyed by effectId).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@durably/durable-reducer.test.ts`:
- Around line 617-648: The test uses yield* sleep(0) to wait for
workflow:return/scope:destroyed which violates the no-sleep policy; fix by
making the synchronization deterministic: either update durably() to guarantee
it emits all lifecycle events (workflow:return and scope:destroyed) before the
returned task settles, or add a deterministic drain/wait helper on
InMemoryDurableStream (e.g., stream.drain() or stream.waitForEvents()) and
replace yield* sleep(0) calls with a call to that helper; adjust the test to
call the new helper after invoking durably() and before reading stream.read() so
events are reliably available.
In `@durably/durable-reducer.ts`:
- Around line 638-653: Extract the hardcoded effect description strings in
isInfrastructureEffect into a shared exported constant (e.g., export const
INFRASTRUCTURE_EFFECTS = [...]) and replace the inline array/checks in
isInfrastructureEffect with a lookup against INFRASTRUCTURE_EFFECTS; add a
top-of-file comment above INFRASTRUCTURE_EFFECTS noting "Hardcoded to match
Effection ^4 internal effect descriptions. Update if Effection adds/renames
infrastructure effects." and update any tests or other files that duplicate
these strings to import and use INFRASTRUCTURE_EFFECTS so all references
(including durable-reducer.ts's isInfrastructureEffect) are centralized.
In `@durably/http.ts`:
- Around line 90-91: The catch block redundantly casts the caught error after
using the instanceof check; inside the catch for "e: unknown" you should rely on
the type-narrowing from "e instanceof FetchError" and replace "(e as
FetchError).status" with "e.status" (within the same catch) so remove the
unnecessary cast and use the narrowed "e" when checking FetchError.status in the
code path that handles 404.
- Around line 65-70: The action currently calls resolve(s) immediately so later
s.errorHandler = reject can never reject the settled action; instead keep the
action unsettled until the stream lifecycle finishes by wiring s.errorHandler to
reject and only resolve the action when the stream is explicitly closed/finished
(i.e. wait for the HttpDurableStream lifecycle end event/callback rather than
resolving synchronously). Concretely, inside the action block set s.errorHandler
= reject first, subscribe to the stream's finish/close signal (or provide a
callback like onClose/onFinish) and call resolve(s) only when the stream has
started successfully and the cleanup returned from the action will
resolve/cleanup when the stream ends; this ensures producer errors propagate via
reject and the action remains unsettled for the stream lifetime.
---
Duplicate comments:
In `@durably/durable-reducer.ts`:
- Around line 655-666: In handleEffect, don't fall back to the string "unknown"
for a missing scope (scopeIds map lookup); instead fail fast by throwing a
descriptive error when routine.scope isn't found in this.scopeIds to avoid
corrupting replay streams; also don't call nextEffectId() before validating the
scope—either move the nextEffectId() invocation after the scopeId check or
switch to a scope-aware ID generator (i.e., avoid the module-scoped counter) so
effectId is only created once a valid scopeId is established.
- Around line 23-27: Replace the module-scoped globalEffectCounter and its
nextEffectId() function with an instance-local counter on the reducer/replay
owner so effect IDs are seeded from the stream's maximum effect ordinal; e.g.,
remove globalEffectCounter, add this.effectCounter (or similar) to the
reducer/ReplayIndex instance, initialize it when the stream is opened/loaded
using the stream's max effect ordinal, and change nextEffectId() to
read/increment that instance field (so calls to nextEffectId() during replay use
the seeded counter and subsequent live effect IDs cannot collide with existing
ReplayIndex.resolutions keyed by effectId).
In `@durably/package.json`:
- Around line 1-50: The package.json is missing a "files" field which causes
tests, configs, and other non-distribution files to be published; add a "files"
array to package.json that explicitly lists the distribution and published
entrypoints (e.g. "dist", "mod.ts", and any source files needed at runtime such
as "http.ts" or other top-level .ts files) so only those artifacts are included
in the npm package and test/config files (*.test.ts, tsconfig.json, etc.) are
excluded.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yamlis excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (6)
durably/README.mddurably/durable-reducer.test.tsdurably/durable-reducer.tsdurably/http-durable-stream.tsdurably/http.tsdurably/package.json
| describe("workflow:return", () => { | ||
| it("emits workflow:return before scope:destroyed for a simple workflow", function* () { | ||
| let stream = new InMemoryDurableStream(); | ||
|
|
||
| yield* durably( | ||
| function* () { | ||
| return 42; | ||
| }, | ||
| { stream }, | ||
| ); | ||
|
|
||
| yield* sleep(0); | ||
|
|
||
| let events = stream.read().map((e) => e.event); | ||
|
|
||
| let workflowReturns = events.filter((e) => e.type === "workflow:return"); | ||
| expect(workflowReturns.length).toBeGreaterThanOrEqual(1); | ||
|
|
||
| let rootReturn = workflowReturns.find( | ||
| (e) => e.type === "workflow:return" && e.scopeId === "root", | ||
| ); | ||
| expect(rootReturn).toBeDefined(); | ||
| if (rootReturn && rootReturn.type === "workflow:return") { | ||
| expect(rootReturn.value).toEqual(42); | ||
| } | ||
|
|
||
| let rootReturnIdx = events.indexOf(rootReturn!); | ||
| let rootDestroyIdx = events.findIndex( | ||
| (e) => e.type === "scope:destroyed" && e.scopeId === "root", | ||
| ); | ||
| expect(rootReturnIdx).toBeLessThan(rootDestroyIdx); | ||
| }); |
There was a problem hiding this comment.
yield* sleep(0) used for test synchronization (lines 628, 664).
Lines 628 and 664 use yield* sleep(0) to wait for the .then() settlement handler in durably.ts to append workflow:return and scope:destroyed events. This violates the no-sleep test sync policy. Consider having durably() guarantee that all lifecycle events are emitted before the task settles, or use a deterministic drain mechanism.
As per coding guidelines: "Tests must not use sleep() for waiting; use deterministic helpers."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/durable-reducer.test.ts` around lines 617 - 648, The test uses yield*
sleep(0) to wait for workflow:return/scope:destroyed which violates the no-sleep
policy; fix by making the synchronization deterministic: either update durably()
to guarantee it emits all lifecycle events (workflow:return and scope:destroyed)
before the returned task settles, or add a deterministic drain/wait helper on
InMemoryDurableStream (e.g., stream.drain() or stream.waitForEvents()) and
replace yield* sleep(0) calls with a call to that helper; adjust the test to
call the new helper after invoking durably() and before reading stream.read() so
events are reliably available.
| let stream = yield* action<HttpDurableStream>((resolve, reject) => { | ||
| let s = new HttpDurableStream(remote, entries); | ||
| s.errorHandler = reject; | ||
| resolve(s); | ||
| return () => {}; | ||
| }); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's understand the file structure and locate Effection usage
find . -type f -name "*.ts" -o -name "*.tsx" | head -20Repository: thefrontside/effectionx
Length of output: 546
🏁 Script executed:
# Search for Effection action usage in the codebase to see patterns
rg "action<" --type ts -A 5 -B 2 | head -100Repository: thefrontside/effectionx
Length of output: 5975
🏁 Script executed:
# Look for any Effection documentation or types in the repo
fd -e md . | xargs grep -l "action" | head -10Repository: thefrontside/effectionx
Length of output: 102
🏁 Script executed:
# Check the actual implementation of useDurableStream to understand full context
cat -n durably/http.ts | sed -n '50,110p'Repository: thefrontside/effectionx
Length of output: 1865
🌐 Web query:
Effection action resolve reject settlement semantics
💡 Result:
In Effection, action() is the direct analog of new Promise((resolve, reject) => ...): it creates an Operation<T> whose result is determined by calling resolve(value) or reject(error). When you yield* the action:
resolve(value)makes the operation complete successfully and producevalue. [1]reject(error)makes the operation fail; from the caller’s perspective, the action throws that error. [1]- Like promises, once it transitions from “pending” to either success or failure, it is settled—further
resolve()/reject()calls have no effect. [2]
Key Effection-specific semantic: the action body runs in its own scope, and that scope is fully torn down before execution returns to the point that yielded the action—so cleanup is expected to happen inside the action (commonly via a finally path). [1] The guide recommends returning a teardown function (e.g., aborting XHR) so work is canceled when the action resolves, rejects, or is otherwise left/canceled. [3]
Sources: Effection action() API reference [1], promise settlement semantics overview [2], Effection actions guide (cleanup/teardown) [3].
[1] (frontside.com)
[2] (janhesters.com)
[3] (frontside.com)
Citations:
- 1: https://frontside.com/effection/api/3.1/action?utm_source=openai
- 2: https://janhesters.com/blog/javascript-promises-explained-but-on-a-senior-level?utm_source=openai
- 3: https://frontside.com/effection/guides/v3/actions/?utm_source=openai
Producer errors will be silently dropped—action settles before reject can be called.
Once resolve(s) is called on line 68, the action is settled. Per Effection's action semantics (which follow Promise settlement), any subsequent reject() calls—including those from s.errorHandler = reject—are no-ops. Since resolve(s) is synchronous and occurs before any async producer error, errors from the stream producer will never propagate into structured concurrency.
To propagate producer errors, refactor to keep the action unsettled until the stream lifecycle ends, or handle producer errors via a different mechanism that is not dependent on post-settlement reject calls.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/http.ts` around lines 65 - 70, The action currently calls resolve(s)
immediately so later s.errorHandler = reject can never reject the settled
action; instead keep the action unsettled until the stream lifecycle finishes by
wiring s.errorHandler to reject and only resolve the action when the stream is
explicitly closed/finished (i.e. wait for the HttpDurableStream lifecycle end
event/callback rather than resolving synchronously). Concretely, inside the
action block set s.errorHandler = reject first, subscribe to the stream's
finish/close signal (or provide a callback like onClose/onFinish) and call
resolve(s) only when the stream has started successfully and the cleanup
returned from the action will resolve/cleanup when the stream ends; this ensures
producer errors propagate via reject and the action remains unsettled for the
stream lifetime.
…le race Move root scope lifecycle emission from .then() microtask in durably.ts into the scope middleware in durable-reducer.ts. This ensures scope:destroyed and workflow:return for root are recorded synchronously during structured teardown, before resource cleanup (e.g., useDurableStream closing the stream). Additionally addresses three CodeRabbit review issues: - Replace module-scoped globalEffectCounter with instance-local counter seeded from stream.length, preventing effect ID collisions after process restart - Change scope ID fallback from silent "unknown" to a descriptive throw, surfacing lifecycle bugs immediately - Add normalizeError() helper applied at 3 callsites (destroy middleware, reduce method, wrappedNext) to handle non-Error throwables New tests: - effect ID collision prevention after simulated restart - non-Error throwable handling (string throws via effects and generators)
There was a problem hiding this comment.
Actionable comments posted: 5
♻️ Duplicate comments (4)
durably/durable-reducer.ts (3)
384-393: Scope lookup now throws instead of silently falling back to"unknown"— good.This addresses prior review feedback and ensures lifecycle bugs surface immediately.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-reducer.ts` around lines 384 - 393, The getScopeId method now throws when a scope isn't found instead of returning "unknown"; ensure this change is applied in DurableReducer.getScopeId by keeping the lookup via this.scopeIds.get(scope) and throwing a clear Error with the existing message if id is falsy, so callers relying on registered scopes will surface lifecycle bugs immediately (verify references to getScopeId and any callers handle or allow this exception to propagate).
712-727: 🧹 Nitpick | 🔵 TrivialInfrastructure effect strings are hardcoded and tightly coupled to Effection v4 internals.
This was flagged in a prior review as a nitpick. The strings are duplicated across test files (e.g.,
userEventsfilter in the test file at lines 15-16). Extracting them into a shared constant would centralize future Effection version updates.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-reducer.ts` around lines 712 - 727, The hardcoded Effection v4 infrastructure effect strings in isInfrastructureEffect should be extracted to a shared constant (e.g., INFRASTRUCTURE_EFFECTS: string[] exported from a new or existing central module) and the function changed to check INFRASTRUCTURE_EFFECTS.includes(description) instead of repeating literals; update any duplicated test filters (like the userEvents test) to import and reuse that same exported constant so future Effection changes only require a single update to the constant.
362-378: Effect ID collision fix looks correct — addresses prior review feedback.Seeding
effectCounterfromstream.lengthensures new IDs start above any existing entries. The instance-local counter avoids cross-instance collisions.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-reducer.ts` around lines 362 - 378, Seed the instance-local effectCounter from the stream length in the constructor (this.effectCounter = stream.length) so new effect IDs won't collide with existing entries after restart, initialize replayIndex with new ReplayIndex(stream.read(0), desc => this.isInfrastructureEffect(desc)) as shown, and keep nextEffectId() returning `effect-${++this.effectCounter}` to pre-increment the counter when creating IDs; ensure these three symbols (effectCounter, constructor/replayIndex initialization, nextEffectId) match the diff so the change is applied correctly.durably/durable-reducer.test.ts (1)
628-628:⚠️ Potential issue | 🟡 Minor
yield* sleep(0)is used for test synchronization on lines 628 and 664.These
sleep(0)calls wait for lifecycle events to be appended to the stream. The PR description mentions that root lifecycle emission was moved into the scope middleware to be synchronous during teardown. If that fix is complete, thesesleep(0)calls may no longer be necessary — the events should already be in the stream by the timedurably()resolves.If they are still needed, it indicates the lifecycle emission isn't fully synchronous yet, which contradicts the stated fix.
#!/bin/bash # Check if the workflow:return and scope:destroyed for root are emitted # synchronously within the destroy middleware (no .then() or microtask) rg -n "\.then\(|queueMicrotask\|setTimeout\|Promise\.resolve" durably/durable-reducer.ts durably/durably.tsAlso applies to: 664-664
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-reducer.test.ts` at line 628, The test uses yield* sleep(0) to wait for root lifecycle events; either remove these sleeps and assert lifecycle events are present immediately after durably() resolves, or make the lifecycle emission truly synchronous: ensure the scope destroy middleware (and/or durably() resolve path) calls the root lifecycle emission function synchronously (no .then/Promise/queueMicrotask/setTimeout) so that destroy emits scope:destroyed and workflow:return before durably() resolves; update the tests to drop yield* sleep(0) (references: yield* sleep, durably(), scope destroy middleware, root lifecycle emission) or implement the synchronous emission in the destroy path so the sleeps are unnecessary.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@durably/durable-reducer.test.ts`:
- Around line 8-47: The helper userEffectPairs assumes yielded/resolved events
are adjacent and can mis-pair interleaved/concurrent effects; rename
userEffectPairs to sequentialEffectPairs (or add a clear JSDoc comment above it)
and update all references to make the adjacency assumption explicit so callers
don’t reuse it for concurrent workflows; ensure the new name/comment references
the function and its behavior (matching effectId at events[i + 1]) and update
any tests that import or call userEffectPairs accordingly.
In `@durably/durable-reducer.ts`:
- Around line 24-48: The current toJson uses JSON.stringify/parse for plain
objects which silently drops non-serializable nested values; instead, within
toJson for plain objects (where proto === Object.prototype || proto === null)
recursively walk the object's own properties and arrays, calling toJson on each
property value and building a new plain object so non-serializable values get
wrapped by createLiveOnlySentinel rather than dropped; update the branch in
toJson to perform this recursive mapping (and retain the existing Array.isArray
branch and createLiveOnlySentinel fallback for non-plain objects), and
optionally add a short comment explaining this recursion replaces the previous
stringify/parse behavior to preserve non-serializable values as sentinels.
- Around line 822-834: The function isInfrastructureContext hardcodes
Effection-internal context names; extract these magic strings into a single
shared constant (e.g., INFRASTRUCTURE_NAMES or INFRACTION_SET) and replace the
hardcoded checks in isInfrastructureContext with a lookup against that constant,
then use the same constant in isInfrastructureEffect so both functions share the
canonical list; update references to include the "api::" prefix rule
(startsWith) as part of the shared definition or companion predicate to keep
behavior identical.
- Around line 795-818: The monkey-patch of routine.next via wrappedNext can run
multiple times and produce duplicate stream.append and duplicate originalNext
calls; update the logic around routine.next/wrappedNext (references:
routine.next, originalNext, wrappedNext, effect.enter, routine.data.exit,
stream.append) to ensure wrappedNext is idempotent by guarding its body with a
single-call flag (or by atomically restoring routine.next to a no-op before
performing the append/originalNext), and add a brief comment documenting the
single-invocation invariant so future readers know why the guard is required.
- Around line 207-218: The peekScopeCreation function currently recurses when it
finds consumed scope entries; replace that recursion with an iterative loop:
inside peekScopeCreation(), while scopeCreationCursor <
scopeCreationOrder.length, get ev = scopeCreationOrder[scopeCreationCursor]; if
consumedCreations.has(ev.scopeId) then increment scopeCreationCursor and
continue the loop; otherwise return ev; after the loop return undefined. Keep
the same early-return behavior and only mutate scopeCreationCursor and read
scopeCreationOrder/consumedCreations as shown.
---
Duplicate comments:
In `@durably/durable-reducer.test.ts`:
- Line 628: The test uses yield* sleep(0) to wait for root lifecycle events;
either remove these sleeps and assert lifecycle events are present immediately
after durably() resolves, or make the lifecycle emission truly synchronous:
ensure the scope destroy middleware (and/or durably() resolve path) calls the
root lifecycle emission function synchronously (no
.then/Promise/queueMicrotask/setTimeout) so that destroy emits scope:destroyed
and workflow:return before durably() resolves; update the tests to drop yield*
sleep(0) (references: yield* sleep, durably(), scope destroy middleware, root
lifecycle emission) or implement the synchronous emission in the destroy path so
the sleeps are unnecessary.
In `@durably/durable-reducer.ts`:
- Around line 384-393: The getScopeId method now throws when a scope isn't found
instead of returning "unknown"; ensure this change is applied in
DurableReducer.getScopeId by keeping the lookup via this.scopeIds.get(scope) and
throwing a clear Error with the existing message if id is falsy, so callers
relying on registered scopes will surface lifecycle bugs immediately (verify
references to getScopeId and any callers handle or allow this exception to
propagate).
- Around line 712-727: The hardcoded Effection v4 infrastructure effect strings
in isInfrastructureEffect should be extracted to a shared constant (e.g.,
INFRASTRUCTURE_EFFECTS: string[] exported from a new or existing central module)
and the function changed to check INFRASTRUCTURE_EFFECTS.includes(description)
instead of repeating literals; update any duplicated test filters (like the
userEvents test) to import and reuse that same exported constant so future
Effection changes only require a single update to the constant.
- Around line 362-378: Seed the instance-local effectCounter from the stream
length in the constructor (this.effectCounter = stream.length) so new effect IDs
won't collide with existing entries after restart, initialize replayIndex with
new ReplayIndex(stream.read(0), desc => this.isInfrastructureEffect(desc)) as
shown, and keep nextEffectId() returning `effect-${++this.effectCounter}` to
pre-increment the counter when creating IDs; ensure these three symbols
(effectCounter, constructor/replayIndex initialization, nextEffectId) match the
diff so the change is applied correctly.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
durably/durable-reducer.test.tsdurably/durable-reducer.tsdurably/durably.ts
durably/durable-reducer.test.ts
Outdated
| function userEvents(stream: InMemoryDurableStream): DurableEvent[] { | ||
| return stream | ||
| .read() | ||
| .map((e) => e.event) | ||
| .filter((e) => { | ||
| if (e.type === "effect:yielded") { | ||
| let desc = e.description; | ||
| if (desc === "useCoroutine()" || desc.startsWith("do <")) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| }); | ||
| } | ||
|
|
||
| function userEffectPairs( | ||
| stream: InMemoryDurableStream, | ||
| ): Array<[DurableEvent, DurableEvent]> { | ||
| let events = stream.read().map((e) => e.event); | ||
| let pairs: Array<[DurableEvent, DurableEvent]> = []; | ||
| for (let i = 0; i < events.length - 1; i++) { | ||
| let ev = events[i]; | ||
| if (ev.type !== "effect:yielded") continue; | ||
| if ( | ||
| ev.description === "useCoroutine()" || | ||
| ev.description.startsWith("do <") | ||
| ) | ||
| continue; | ||
| let next = events[i + 1]; | ||
| if ( | ||
| next && | ||
| (next.type === "effect:resolved" || next.type === "effect:errored") && | ||
| next.effectId === ev.effectId | ||
| ) { | ||
| pairs.push([ev, next]); | ||
| i++; | ||
| } | ||
| } | ||
| return pairs; | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Helper userEffectPairs assumes yielded/resolved events are adjacent.
userEffectPairs (line 36) looks for the resolution at events[i + 1], which only works for sequential single-scope workflows. If effects interleave (e.g., from concurrent spawns), pairs may be missed or mismatched. This is fine for current usage but could silently produce wrong results if reused for concurrent workflow tests.
A brief comment or renaming to sequentialEffectPairs would prevent misuse.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/durable-reducer.test.ts` around lines 8 - 47, The helper
userEffectPairs assumes yielded/resolved events are adjacent and can mis-pair
interleaved/concurrent effects; rename userEffectPairs to sequentialEffectPairs
(or add a clear JSDoc comment above it) and update all references to make the
adjacency assumption explicit so callers don’t reuse it for concurrent
workflows; ensure the new name/comment references the function and its behavior
(matching effectId at events[i + 1]) and update any tests that import or call
userEffectPairs accordingly.
durably/durable-reducer.ts
Outdated
| let originalNext = routine.next.bind(routine); | ||
| let stream = this.stream; | ||
|
|
||
| let wrappedNext = (result: Result<unknown>) => { | ||
| if (result.ok) { | ||
| stream.append({ | ||
| type: "effect:resolved", | ||
| effectId, | ||
| value: toJson(result.value), | ||
| }); | ||
| } else { | ||
| stream.append({ | ||
| type: "effect:errored", | ||
| effectId, | ||
| error: serializeError(normalizeError(result.error)), | ||
| }); | ||
| } | ||
|
|
||
| routine.next = originalNext; | ||
| originalNext(result); | ||
| }; | ||
|
|
||
| routine.next = wrappedNext; | ||
| routine.data.exit = effect.enter(routine.next, routine); |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Monkey-patching routine.next is fragile — document the single-call invariant.
The wrappedNext closure replaces routine.next and restores originalNext after the first call. If wrappedNext were ever invoked more than once (e.g., due to a buggy effect calling the resolve callback twice), originalNext would be called twice and routine.next would already be restored — but a duplicate stream.append would still occur, corrupting the stream.
A guard or a brief comment documenting that Effection guarantees single-invocation of the resolve callback would strengthen confidence.
🛡️ Optional guard
+ let recorded = false;
let wrappedNext = (result: Result<unknown>) => {
+ if (recorded) return; // guard against double resolution
+ recorded = true;
if (result.ok) {
stream.append({🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/durable-reducer.ts` around lines 795 - 818, The monkey-patch of
routine.next via wrappedNext can run multiple times and produce duplicate
stream.append and duplicate originalNext calls; update the logic around
routine.next/wrappedNext (references: routine.next, originalNext, wrappedNext,
effect.enter, routine.data.exit, stream.append) to ensure wrappedNext is
idempotent by guarding its body with a single-call flag (or by atomically
restoring routine.next to a no-op before performing the append/originalNext),
and add a brief comment documenting the single-invocation invariant so future
readers know why the guard is required.
Shared test infrastructure: - Extract allEvents, scopeEvents, userEvents, userEffectPairs into shared test-helpers.ts; remove duplicated helpers from 6 test files - Remove dead code: unused userFacingEvents, userEffects, userEvents Infrastructure constants: - Extract INFRASTRUCTURE_EFFECTS and INFRASTRUCTURE_CONTEXTS arrays from hardcoded string checks; document Effection ^4 version coupling - isInfrastructureEffect/Context now use array lookups Serialization improvements: - Add cause?: SerializedError to SerializedError interface for chained error preservation during replay - Replace toJson JSON.stringify/parse with recursive property walk so non-serializable nested values become LiveOnlySentinel markers instead of being silently dropped - Add WeakSet cycle detection in toJson() and serializeError() to prevent stack overflow on circular object/error graphs Code quality: - Convert peekScopeCreation() from recursion to while loop - Add RangeError validation in stream.read() for negative/non-integer fromOffset - Remove redundant (e as FetchError) cast in http.ts after instanceof - Remove DurableReducer/toJson from mod.ts public API surface - Add files field to package.json to exclude test files from npm - Document wrappedNext single-invocation invariant (Effection effect protocol guarantees) Test synchronization: - Remove yield* sleep(0) from root lifecycle test (now synchronous in scope middleware); keep sleep(0) for spawned child workflow:return with comment explaining async teardown ordering - Add comments to intentional sleep(10) calls in abort signal tests Documentation: - Explain synchronous DurableStream design choice in README - Expand Effection PR 1127 requirement with installation guidance
There was a problem hiding this comment.
Actionable comments posted: 8
♻️ Duplicate comments (3)
durably/http.ts (1)
65-70:⚠️ Potential issue | 🟠 MajorProducer errors are silently dropped after
actionsettles.Once
resolve(s)is called on Line 68, the action is settled. Any subsequent call torejectvias.errorHandler(triggered by a producer write failure) will be a no-op per Effection's action settlement semantics. Producer errors will not propagate into structured concurrency.Consider keeping the action unsettled until the stream lifecycle ends, or spawn a separate task that rejects on producer error to propagate failures into the enclosing scope.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/http.ts` around lines 65 - 70, The action currently calls resolve(s) immediately so subsequent producer failures routed to s.errorHandler are ignored; change the behavior in the action<HttpDurableStream> callback so the action stays unsettled until the stream lifecycle completes: set s.errorHandler to reject and only call resolve(s) after you also arrange for a teardown/close signal (or alternatively spawn a child task that listens for producer write failures and calls reject on failure), and ensure the returned cleanup function from the action resolves/cleans up the stream when it finishes so producer errors propagate into the enclosing Effection scope; key symbols: HttpDurableStream, action<HttpDurableStream>, s.errorHandler, resolve, reject.durably/durable-errors.test.ts (1)
813-845:sleep(10)for suspend-to-halt timing — developer has documented the rationale.The inline comments explain why
sleep(10)is intentional here: it tests the real async window between a task reachingsuspend()and an externalhalt(). This is a valid use case where deterministic handshakes would alter the lifecycle semantics under test.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-errors.test.ts` around lines 813 - 845, This review is approving the intentional use of sleep(10) to test the async window between suspend() and an external halt() during replay-to-live resume; leave the sleep(10) calls and accompanying explanatory comments in place (referencing the test helpers sleep, suspend, durably, useAbortSignal, and InMemoryDurableStream) and do not replace them with deterministic handshakes or remove the rationale comment.durably/durable-reducer.test.ts (1)
624-628:sleep(0)for child taskworkflow:return— documented limitation.The comment on lines 624-626 explains that spawned children's
workflow:returnevents are emitted during structured teardown which requires a microtask tick, unlike root lifecycle events which are now synchronous. This is a known limitation.While this technically violates the no-sleep test sync policy, the explanation is clear and this appears to be an inherent timing characteristic of Effection's structured teardown for child scopes. Consider whether
durably()could flush child lifecycle events synchronously in a future iteration.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-reducer.test.ts` around lines 624 - 628, The test uses yield* sleep(0) to wait for child task 'workflow:return' events because structured teardown currently emits those asynchronously; to remove this fragile sleep, update the durably() teardown path so child scopes synchronously flush their lifecycle events (emit 'workflow:return') before the parent teardown completes—i.e., in the durably() implementation ensure child scope destruction synchronously dispatches lifecycle events (or synchronously drains any microtask queue relevant to child scopes) so tests no longer need sleep(0) to observe 'workflow:return'.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@durably/durable-errors.test.ts`:
- Around line 185-199: The test relies on an explicit microtask drain (yield*
sleep(0)) after the until(Promise.reject(...)) catch to allow the structured
teardown middleware to asynchronously emit the "scope:destroyed" event before
assertions; add a concise comment immediately above the yield* sleep(0)
explaining that this sleep is intentionally used to flush the microtask queue so
the scope middleware can emit scope:destroyed (and reference the related
assertions using partialStream.read(), rootDestroyed, and firstEntered), or if
you prefer an alternative, make the scope teardown/event emission in the
middleware synchronous so the sleep is not required.
In `@durably/durable-reducer.test.ts`:
- Around line 1096-1104: Replace the explicit cast on
(recordedEvents[firstYielded] as { effectId: string }).effectId with a proper
narrowing guard: after checking firstYielded >= 0 and
recordedEvents[firstYielded].type === "effect:yielded", assign
recordedEvents[firstYielded] to a local const (e.g., yielded =
recordedEvents[firstYielded]) and use yielded.effectId in the comparison;
alternatively extract a type‑guard function for the union so the compiler knows
the element has effectId, updating the firstResolved predicate to rely on that
narrowed value instead of the cast.
In `@durably/durable-reducer.ts`:
- Around line 774-777: The isInfrastructureEffect method should use a Set for
membership checks instead of Array.includes to make lookups O(1); create a Set
from INFRASTRUCTURE_EFFECTS (e.g., INFRASTRUCTURE_EFFECTS_SET = new
Set(INFRASTRUCTURE_EFFECTS)) and replace
INFRASTRUCTURE_EFFECTS.includes(description) with
INFRASTRUCTURE_EFFECTS_SET.has(description) while keeping the
description.startsWith("do <") short-circuit; update any imports/exports if you
introduce a new constant name.
In `@durably/durable-scope.test.ts`:
- Line 38: The tests currently use yield* sleep(0) after durably() to wait for
root scope destruction; replace this flaky microtask-yield with a deterministic
approach by either (A) updating durably() to guarantee its returned stream/task
has emitted all lifecycle events before settling, or (B) add a test helper
(e.g., waitForScopeDestroyed or readUntilLifecycleEvent) that consumes the
durably() stream until it sees a "scope:destroyed" event for the root scope and
only then returns; change the tests to call that helper instead of yield*
sleep(0), and ensure the helper references the same lifecycle event name and
root scope id extraction used elsewhere in the test code so the assertions can
run deterministically.
- Around line 73-80: The failing assertions use short-circuit boolean
expressions inside expect which yield misleading failures; instead, follow the
existing narrowing+assertion pattern: assert the event type explicitly for
created[0] and for lastDestroyed (computed from destroyed[destroyed.length - 1])
using two steps—first expect the .type to equal
"scope:created"/"scope:destroyed" (or use a truthy check + message), then
extract the .scopeId into a variable and expect that variable toEqual("root");
update the checks that reference created[0] and lastDestroyed accordingly to
mirror the pattern used earlier in this test file.
In `@durably/README.md`:
- Around line 233-239: The fenced code block in the "Effection ^4" list item
(the block containing "npm install
effection@https://pkg.pr.new/thefrontside/effection@1127") needs Markdown lint
fixes: add a blank line before and after the fenced block within the list item,
and add a language specifier (use "bash") on the opening fence (i.e., change ```
to ```bash) so the block is properly fenced and spaced relative to the
surrounding text in README.md.
In `@durably/test-helpers.ts`:
- Around line 30-43: The userEvents helper filters out only "useCoroutine()" and
"do <" effects but should match the broader infra filtering used by
userFacingEffects (and be consistent with userEffectPairs); update userEvents
(function name userEvents) to reuse or apply the same INFRASTRUCTURE_EFFECTS
filter logic (or call the existing userFacingEffects-filtering helper) so it
excludes the full set of infrastructure effects rather than just the two
strings; ensure you reference INFRASTRUCTURE_EFFECTS (or the shared filter
function) and adjust imports/usages accordingly.
- Around line 71-95: The userEffectPairs helper is using a narrower filter than
userFacingEffects and so can include infrastructure effects; update
userEffectPairs (the function named userEffectPairs) to exclude the same set of
descriptions as userFacingEffects by checking against INFRASTRUCTURE_EFFECTS and
also skipping descriptions that start with "do <" (and continue to skip
"useCoroutine()"), so that only user-facing effect pairs are collected; use the
same predicate logic/reference to INFRASTRUCTURE_EFFECTS as in userFacingEffects
to ensure consistent behavior.
---
Duplicate comments:
In `@durably/durable-errors.test.ts`:
- Around line 813-845: This review is approving the intentional use of sleep(10)
to test the async window between suspend() and an external halt() during
replay-to-live resume; leave the sleep(10) calls and accompanying explanatory
comments in place (referencing the test helpers sleep, suspend, durably,
useAbortSignal, and InMemoryDurableStream) and do not replace them with
deterministic handshakes or remove the rationale comment.
In `@durably/durable-reducer.test.ts`:
- Around line 624-628: The test uses yield* sleep(0) to wait for child task
'workflow:return' events because structured teardown currently emits those
asynchronously; to remove this fragile sleep, update the durably() teardown path
so child scopes synchronously flush their lifecycle events (emit
'workflow:return') before the parent teardown completes—i.e., in the durably()
implementation ensure child scope destruction synchronously dispatches lifecycle
events (or synchronously drains any microtask queue relevant to child scopes) so
tests no longer need sleep(0) to observe 'workflow:return'.
In `@durably/http.ts`:
- Around line 65-70: The action currently calls resolve(s) immediately so
subsequent producer failures routed to s.errorHandler are ignored; change the
behavior in the action<HttpDurableStream> callback so the action stays unsettled
until the stream lifecycle completes: set s.errorHandler to reject and only call
resolve(s) after you also arrange for a teardown/close signal (or alternatively
spawn a child task that listens for producer write failures and calls reject on
failure), and ensure the returned cleanup function from the action
resolves/cleans up the stream when it finishes so producer errors propagate into
the enclosing Effection scope; key symbols: HttpDurableStream,
action<HttpDurableStream>, s.errorHandler, resolve, reject.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (14)
durably/README.mddurably/durable-all-race.test.tsdurably/durable-each.test.tsdurably/durable-errors.test.tsdurably/durable-reducer.test.tsdurably/durable-reducer.tsdurably/durable-resource.test.tsdurably/durable-scope.test.tsdurably/http.tsdurably/mod.tsdurably/package.jsondurably/stream.tsdurably/test-helpers.tsdurably/types.ts
durably/durable-scope.test.ts
Outdated
| { stream }, | ||
| ); | ||
|
|
||
| yield* sleep(0); |
There was a problem hiding this comment.
sleep(0) used for test synchronization to observe root scope destruction.
Lines 38, 63, 126, and 289 use yield* sleep(0) after durably() completes to ensure root scope destruction events are recorded before assertions. While this is a minimal yield point (not timing-dependent), it relies on microtask ordering. As per coding guidelines, tests should avoid sleep() for waiting; consider a deterministic approach (e.g., having durably() guarantee the stream contains all lifecycle events before the task settles, or providing a helper that reads until scope:destroyed for root is present).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/durable-scope.test.ts` at line 38, The tests currently use yield*
sleep(0) after durably() to wait for root scope destruction; replace this flaky
microtask-yield with a deterministic approach by either (A) updating durably()
to guarantee its returned stream/task has emitted all lifecycle events before
settling, or (B) add a test helper (e.g., waitForScopeDestroyed or
readUntilLifecycleEvent) that consumes the durably() stream until it sees a
"scope:destroyed" event for the root scope and only then returns; change the
tests to call that helper instead of yield* sleep(0), and ensure the helper
references the same lifecycle event name and root scope id extraction used
elsewhere in the test code so the assertions can run deterministically.
| export function userEvents(stream: InMemoryDurableStream): DurableEvent[] { | ||
| return stream | ||
| .read() | ||
| .map((e) => e.event) | ||
| .filter((e) => { | ||
| if (e.type === "effect:yielded") { | ||
| let desc = e.description; | ||
| if (desc === "useCoroutine()" || desc.startsWith("do <")) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| }); | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
userEvents also uses a narrower filter than userFacingEffects.
Same concern as userEffectPairs: userEvents only filters "useCoroutine()" and "do <" while userFacingEffects filters the full INFRASTRUCTURE_EFFECTS list. If both are intended to represent "user-facing" events, consider consolidating the filter logic or clearly documenting the difference in scope between these helpers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/test-helpers.ts` around lines 30 - 43, The userEvents helper filters
out only "useCoroutine()" and "do <" effects but should match the broader infra
filtering used by userFacingEffects (and be consistent with userEffectPairs);
update userEvents (function name userEvents) to reuse or apply the same
INFRASTRUCTURE_EFFECTS filter logic (or call the existing
userFacingEffects-filtering helper) so it excludes the full set of
infrastructure effects rather than just the two strings; ensure you reference
INFRASTRUCTURE_EFFECTS (or the shared filter function) and adjust imports/usages
accordingly.
| export function userEffectPairs( | ||
| stream: InMemoryDurableStream, | ||
| ): Array<[DurableEvent, DurableEvent]> { | ||
| let events = stream.read().map((e) => e.event); | ||
| let pairs: Array<[DurableEvent, DurableEvent]> = []; | ||
| for (let i = 0; i < events.length - 1; i++) { | ||
| let ev = events[i]; | ||
| if (ev.type !== "effect:yielded") continue; | ||
| if ( | ||
| ev.description === "useCoroutine()" || | ||
| ev.description.startsWith("do <") | ||
| ) | ||
| continue; | ||
| let next = events[i + 1]; | ||
| if ( | ||
| next && | ||
| (next.type === "effect:resolved" || next.type === "effect:errored") && | ||
| next.effectId === ev.effectId | ||
| ) { | ||
| pairs.push([ev, next]); | ||
| i++; | ||
| } | ||
| } | ||
| return pairs; | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
userEffectPairs uses a narrower filter than userFacingEffects, potentially including infrastructure pairs.
userFacingEffects (Line 57) excludes all entries in INFRASTRUCTURE_EFFECTS plus "do <" patterns, but userEffectPairs (Lines 80-83) only skips "useCoroutine()" and "do <". This means infrastructure effects like "trap return", "await resource", "await future", etc. would be included as "user" pairs. If these are test helpers, the inconsistency could lead to confusing test assertions.
Consider aligning the filter logic:
Proposed fix
if (ev.type !== "effect:yielded") continue;
if (
- ev.description === "useCoroutine()" ||
- ev.description.startsWith("do <")
+ INFRASTRUCTURE_EFFECTS.includes(ev.description) ||
+ ev.description.startsWith("do <")
)
continue;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| export function userEffectPairs( | |
| stream: InMemoryDurableStream, | |
| ): Array<[DurableEvent, DurableEvent]> { | |
| let events = stream.read().map((e) => e.event); | |
| let pairs: Array<[DurableEvent, DurableEvent]> = []; | |
| for (let i = 0; i < events.length - 1; i++) { | |
| let ev = events[i]; | |
| if (ev.type !== "effect:yielded") continue; | |
| if ( | |
| ev.description === "useCoroutine()" || | |
| ev.description.startsWith("do <") | |
| ) | |
| continue; | |
| let next = events[i + 1]; | |
| if ( | |
| next && | |
| (next.type === "effect:resolved" || next.type === "effect:errored") && | |
| next.effectId === ev.effectId | |
| ) { | |
| pairs.push([ev, next]); | |
| i++; | |
| } | |
| } | |
| return pairs; | |
| } | |
| export function userEffectPairs( | |
| stream: InMemoryDurableStream, | |
| ): Array<[DurableEvent, DurableEvent]> { | |
| let events = stream.read().map((e) => e.event); | |
| let pairs: Array<[DurableEvent, DurableEvent]> = []; | |
| for (let i = 0; i < events.length - 1; i++) { | |
| let ev = events[i]; | |
| if (ev.type !== "effect:yielded") continue; | |
| if ( | |
| INFRASTRUCTURE_EFFECTS.includes(ev.description) || | |
| ev.description.startsWith("do <") | |
| ) | |
| continue; | |
| let next = events[i + 1]; | |
| if ( | |
| next && | |
| (next.type === "effect:resolved" || next.type === "effect:errored") && | |
| next.effectId === ev.effectId | |
| ) { | |
| pairs.push([ev, next]); | |
| i++; | |
| } | |
| } | |
| return pairs; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/test-helpers.ts` around lines 71 - 95, The userEffectPairs helper is
using a narrower filter than userFacingEffects and so can include infrastructure
effects; update userEffectPairs (the function named userEffectPairs) to exclude
the same set of descriptions as userFacingEffects by checking against
INFRASTRUCTURE_EFFECTS and also skipping descriptions that start with "do <"
(and continue to skip "useCoroutine()"), so that only user-facing effect pairs
are collected; use the same predicate logic/reference to INFRASTRUCTURE_EFFECTS
as in userFacingEffects to ensure consistent behavior.
- Convert INFRASTRUCTURE_EFFECTS/CONTEXTS from arrays to ReadonlySet with .has() for O(1) membership lookups - Fix type narrowing in effect ID collision test: replace unsafe cast with explicit type guard and local variable - Add explanatory comments to all remaining sleep(0) calls in tests documenting why each is needed (child scope async teardown, microtask flush for scope:destroyed after error path) - Remove sleep(0) from simple root lifecycle test (now synchronous) - Fix misleading short-circuit boolean assertions in scope test: split into explicit type check + value assertion - Fix README markdown lint: add bash language specifier and blank lines around fenced code block
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (6)
durably/durable-scope.test.ts (2)
63-65:sleep(0)for test synchronization in scope lifecycle tests.Lines 65, 132, and 297 use
yield* sleep(0)to flush microtask queues for child scope lifecycle event emission. Comments were added to explain the rationale, which is good. However, this still violates the no-sleep test sync policy. Consider whetherdurably()could guarantee all lifecycle events are emitted before the task settles for spawned children as well, eliminating the need for these workarounds.As per coding guidelines,
.policies/no-sleep-test-sync.md- Tests must not usesleep()for waiting; use deterministic helpers.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-scope.test.ts` around lines 63 - 65, The test uses yield* sleep(0) (calls to sleep) to flush microtasks for child scope lifecycle events; replace this nondeterministic wait by either updating durably() to guarantee that all child scope lifecycle events are emitted before the parent task is considered settled (so tests don't need to tick the event loop), or swap the sleep(0) calls in durable-scope.test.ts for a deterministic test helper (e.g., await waitForLifecycleEvents(childScope) or a provided synchronizeChildLifecycle() utility) that explicitly waits for the specific lifecycle signals; locate uses of sleep and the test helpers around the structured teardown and modify durably() or add a helper so spawned children’s lifecycle events are deterministically observable without sleep().
104-107:⚠️ Potential issue | 🟡 MinorShort-circuit evaluation inside
expect()yields confusing failures.If
root.typeis not"scope:created", this evaluates toexpect(false).toBeUndefined(), producing a misleading error message. Use the narrowing guard + assertion pattern used elsewhere in this file.Proposed fix
- expect( - root.type === "scope:created" && root.parentScopeId, - ).toBeUndefined(); + expect(root.type).toEqual("scope:created"); + if (root.type === "scope:created") { + expect(root.parentScopeId).toBeUndefined(); + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-scope.test.ts` around lines 104 - 107, The test uses a short-circuit expression inside expect which can produce misleading failures; replace the combined check with the same narrowing-guard + assertion pattern used elsewhere: first retrieve scopeCreatedEvents[0] into root, assert (or expect) that root.type === "scope:created" to narrow the type, then separately assert that root.parentScopeId is undefined (e.g., expect(root.parentScopeId).toBeUndefined()). Update the assertions around the root variable (scopeCreatedEvents, root) accordingly so each condition is tested independently.durably/test-helpers.ts (2)
30-43: 🧹 Nitpick | 🔵 Trivial
userEventsuses a narrower filter thanuserFacingEffects.
userEventsfilters only"useCoroutine()"and"do <"whileuserFacingEffects(line 49) filters allINFRASTRUCTURE_EFFECTS. This means events like"trap return","await resource", etc. pass throughuserEventsbut notuserFacingEffects. If both are meant to represent "user-facing" events, the inconsistency could silently include infrastructure events in test assertions.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/test-helpers.ts` around lines 30 - 43, The filter in userEvents is too narrow compared to userFacingEffects; update userEvents (the function that maps stream.read() -> .map(e => e.event) -> .filter(...)) to exclude the same INFRASTRUCTURE_EFFECTS as userFacingEffects (instead of only checking "useCoroutine()" and "do <"), i.e., import or reference INFRASTRUCTURE_EFFECTS and filter out events whose description matches any entry in that set (and keep the existing checks like startsWith("do <")/useCoroutine() if userFacingEffects does), so both helpers return the same set of user-facing events.
79-83: 🧹 Nitpick | 🔵 Trivial
userEffectPairsuses a narrower filter thanuserFacingEffects.Same inconsistency: only
"useCoroutine()"and"do <"are filtered here, whileuserFacingEffectsfilters the fullINFRASTRUCTURE_EFFECTSset. Infrastructure effects like"trap return"could be included as "user" pairs.Proposed fix
if (ev.type !== "effect:yielded") continue; if ( - ev.description === "useCoroutine()" || - ev.description.startsWith("do <") + INFRASTRUCTURE_EFFECTS.has(ev.description) || + ev.description.startsWith("do <") ) continue;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/test-helpers.ts` around lines 79 - 83, The userEffectPairs filter in durably/test-helpers.ts only excludes "useCoroutine()" and "do <" (checking ev.description) but should exclude the full INFRASTRUCTURE_EFFECTS set like userFacingEffects does; update the filtering logic in the userEffectPairs construction to reuse the same predicate or check INFRASTRUCTURE_EFFECTS (or call the same helper used by userFacingEffects) so any description contained in INFRASTRUCTURE_EFFECTS (e.g., "trap return") is excluded consistently.durably/durable-reducer.test.ts (1)
624-627:sleep(0)for child taskworkflow:return— same concern as scope test.The comment explains that spawned children's lifecycle events need a tick to settle. Same observation as in
durable-scope.test.ts— this is the minimal possible yield but still technically violates the no-sleep policy.As per coding guidelines,
.policies/no-sleep-test-sync.md- Tests must not usesleep()for waiting; use deterministic helpers.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-reducer.test.ts` around lines 624 - 627, Replace the yield* sleep(0) used to allow spawned child "workflow:return" events to settle with the deterministic test helper used in durable-scope.test.ts (do not use sleep). Locate the yield* sleep(0) call in durable-reducer.test.ts and swap it for the microtask-tick/settle helper used in the other test (the same function used to advance child lifecycle events in durable-scope.test.ts) so child workflow:return emissions are deterministically awaited without using sleep.durably/durable-reducer.ts (1)
845-873: 🧹 Nitpick | 🔵 TrivialPotential double-recording if
wrappedNextis called more than once.The comment at lines 848–851 documents the single-invocation invariant from Effection's effect protocol. While this invariant should hold, a defensive one-shot guard would prevent stream corruption if it were ever violated (e.g., by a buggy user-provided effect or a future Effection change).
Optional defensive guard
+ let recorded = false; let wrappedNext = (result: Result<unknown>) => { + if (recorded) { + originalNext(result); + return; + } + recorded = true; if (result.ok) { stream.append({🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@durably/durable-reducer.ts` around lines 845 - 873, Add a defensive one-shot guard to wrappedNext to prevent double-recording: introduce a local flag (e.g., called or handled) captured by wrappedNext and check it at the top; if already true, return immediately; otherwise set it true, perform the stream.append calls and restore routine.next to originalNext, then call originalNext(result). Update references around wrappedNext, routine.next, originalNext, stream.append, and routine.data.exit so the wrapper still gets installed and effect.enter invoked the same way but with the new idempotent wrappedNext to avoid duplicate stream events if wrappedNext is invoked multiple times.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@durably/durable-reducer.test.ts`:
- Around line 298-316: The test's cut-index loop uses a hard-coded filter on
yielded.description (checking for "useCoroutine()" and "do <") which is narrower
than the shared INFRASTRUCTURE_EFFECTS; update the logic in the loop that
computes cutIndex to import and use the INFRASTRUCTURE_EFFECTS set from
durable-reducer.ts (or the shared test helper) and skip any yielded.description
that matches an entry in INFRASTRUCTURE_EFFECTS instead of the current inline
checks, keeping the surrounding loop, cutIndex assignment, and use of allEvents
the same.
In `@durably/durable-reducer.ts`:
- Around line 617-621: Add a short inline comment above the dual-check that uses
reducer.replayIndex.hasScopeEffects(scopeId) and
reducer.replayIndex.hasScopeDestruction(scopeId) clarifying that the combined
negation (!hasScopeEffects && !hasScopeDestruction) indicates the scope has no
remaining replay events or lifecycle destruction to consume and therefore is now
in live mode, and note that this check is only applied when scopeId exists and
the context is not an infrastructure context
(isInfrastructureContext(context.name)).
---
Duplicate comments:
In `@durably/durable-reducer.test.ts`:
- Around line 624-627: Replace the yield* sleep(0) used to allow spawned child
"workflow:return" events to settle with the deterministic test helper used in
durable-scope.test.ts (do not use sleep). Locate the yield* sleep(0) call in
durable-reducer.test.ts and swap it for the microtask-tick/settle helper used in
the other test (the same function used to advance child lifecycle events in
durable-scope.test.ts) so child workflow:return emissions are deterministically
awaited without using sleep.
In `@durably/durable-reducer.ts`:
- Around line 845-873: Add a defensive one-shot guard to wrappedNext to prevent
double-recording: introduce a local flag (e.g., called or handled) captured by
wrappedNext and check it at the top; if already true, return immediately;
otherwise set it true, perform the stream.append calls and restore routine.next
to originalNext, then call originalNext(result). Update references around
wrappedNext, routine.next, originalNext, stream.append, and routine.data.exit so
the wrapper still gets installed and effect.enter invoked the same way but with
the new idempotent wrappedNext to avoid duplicate stream events if wrappedNext
is invoked multiple times.
In `@durably/durable-scope.test.ts`:
- Around line 63-65: The test uses yield* sleep(0) (calls to sleep) to flush
microtasks for child scope lifecycle events; replace this nondeterministic wait
by either updating durably() to guarantee that all child scope lifecycle events
are emitted before the parent task is considered settled (so tests don't need to
tick the event loop), or swap the sleep(0) calls in durable-scope.test.ts for a
deterministic test helper (e.g., await waitForLifecycleEvents(childScope) or a
provided synchronizeChildLifecycle() utility) that explicitly waits for the
specific lifecycle signals; locate uses of sleep and the test helpers around the
structured teardown and modify durably() or add a helper so spawned children’s
lifecycle events are deterministically observable without sleep().
- Around line 104-107: The test uses a short-circuit expression inside expect
which can produce misleading failures; replace the combined check with the same
narrowing-guard + assertion pattern used elsewhere: first retrieve
scopeCreatedEvents[0] into root, assert (or expect) that root.type ===
"scope:created" to narrow the type, then separately assert that
root.parentScopeId is undefined (e.g.,
expect(root.parentScopeId).toBeUndefined()). Update the assertions around the
root variable (scopeCreatedEvents, root) accordingly so each condition is tested
independently.
In `@durably/test-helpers.ts`:
- Around line 30-43: The filter in userEvents is too narrow compared to
userFacingEffects; update userEvents (the function that maps stream.read() ->
.map(e => e.event) -> .filter(...)) to exclude the same INFRASTRUCTURE_EFFECTS
as userFacingEffects (instead of only checking "useCoroutine()" and "do <"),
i.e., import or reference INFRASTRUCTURE_EFFECTS and filter out events whose
description matches any entry in that set (and keep the existing checks like
startsWith("do <")/useCoroutine() if userFacingEffects does), so both helpers
return the same set of user-facing events.
- Around line 79-83: The userEffectPairs filter in durably/test-helpers.ts only
excludes "useCoroutine()" and "do <" (checking ev.description) but should
exclude the full INFRASTRUCTURE_EFFECTS set like userFacingEffects does; update
the filtering logic in the userEffectPairs construction to reuse the same
predicate or check INFRASTRUCTURE_EFFECTS (or call the same helper used by
userFacingEffects) so any description contained in INFRASTRUCTURE_EFFECTS (e.g.,
"trap return") is excluded consistently.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (6)
durably/README.mddurably/durable-errors.test.tsdurably/durable-reducer.test.tsdurably/durable-reducer.tsdurably/durable-scope.test.tsdurably/test-helpers.ts
| let allEvents = recordStream.read().map((e) => e.event); | ||
|
|
||
| let cutIndex = -1; | ||
| for (let i = 0; i < allEvents.length; i++) { | ||
| let ev = allEvents[i]; | ||
| if (ev.type === "effect:resolved") { | ||
| if (i > 0 && allEvents[i - 1].type === "effect:yielded") { | ||
| let yielded = allEvents[i - 1]; | ||
| if ( | ||
| yielded.type === "effect:yielded" && | ||
| yielded.description !== "useCoroutine()" && | ||
| !yielded.description.startsWith("do <") | ||
| ) { | ||
| cutIndex = i + 1; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Cut-index logic uses narrower infra filter than INFRASTRUCTURE_EFFECTS.
The inline filter at lines 308–309 only skips "useCoroutine()" and "do <" patterns. Consider using the INFRASTRUCTURE_EFFECTS set from durable-reducer.ts for consistency (same concern as userEffectPairs in test-helpers).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/durable-reducer.test.ts` around lines 298 - 316, The test's cut-index
loop uses a hard-coded filter on yielded.description (checking for
"useCoroutine()" and "do <") which is narrower than the shared
INFRASTRUCTURE_EFFECTS; update the logic in the loop that computes cutIndex to
import and use the INFRASTRUCTURE_EFFECTS set from durable-reducer.ts (or the
shared test helper) and skip any yielded.description that matches an entry in
INFRASTRUCTURE_EFFECTS instead of the current inline checks, keeping the
surrounding loop, cutIndex assignment, and use of allEvents the same.
| if (scopeId && !isInfrastructureContext(context.name)) { | ||
| // Only record in live mode (not during replay of this scope) | ||
| if ( | ||
| !reducer.replayIndex.hasScopeEffects(scopeId) && | ||
| !reducer.replayIndex.hasScopeDestruction(scopeId) |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Clarify the replay-mode detection logic for context recording.
The condition !hasScopeEffects(scopeId) && !hasScopeDestruction(scopeId) correctly identifies when a scope has transitioned to live mode (no remaining replay events or lifecycle to consume). A brief inline comment would help future readers understand this dual check, since it's not immediately obvious that it means "scope is in live mode."
Suggested comment
if (scopeId && !isInfrastructureContext(context.name)) {
- // Only record in live mode (not during replay of this scope)
+ // Only record in live mode: skip if this scope still has
+ // replay effects or a recorded destruction to consume.
if (🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@durably/durable-reducer.ts` around lines 617 - 621, Add a short inline
comment above the dual-check that uses
reducer.replayIndex.hasScopeEffects(scopeId) and
reducer.replayIndex.hasScopeDestruction(scopeId) clarifying that the combined
negation (!hasScopeEffects && !hasScopeDestruction) indicates the scope has no
remaining replay events or lifecycle destruction to consume and therefore is now
in live mode, and note that this check is only applied when scopeId exists and
the context is not an infrastructure context
(isInfrastructureContext(context.name)).
…ibility createScope() in effection 4.1 returns a Scope object with Symbol.iterator (for iterable destructuring), not an actual array. Numeric indexing (createScope()[0]) returns undefined. Use destructuring instead: let [rootScope] = createScope().
- Wrap effect.enter() in try/catch so routine.next is restored to the original if enter() throws synchronously, preventing the coroutine from being left with a stale recording wrapper. - Clean up scopeParents Map entries in unregisterScope() to prevent memory leaks in long-lived DurableReducer instances. The WeakMap for scopeIds already handles GC, but scopeParents (keyed by string IDs) was accumulating entries indefinitely.
Motivation
Effection workflows are ephemeral — when a process restarts, all in-flight operations are lost.
@effectionx/durablysolves this by recording every effect resolution to a durable stream, enabling mid-workflow resume after restarts without re-executing completed effects.This package extracts the DurableReducer from the Effection fork into a standalone community package, proving that durable execution is completely additive to Effection — no fork required, just a peer dependency on
effectionplus this package.Depends on: Effection PR #1127 — exports minimal reducer internals via
effection/experimental(15 lines added to Effection, zero behavior changes).Approach
durably()entry pointWraps any Effection operation with durable execution semantics. Injects a
DurableReducervia Effection'sReducerContext, which intercepts the single point where alleffect.enter()calls happen:effect:yieldedandeffect:resolvedevents to the stream. Scope lifecycle events (scope:created,scope:destroyed) captured viaApi.Scopemiddleware.iterator.next(storedResult)without callingeffect.enter(). The generator cannot tell whether it is replaying or running live.DivergenceError.What's included
durably.tsdurable-reducer.tstypes.tsstream.tsmod.tsTest coverage
99 tests passing across 6 suites:
all(),race(), combined nestingresource,ensure, resource+spawneach()recording, replay, resume, loop bodySummary by CodeRabbit
New Features
Tests
Documentation
Chores