Skip to content

feat(hooks): implement useStreamingJsonHook with per-key state tracking (TAM-153) #452

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions react-sdk/src/hooks/__tests__/use-streaming-json-hook.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { act, renderHook } from "@testing-library/react";
import { useStreamProps } from "../use-stream-props";

describe("useStreamProps", () => {
it("streams all keys then marks them complete when done", () => {
const { result } = renderHook(() => useStreamProps(["name", "email"]));

// Stream "name"
act(() => {
result.current.processToken({ key: "name", value: "John" });
});
expect(result.current.props).toEqual({ name: "John" });
expect(result.current.meta.name.state).toBe("streaming");
expect(result.current.meta.email.state).toBe("notStarted");

// Switch to "email" - should complete "name"
act(() => {
result.current.processToken({ key: "email", value: "john@" });
});
expect(result.current.meta.name.state).toBe("complete");
expect(result.current.meta.email.state).toBe("streaming");

// Finish stream
act(() => {
result.current.markDone();
});

expect(result.current.isStreamDone).toBe(true);
expect(result.current.meta.name.state).toBe("complete");
expect(result.current.meta.email.state).toBe("complete");

// Idempotency check - calling markDone again should be a no-op
act(() => {
result.current.markDone();
});
expect(result.current.isStreamDone).toBe(true);
});

it("marks unstreamed keys as skipped when done", () => {
const { result } = renderHook(() =>
useStreamProps(["firstName", "lastName", "age"]),
);

act(() => {
result.current.processToken({ key: "firstName", value: "Ada" });
});

act(() => {
result.current.markDone();
});

expect(result.current.meta.firstName.state).toBe("complete");
expect(result.current.meta.lastName.state).toBe("skipped");
expect(result.current.meta.age.state).toBe("skipped");
});

it("keeps key in streaming state until done is received", () => {
const { result } = renderHook(() => useStreamProps(["summary"]));

act(() => {
result.current.processToken({ key: "summary", value: "Hello" });
});

expect(result.current.isStreamDone).toBe(false);
expect(result.current.meta.summary.state).toBe("streaming");
});

it("ignores unknown keys by default", () => {
const { result } = renderHook(() => useStreamProps(["foo"]));

act(() => {
result.current.processToken({ key: "bar", value: "baz" });
});

expect(result.current.props).toEqual({});
expect(result.current.meta.foo.state).toBe("notStarted");
});

it("throws on unknown keys when configured", () => {
const { result } = renderHook(() =>
useStreamProps(["foo"], { onUnknownKey: "throw" }),
);

expect(() =>
act(() => {
result.current.processToken({ key: "bar", value: "baz" });
}),
).toThrow();
});
});
203 changes: 203 additions & 0 deletions react-sdk/src/hooks/use-stream-props.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import { useCallback, useMemo, useRef, useState } from "react";

/**
* Lifecycle state for a streamed prop key.
*/
export type KeyState = "notStarted" | "streaming" | "complete" | "skipped";

/**
* Metadata tracked for each expected key.
*/
export interface KeyMeta {
state: KeyState;
streamStartedAt?: number;
streamCompletedAt?: number;
}

/**
* A single token coming from the LLM stream.
*/
export interface StreamToken {
key: string;
value: string;
}

/**
* Behaviour when a token arrives for an unexpected key.
*
* - "ignore" (default): drop the token silently.
* - "throw": throw an Error.
*/
export type UnknownKeyMode = "ignore" | "throw";

export interface UseStreamPropsOptions {
onUnknownKey?: UnknownKeyMode;
}

export interface StreamPropsResult {
/**
* Incrementally-built props object.
*/
props: Record<string, unknown>;
/**
* Per-key streaming metadata.
*/
meta: Record<string, KeyMeta>;
/**
* True once `markDone` has been called.
*/
isStreamDone: boolean;
/**
* Feed a new token into the hook.
*/
processToken: (token: StreamToken) => void;
/**
* Mark the stream as finished (equivalent to receiving the `done` signal).
*/
markDone: () => void;
}

/**
* React hook that incrementally builds a props object from a token stream
* while tracking per-key streaming state (see Linear issue TAM-153).
*/
export function useStreamProps(
expectedKeys: string[],
options: UseStreamPropsOptions = {},
): StreamPropsResult {
const { onUnknownKey = "ignore" } = options;
const expectedKeySet = useMemo(() => new Set(expectedKeys), [expectedKeys]);

// --------------------------------------------------------------------------- //
// State
// --------------------------------------------------------------------------- //
const [props, setProps] = useState<Record<string, unknown>>({});
const [meta, setMeta] = useState<Record<string, KeyMeta>>(() => {
const initial: Record<string, KeyMeta> = {};
expectedKeys.forEach((key) => {
initial[key] = { state: "notStarted" };
});
return initial;
});
const [isStreamDone, setIsStreamDone] = useState(false);

/**
* Guard to ensure `markDone` / `processToken` become no-ops once completed.
*/
const isStreamDoneRef = useRef(false);

/**
* The key currently receiving tokens (if any).
*/
const activeKeyRef = useRef<string | null>(null);

// --------------------------------------------------------------------------- //
// Token processing
// --------------------------------------------------------------------------- //
const processToken = useCallback(
({ key, value }: StreamToken) => {
if (isStreamDoneRef.current) {
// Stream already finished - ignore further tokens.
return;
}

if (!expectedKeySet.has(key)) {
if (onUnknownKey === "throw") {
throw new Error(
`Received token for unexpected key "${key}" (expected one of: ${[
...expectedKeySet,
].join(", ")})`,
);
}
// Unknown key in "ignore" mode - drop silently.
return;
}

const now = Date.now();

setMeta((prev) => {
const next: Record<string, KeyMeta> = { ...prev };

// First token for this key – mark start.
if (next[key].state === "notStarted") {
next[key] = {
state: "streaming",
streamStartedAt: now,
};
}

// If we switched keys, complete the previously active one.
const prevActive = activeKeyRef.current;
if (prevActive && prevActive !== key) {
const prevEntry = next[prevActive];
if (prevEntry.state === "streaming") {
next[prevActive] = {
...prevEntry,
state: "complete",
streamCompletedAt: now,
};
}
}

activeKeyRef.current = key;
return next;
});

// Append token value.
setProps((prev) => ({
...prev,
[key]: (prev[key] ?? "") + value,
}));
},
[expectedKeySet, onUnknownKey],
);

// --------------------------------------------------------------------------- //
// Done handler
// --------------------------------------------------------------------------- //
const markDone = useCallback(() => {
if (isStreamDoneRef.current) {
// Idempotent - ignore if already marked done.
return;
}
isStreamDoneRef.current = true;

const now = Date.now();

setMeta((prev) => {
const next: Record<string, KeyMeta> = {};
for (const key of Object.keys(prev)) {
const entry = prev[key];
if (entry.state === "streaming") {
next[key] = {
...entry,
state: "complete",
streamCompletedAt: now,
};
} else if (entry.state === "notStarted") {
next[key] = {
...entry,
state: "skipped",
};
} else {
next[key] = entry;
}
}
return next;
});

setIsStreamDone(true);
activeKeyRef.current = null;
}, []);

// --------------------------------------------------------------------------- //
// Result
// --------------------------------------------------------------------------- //
return {
props,
meta,
isStreamDone,
processToken,
markDone,
};
}