Skip to content

✨ Support initial value in createSubject()#167

Merged
cowboyd merged 1 commit intomainfrom
subject-initial-value
Feb 20, 2026
Merged

✨ Support initial value in createSubject()#167
cowboyd merged 1 commit intomainfrom
subject-initial-value

Conversation

@cowboyd
Copy link
Member

@cowboyd cowboyd commented Feb 19, 2026

Motivation

createSubject() has no way to seed an initial value, so the first subscriber must wait for an upstream value before it can proceed.

Approach

Accept an optional initial argument in createSubject().

Summary by CodeRabbit

  • New Features

    • Subjects can be created with an optional initial value that is delivered to the first subscriber before upstream values.
  • Tests

    • Added tests validating initial-value delivery and correct value ordering for early and late subscribers.
  • Chores

    • Package version bumped to 0.8.0.

@coderabbitai
Copy link

coderabbitai bot commented Feb 19, 2026

📝 Walkthrough

Walkthrough

Adds an optional initial?: T parameter to createSubject to allow subjects to start with a preset value; updates subject behavior to deliver that initial value to early subscribers and adds two tests validating early and late subscriber behaviors. Package version bumped.

Changes

Cohort / File(s) Summary
Core Implementation
stream-helpers/subject.ts
Signature changed to createSubject<T>(initial?: T). When initial is provided, current is initialized to { done: false, value: initial }; tracking subscription path updated so first and subsequent next() calls return the expected values.
Test Coverage
stream-helpers/subject.test.ts
Adds two tests: one verifies a first subscriber sees the subject's initial value (42) before upstream emissions and then receives subsequent upstream values; another verifies a late subscriber receives the latest upstream value and that a later subscriber observes the subsequent value.
Package Manifest
stream-helpers/package.json
Bumped package version from 0.7.3 to 0.8.0.

Sequence Diagram(s)

sequenceDiagram
    actor SubscriberA as Subscriber A
    actor SubscriberB as Subscriber B
    participant Subject as Subject (createSubject(initial?))
    participant Upstream as Upstream Stream

    rect rgba(0,128,0,0.5)
    Note over Subject,SubscriberA: Initialization (initial provided)
    Subject->>SubscriberA: emit initial value (e.g., 42)
    end

    Note over Upstream,Subject: Upstream emits values
    Upstream->>Subject: next(value1)
    Subject->>SubscriberA: forward value1
    Upstream->>Subject: next(value2)
    Subject->>SubscriberA: forward value2

    SubscriberB->>Subject: subscribe (late)
    Subject->>SubscriberB: emit current latest value (value2)
    Upstream->>Subject: next(value3)
    Subject->>SubscriberA: forward value3
    Subject->>SubscriberB: forward value3
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarizes the main change: adding support for an initial value in createSubject().
Description check ✅ Passed The description follows the template with clear Motivation and Approach sections explaining the feature and implementation.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Policy Compliance ✅ Passed The pull request fully complies with all mandatory policies including Package.json Metadata, version bump, implementation pattern, and test requirements.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch subject-initial-value

Comment @coderabbitai help to get the list of available commands and usage tips.

@pkg-pr-new
Copy link

pkg-pr-new bot commented Feb 19, 2026

Open in StackBlitz

npm i https://pkg.pr.new/thefrontside/effectionx/@effectionx/stream-helpers@167

commit: 2e9e67f

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
stream-helpers/subject.ts (1)

38-62: ⚠️ Potential issue | 🟠 Major

Initial-value path leaves current stale after the first read.

When initial is provided, the iterator switches directly to upstream after the first next(), so current is never updated with upstream values. Late subscribers will keep receiving the initial value even after upstream emits.

🔧 Proposed fix
-      let iterator: Subscription<T, TClose> = current
-        ? {
-            *next() {
-              iterator = upstream;
-              return current!;
-            },
-          }
-        : {
-            *next() {
-              current = yield* upstream.next();
-              return current;
-            },
-          };
+      let iterator: Subscription<T, TClose> = current
+        ? {
+            *next() {
+              const first = current!;
+              iterator = {
+                *next() {
+                  current = yield* upstream.next();
+                  return current;
+                },
+              };
+              return first;
+            },
+          }
+        : {
+            *next() {
+              current = yield* upstream.next();
+              return current;
+            },
+          };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@stream-helpers/subject.ts` around lines 38 - 62, The initial-value branch in
createSubject leaves `current` unchanged after the first read so late
subscribers keep the initial value; update the initial branch in the returned
Subscription (the branch that currently sets iterator = upstream) so that after
returning the initial `current` it immediately consumes one value from
`upstream` and stores that result into `current` (or replaces `iterator` with a
Subscription whose next() updates `current` by yielding from `upstream.next()`),
ensuring `current`, `upstream`, and `iterator` are kept in sync for subsequent
subscribers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@stream-helpers/subject.test.ts`:
- Around line 90-99: Add a new test that verifies a late subscriber receives the
most recent upstream value (not the original initial value) by using
createSubject with initial 42, subscribing subscriber1, sending an upstream
value (e.g. 1) through upstream, then subscribing subscriber2 and asserting
subscriber1 yields 42 then 1 and subscriber2 yields 1; use the same helpers
referenced in the diff (createSubject, upstream/downstream pattern, next,
subject, subscriber1, subscriber2) to locate where to add the test and follow
existing test style.

In `@stream-helpers/subject.ts`:
- Around line 38-40: The change adds an optional parameter initial to
createSubject<T>(initial?: T) which is a backward-compatible new feature, so
update the package version with a minor bump (increment the minor version in the
package manifest/release metadata) per .policies/version-bump.md; ensure the
changelog/release notes mention the new optional initial parameter on
createSubject and run whatever release tooling (version tag, CI release step)
your repo uses to publish the minor version.

---

Outside diff comments:
In `@stream-helpers/subject.ts`:
- Around line 38-62: The initial-value branch in createSubject leaves `current`
unchanged after the first read so late subscribers keep the initial value;
update the initial branch in the returned Subscription (the branch that
currently sets iterator = upstream) so that after returning the initial
`current` it immediately consumes one value from `upstream` and stores that
result into `current` (or replaces `iterator` with a Subscription whose next()
updates `current` by yielding from `upstream.next()`), ensuring `current`,
`upstream`, and `iterator` are kept in sync for subsequent subscribers.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
stream-helpers/subject.ts (1)

50-62: ⚠️ Potential issue | 🔴 Critical

Bug: Late subscribers receive stale initial value instead of latest upstream value.

After the first subscriber consumes the initial value and switches iterator = upstream (line 53), subsequent next() calls go directly to upstream.next() which does not update current. Meanwhile, the non-initial path (lines 58-61) always updates current. When a late subscriber joins, it sees the stale current (initial value) instead of the latest upstream value.

This matches the pipeline failure: Expected: 1, Received: 42.

🐛 Proposed fix: Ensure both paths update `current` on upstream reads
       let iterator: Subscription<T, TClose> = current
         ? {
             *next() {
-              iterator = upstream;
+              iterator = {
+                *next() {
+                  current = yield* upstream.next();
+                  return current;
+                },
+              };
               return current!;
             },
           }
         : {
             *next() {
               current = yield* upstream.next();
               return current;
             },
           };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@stream-helpers/subject.ts` around lines 50 - 62, The branch that sets
iterator = upstream after the first subscriber consumes the initial value
returns current directly and then delegates to upstream without updating
current, causing late subscribers to see the stale initial value; modify the
iterator implementation so both branches update the shared current from upstream
reads (e.g., when delegating to upstream.next() assign the result to current
before returning) so that Subscription<T, TClose>.next() always refreshes
current from upstream regardless of which branch is taken.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@stream-helpers/subject.ts`:
- Around line 41-44: The current check in createSubject that uses `typeof
initial !== "undefined"` prevents callers from intentionally passing `undefined`
as a valid initial T; update createSubject to detect whether an argument was
provided (e.g., use overloads and inspect arguments.length or accept a rest args
tuple) and set `current` to { done: false, value: initial } only when an
argument was actually passed, leaving `current` undefined when no argument was
supplied; locate the createSubject function and the `current` variable to
implement the overloads/rest-args or sentinel-based approach so explicit
undefined values are preserved.

---

Outside diff comments:
In `@stream-helpers/subject.ts`:
- Around line 50-62: The branch that sets iterator = upstream after the first
subscriber consumes the initial value returns current directly and then
delegates to upstream without updating current, causing late subscribers to see
the stale initial value; modify the iterator implementation so both branches
update the shared current from upstream reads (e.g., when delegating to
upstream.next() assign the result to current before returning) so that
Subscription<T, TClose>.next() always refreshes current from upstream regardless
of which branch is taken.

---

Duplicate comments:
In `@stream-helpers/subject.ts`:
- Around line 38-44: The new optional initial parameter added to
createSubject<T>(initial?: T) is a backward-compatible feature and requires a
minor version bump; update the version field in stream-helpers/package.json to
the next minor (e.g., x.y.z -> x.(y+1).0) and commit that change so the package
version reflects the addition of the initial parameter to createSubject.

@cowboyd cowboyd force-pushed the subject-initial-value branch from 1ed8b57 to bf86cfb Compare February 20, 2026 01:02
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
stream-helpers/subject.ts (1)

50-62: ⚠️ Potential issue | 🔴 Critical

Bug: current is never updated after initial value is consumed, breaking late subscriber semantics.

When the initial value branch is used (lines 51-56), it returns current! and switches to upstream, but current is never updated with subsequent upstream values. This causes the pipeline failure:

  1. createSubject(42)current = { done: false, value: 42 }
  2. subscriber1's first next() returns 42, switches iterator to upstream
  3. upstream.send(1)current is not updated
  4. subscriber2 subscribes, sees truthy current (still 42), returns stale value

The late subscriber should receive the latest upstream value (1), not the stale initial (42).

🐛 Proposed fix: update `current` when consuming from upstream
       let iterator: Subscription<T, TClose> = current
         ? {
             *next() {
               iterator = upstream;
               return current!;
             },
           }
         : {
             *next() {
               current = yield* upstream.next();
               return current;
             },
           };

       return {
-        next: () => iterator.next(),
+        *next() {
+          let result = yield* iterator.next();
+          current = result;
+          return result;
+        },
       };

This ensures current is always updated with the latest value, regardless of which branch the iterator started with.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@stream-helpers/subject.ts` around lines 50 - 62, The iterator implementation
for the initial-value branch leaves `current` stale after switching to
`upstream`; in the branch where iterator is set to return the existing
`current!` (the first object created in createSubject), update `current` by
delegating to `upstream.next()` once you switch the iterator to `upstream` so
future subscribers see the latest value. Concretely, inside the initial-value
branch's generator (the one that currently does `iterator = upstream; return
current!;`), after assigning `iterator = upstream` call into `upstream.next()`
and assign its result to `current` (same pattern used in the other branch that
does `current = yield* upstream.next()`), so `current` is always refreshed with
upstream emissions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@stream-helpers/subject.ts`:
- Around line 50-62: The iterator implementation for the initial-value branch
leaves `current` stale after switching to `upstream`; in the branch where
iterator is set to return the existing `current!` (the first object created in
createSubject), update `current` by delegating to `upstream.next()` once you
switch the iterator to `upstream` so future subscribers see the latest value.
Concretely, inside the initial-value branch's generator (the one that currently
does `iterator = upstream; return current!;`), after assigning `iterator =
upstream` call into `upstream.next()` and assign its result to `current` (same
pattern used in the other branch that does `current = yield* upstream.next()`),
so `current` is always refreshed with upstream emissions.

---

Duplicate comments:
In `@stream-helpers/subject.ts`:
- Around line 41-44: The check `typeof initial !== "undefined"` on the local
`initial` value prevents callers from intentionally passing `undefined`; update
the `subject` API to use overloads that distinguish "no initial provided" vs
"initial provided (possibly undefined)" and replace the runtime check with an
arguments-length check (e.g., use `arguments.length > 0`) when creating
`current` so explicit `undefined` is preserved; ensure the overload signatures
for `subject` reflect the presence/absence of the initial value and keep the
`current` initialization logic using the `initial` identifier and
`IteratorResult<T>` as before.

@cowboyd cowboyd force-pushed the subject-initial-value branch from bf86cfb to 2e9e67f Compare February 20, 2026 01:07
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@stream-helpers/subject.ts`:
- Around line 38-44: The current createSubject initialization treats an explicit
undefined initial as "not provided" because it checks typeof initial !==
"undefined"; change the presence check to detect whether the caller actually
passed the parameter (e.g., use arguments.length > 0 or a sentinel) so that
passing undefined intentionally seeds current with { done: false, value:
undefined }; update the initialization logic around the current variable in
createSubject to use that presence check (keep the rest of the function
unchanged).

@cowboyd cowboyd merged commit eedaf26 into main Feb 20, 2026
10 of 11 checks passed
@cowboyd cowboyd deleted the subject-initial-value branch February 20, 2026 15:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants