✨ Support initial value in createSubject()#167
Conversation
📝 WalkthroughWalkthroughAdds an optional Changes
Sequence Diagram(s)sequenceDiagram
actor SubscriberA as Subscriber A
actor SubscriberB as Subscriber B
participant Subject as Subject (createSubject(initial?))
participant Upstream as Upstream Stream
rect rgba(0,128,0,0.5)
Note over Subject,SubscriberA: Initialization (initial provided)
Subject->>SubscriberA: emit initial value (e.g., 42)
end
Note over Upstream,Subject: Upstream emits values
Upstream->>Subject: next(value1)
Subject->>SubscriberA: forward value1
Upstream->>Subject: next(value2)
Subject->>SubscriberA: forward value2
SubscriberB->>Subject: subscribe (late)
Subject->>SubscriberB: emit current latest value (value2)
Upstream->>Subject: next(value3)
Subject->>SubscriberA: forward value3
Subject->>SubscriberB: forward value3
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes 🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Comment |
commit: |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
stream-helpers/subject.ts (1)
38-62:⚠️ Potential issue | 🟠 MajorInitial-value path leaves
currentstale after the first read.When
initialis provided, the iterator switches directly toupstreamafter the firstnext(), socurrentis never updated with upstream values. Late subscribers will keep receiving the initial value even after upstream emits.🔧 Proposed fix
- let iterator: Subscription<T, TClose> = current - ? { - *next() { - iterator = upstream; - return current!; - }, - } - : { - *next() { - current = yield* upstream.next(); - return current; - }, - }; + let iterator: Subscription<T, TClose> = current + ? { + *next() { + const first = current!; + iterator = { + *next() { + current = yield* upstream.next(); + return current; + }, + }; + return first; + }, + } + : { + *next() { + current = yield* upstream.next(); + return current; + }, + };🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-helpers/subject.ts` around lines 38 - 62, The initial-value branch in createSubject leaves `current` unchanged after the first read so late subscribers keep the initial value; update the initial branch in the returned Subscription (the branch that currently sets iterator = upstream) so that after returning the initial `current` it immediately consumes one value from `upstream` and stores that result into `current` (or replaces `iterator` with a Subscription whose next() updates `current` by yielding from `upstream.next()`), ensuring `current`, `upstream`, and `iterator` are kept in sync for subsequent subscribers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@stream-helpers/subject.test.ts`:
- Around line 90-99: Add a new test that verifies a late subscriber receives the
most recent upstream value (not the original initial value) by using
createSubject with initial 42, subscribing subscriber1, sending an upstream
value (e.g. 1) through upstream, then subscribing subscriber2 and asserting
subscriber1 yields 42 then 1 and subscriber2 yields 1; use the same helpers
referenced in the diff (createSubject, upstream/downstream pattern, next,
subject, subscriber1, subscriber2) to locate where to add the test and follow
existing test style.
In `@stream-helpers/subject.ts`:
- Around line 38-40: The change adds an optional parameter initial to
createSubject<T>(initial?: T) which is a backward-compatible new feature, so
update the package version with a minor bump (increment the minor version in the
package manifest/release metadata) per .policies/version-bump.md; ensure the
changelog/release notes mention the new optional initial parameter on
createSubject and run whatever release tooling (version tag, CI release step)
your repo uses to publish the minor version.
---
Outside diff comments:
In `@stream-helpers/subject.ts`:
- Around line 38-62: The initial-value branch in createSubject leaves `current`
unchanged after the first read so late subscribers keep the initial value;
update the initial branch in the returned Subscription (the branch that
currently sets iterator = upstream) so that after returning the initial
`current` it immediately consumes one value from `upstream` and stores that
result into `current` (or replaces `iterator` with a Subscription whose next()
updates `current` by yielding from `upstream.next()`), ensuring `current`,
`upstream`, and `iterator` are kept in sync for subsequent subscribers.
0388c0c to
1ed8b57
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
stream-helpers/subject.ts (1)
50-62:⚠️ Potential issue | 🔴 CriticalBug: Late subscribers receive stale initial value instead of latest upstream value.
After the first subscriber consumes the initial value and switches
iterator = upstream(line 53), subsequentnext()calls go directly toupstream.next()which does not updatecurrent. Meanwhile, the non-initial path (lines 58-61) always updatescurrent. When a late subscriber joins, it sees the stalecurrent(initial value) instead of the latest upstream value.This matches the pipeline failure:
Expected: 1, Received: 42.🐛 Proposed fix: Ensure both paths update `current` on upstream reads
let iterator: Subscription<T, TClose> = current ? { *next() { - iterator = upstream; + iterator = { + *next() { + current = yield* upstream.next(); + return current; + }, + }; return current!; }, } : { *next() { current = yield* upstream.next(); return current; }, };🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-helpers/subject.ts` around lines 50 - 62, The branch that sets iterator = upstream after the first subscriber consumes the initial value returns current directly and then delegates to upstream without updating current, causing late subscribers to see the stale initial value; modify the iterator implementation so both branches update the shared current from upstream reads (e.g., when delegating to upstream.next() assign the result to current before returning) so that Subscription<T, TClose>.next() always refreshes current from upstream regardless of which branch is taken.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@stream-helpers/subject.ts`:
- Around line 41-44: The current check in createSubject that uses `typeof
initial !== "undefined"` prevents callers from intentionally passing `undefined`
as a valid initial T; update createSubject to detect whether an argument was
provided (e.g., use overloads and inspect arguments.length or accept a rest args
tuple) and set `current` to { done: false, value: initial } only when an
argument was actually passed, leaving `current` undefined when no argument was
supplied; locate the createSubject function and the `current` variable to
implement the overloads/rest-args or sentinel-based approach so explicit
undefined values are preserved.
---
Outside diff comments:
In `@stream-helpers/subject.ts`:
- Around line 50-62: The branch that sets iterator = upstream after the first
subscriber consumes the initial value returns current directly and then
delegates to upstream without updating current, causing late subscribers to see
the stale initial value; modify the iterator implementation so both branches
update the shared current from upstream reads (e.g., when delegating to
upstream.next() assign the result to current before returning) so that
Subscription<T, TClose>.next() always refreshes current from upstream regardless
of which branch is taken.
---
Duplicate comments:
In `@stream-helpers/subject.ts`:
- Around line 38-44: The new optional initial parameter added to
createSubject<T>(initial?: T) is a backward-compatible feature and requires a
minor version bump; update the version field in stream-helpers/package.json to
the next minor (e.g., x.y.z -> x.(y+1).0) and commit that change so the package
version reflects the addition of the initial parameter to createSubject.
1ed8b57 to
bf86cfb
Compare
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
stream-helpers/subject.ts (1)
50-62:⚠️ Potential issue | 🔴 CriticalBug:
currentis never updated after initial value is consumed, breaking late subscriber semantics.When the initial value branch is used (lines 51-56), it returns
current!and switches toupstream, butcurrentis never updated with subsequent upstream values. This causes the pipeline failure:
createSubject(42)→current = { done: false, value: 42 }- subscriber1's first
next()returns 42, switches iterator to upstreamupstream.send(1)—currentis not updated- subscriber2 subscribes, sees truthy
current(still 42), returns stale valueThe late subscriber should receive the latest upstream value (1), not the stale initial (42).
🐛 Proposed fix: update `current` when consuming from upstream
let iterator: Subscription<T, TClose> = current ? { *next() { iterator = upstream; return current!; }, } : { *next() { current = yield* upstream.next(); return current; }, }; return { - next: () => iterator.next(), + *next() { + let result = yield* iterator.next(); + current = result; + return result; + }, };This ensures
currentis always updated with the latest value, regardless of which branch the iterator started with.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-helpers/subject.ts` around lines 50 - 62, The iterator implementation for the initial-value branch leaves `current` stale after switching to `upstream`; in the branch where iterator is set to return the existing `current!` (the first object created in createSubject), update `current` by delegating to `upstream.next()` once you switch the iterator to `upstream` so future subscribers see the latest value. Concretely, inside the initial-value branch's generator (the one that currently does `iterator = upstream; return current!;`), after assigning `iterator = upstream` call into `upstream.next()` and assign its result to `current` (same pattern used in the other branch that does `current = yield* upstream.next()`), so `current` is always refreshed with upstream emissions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@stream-helpers/subject.ts`:
- Around line 50-62: The iterator implementation for the initial-value branch
leaves `current` stale after switching to `upstream`; in the branch where
iterator is set to return the existing `current!` (the first object created in
createSubject), update `current` by delegating to `upstream.next()` once you
switch the iterator to `upstream` so future subscribers see the latest value.
Concretely, inside the initial-value branch's generator (the one that currently
does `iterator = upstream; return current!;`), after assigning `iterator =
upstream` call into `upstream.next()` and assign its result to `current` (same
pattern used in the other branch that does `current = yield* upstream.next()`),
so `current` is always refreshed with upstream emissions.
---
Duplicate comments:
In `@stream-helpers/subject.ts`:
- Around line 41-44: The check `typeof initial !== "undefined"` on the local
`initial` value prevents callers from intentionally passing `undefined`; update
the `subject` API to use overloads that distinguish "no initial provided" vs
"initial provided (possibly undefined)" and replace the runtime check with an
arguments-length check (e.g., use `arguments.length > 0`) when creating
`current` so explicit `undefined` is preserved; ensure the overload signatures
for `subject` reflect the presence/absence of the initial value and keep the
`current` initialization logic using the `initial` identifier and
`IteratorResult<T>` as before.
bf86cfb to
2e9e67f
Compare
There was a problem hiding this comment.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@stream-helpers/subject.ts`:
- Around line 38-44: The current createSubject initialization treats an explicit
undefined initial as "not provided" because it checks typeof initial !==
"undefined"; change the presence check to detect whether the caller actually
passed the parameter (e.g., use arguments.length > 0 or a sentinel) so that
passing undefined intentionally seeds current with { done: false, value:
undefined }; update the initialization logic around the current variable in
createSubject to use that presence check (keep the rest of the function
unchanged).
Motivation
createSubject()has no way to seed an initial value, so the first subscriber must wait for an upstream value before it can proceed.Approach
Accept an optional
initialargument increateSubject().Summary by CodeRabbit
New Features
Tests
Chores