Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/funny-houses-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@langchain/langgraph": patch
---

Add `stream.encoding` option to emit LangGraph API events as Server-Sent Events. This allows for sending events through the wire by piping the stream to a `Response` object.
28 changes: 14 additions & 14 deletions examples/stream-transport-vite/src/server.mts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { BaseMessage } from "@langchain/core/messages";
import { StateGraph, MessagesZodMeta, START } from "@langchain/langgraph";
import { toLangGraphEventStreamResponse } from "@langchain/langgraph/ui";
import { registry } from "@langchain/langgraph/zod";
import { ChatOpenAI } from "@langchain/openai";
import { z } from "zod/v4";
Expand All @@ -10,11 +9,11 @@ import { Hono } from "hono";

const llm = new ChatOpenAI({ model: "gpt-4o-mini" });

const graph = new StateGraph(
z.object({
messages: z.custom<BaseMessage[]>().register(registry, MessagesZodMeta),
})
)
const schema = z.object({
messages: z.custom<BaseMessage[]>().register(registry, MessagesZodMeta),
});

const graph = new StateGraph(schema)
.addNode("agent", async ({ messages }) => ({
messages: await llm.invoke(messages),
}))
Expand All @@ -26,14 +25,15 @@ export type GraphType = typeof graph;
const app = new Hono();

app.post("/api/stream", async (c) => {
type InputType = GraphType["~InputType"];
const { input } = await c.req.json<{ input: InputType }>();

return toLangGraphEventStreamResponse({
stream: graph.streamEvents(input, {
version: "v2",
streamMode: ["values", "messages"],
}),
const { input } = z.object({ input: schema }).parse(await c.req.json());

const stream = await graph.stream(input, {
encoding: "text/event-stream",
streamMode: ["values", "messages", "updates"],
});

return new Response(stream, {
headers: { "Content-Type": "text/event-stream" },
});
});

Expand Down
11 changes: 0 additions & 11 deletions libs/langgraph/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,6 @@
},
"input": "./src/graph/zod/schema.ts"
},
"./ui": {
"import": {
"types": "./dist/ui/index.d.ts",
"default": "./dist/ui/index.js"
},
"require": {
"types": "./dist/ui/index.d.cts",
"default": "./dist/ui/index.cjs"
},
"input": "./src/ui/index.ts"
},
"./package.json": "./package.json"
},
"files": [
Expand Down
47 changes: 32 additions & 15 deletions libs/langgraph/src/pregel/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import { PregelRunner } from "./runner.js";
import {
IterableReadableStreamWithAbortSignal,
IterableReadableWritableStream,
toEventStream,
} from "./stream.js";
import type {
Durability,
Expand Down Expand Up @@ -1827,11 +1828,19 @@ export class Pregel<
*/
override async stream<
TStreamMode extends StreamMode | StreamMode[] | undefined,
TSubgraphs extends boolean
TSubgraphs extends boolean,
TEncoding extends "text/event-stream" | undefined
>(
input: InputType | CommandType | null,
options?: Partial<
PregelOptions<Nodes, Channels, ContextType, TStreamMode, TSubgraphs>
PregelOptions<
Nodes,
Channels,
ContextType,
TStreamMode,
TSubgraphs,
TEncoding
>
>
): Promise<
IterableReadableStream<
Expand All @@ -1842,7 +1851,8 @@ export class Pregel<
StreamValuesType,
keyof Nodes,
NodeReturnType,
StreamCustom
StreamCustom,
TEncoding
>
>
> {
Expand All @@ -1860,18 +1870,11 @@ export class Pregel<
.signal,
};

const stream = await super.stream(input, config);
return new IterableReadableStreamWithAbortSignal(
(await super.stream(input, config)) as IterableReadableStream<
StreamOutputMap<
TStreamMode,
TSubgraphs,
StreamUpdatesType,
StreamValuesType,
keyof Nodes,
NodeReturnType,
StreamCustom
>
>,
options?.encoding === "text/event-stream"
? toEventStream(stream)
: stream,
abortController
);
}
Expand Down Expand Up @@ -1958,6 +1961,9 @@ export class Pregel<
input: PregelInputType | Command,
options?: Partial<PregelOptions<Nodes, Channels>>
): AsyncGenerator<PregelOutputType> {
// Skip LGP encoding option is `streamEvents` is used
const streamEncoding =
"version" in (options ?? {}) ? undefined : options?.encoding ?? undefined;
const streamSubgraphs = options?.subgraphs;
const inputConfig = ensureLangGraphConfig(this.config, options);
if (
Expand Down Expand Up @@ -2144,6 +2150,14 @@ export class Pregel<
}
const [namespace, mode, payload] = chunk;
if (streamMode.includes(mode)) {
if (streamEncoding === "text/event-stream") {
if (streamSubgraphs) {
yield [namespace, mode, payload];
} else {
yield [null, mode, payload];
}
continue;
}
if (streamSubgraphs && !streamModeSingle) {
yield [namespace, mode, payload];
} else if (!streamModeSingle) {
Expand Down Expand Up @@ -2177,13 +2191,16 @@ export class Pregel<
*/
override async invoke(
input: InputType | CommandType | null,
options?: Partial<PregelOptions<Nodes, Channels, ContextType>>
options?: Partial<
Omit<PregelOptions<Nodes, Channels, ContextType>, "encoding">
>
): Promise<OutputType> {
const streamMode = options?.streamMode ?? "values";
const config = {
...options,
outputKeys: options?.outputKeys ?? this.outputChannels,
streamMode,
encoding: undefined,
};
const chunks = [];
const stream = await this.stream(input, config);
Expand Down
167 changes: 166 additions & 1 deletion libs/langgraph/src/pregel/stream.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
import { IterableReadableStream } from "@langchain/core/utils/stream";
import { StreamMode } from "./types.js";
import type { RunnableConfig } from "@langchain/core/runnables";
import type { StreamMode, StreamOutputMap } from "./types.js";

// [namespace, streamMode, payload]
export type StreamChunk = [string[], StreamMode, unknown];

type StreamCheckpointsOutput<StreamValues> = StreamOutputMap<
"checkpoints",
false,
StreamValues,
unknown,
string,
unknown,
unknown,
undefined
>;

type AnyStreamOutput = StreamOutputMap<
StreamMode[],
true,
unknown,
unknown,
string,
unknown,
unknown,
undefined
>;

/**
* A wrapper around an IterableReadableStream that allows for aborting the stream when
* {@link cancel} is called.
Expand Down Expand Up @@ -125,3 +148,145 @@ export class IterableReadableWritableStream extends IterableReadableStream<Strea
this.controller.error(e);
}
}

function _stringifyAsDict(obj: unknown) {
return JSON.stringify(obj, function (key: string | number, value: unknown) {
const rawValue = this[key];
if (
rawValue != null &&
typeof rawValue === "object" &&
"toDict" in rawValue &&
typeof rawValue.toDict === "function"
) {
const { type, data } = rawValue.toDict();
return { ...data, type };
}

return value;
});
}

function _serializeError(error: unknown) {
// eslint-disable-next-line no-instanceof/no-instanceof
if (error instanceof Error) {
return { error: error.name, message: error.message };
}
return { error: "Error", message: JSON.stringify(error) };
}

function _isRunnableConfig(
config: unknown
): config is RunnableConfig & { configurable: Record<string, unknown> } {
if (typeof config !== "object" || config == null) return false;
return (
"configurable" in config &&
typeof config.configurable === "object" &&
config.configurable != null
);
}

function _extractCheckpointFromConfig(
config: RunnableConfig | null | undefined
) {
if (!_isRunnableConfig(config) || !config.configurable.thread_id) {
return null;
}

return {
thread_id: config.configurable.thread_id,
checkpoint_ns: config.configurable.checkpoint_ns || "",
checkpoint_id: config.configurable.checkpoint_id || null,
checkpoint_map: config.configurable.checkpoint_map || null,
};
}

function _serializeConfig(config: unknown) {
if (_isRunnableConfig(config)) {
const configurable = Object.fromEntries(
Object.entries(config.configurable).filter(
([key]) => !key.startsWith("__")
)
);

const newConfig = { ...config, configurable };
delete newConfig.callbacks;
return newConfig;
}

return config;
}

function _serializeCheckpoint(payload: StreamCheckpointsOutput<unknown>) {
const result: Record<string, unknown> = {
...payload,
checkpoint: _extractCheckpointFromConfig(payload.config),
parent_checkpoint: _extractCheckpointFromConfig(payload.parentConfig),

config: _serializeConfig(payload.config),
parent_config: _serializeConfig(payload.parentConfig),

tasks: payload.tasks.map((task) => {
if (_isRunnableConfig(task.state)) {
const checkpoint = _extractCheckpointFromConfig(task.state);
if (checkpoint != null) {
const cloneTask: Record<string, unknown> = { ...task, checkpoint };
delete cloneTask.state;
return cloneTask;
}
}

return task;
}),
};

delete result.parentConfig;
return result;
}

export function toEventStream(stream: AsyncGenerator) {
const encoder = new TextEncoder();
return new ReadableStream<Uint8Array>({
async start(controller) {
const enqueueChunk = (sse: {
id?: string;
event: string;
data: unknown;
}) => {
controller.enqueue(
encoder.encode(
`event: ${sse.event}\ndata: ${_stringifyAsDict(sse.data)}\n\n`
)
);
};

try {
for await (const payload of stream) {
const [ns, mode, chunk] = payload as AnyStreamOutput;

let data: unknown = chunk;
if (mode === "debug") {
const debugChunk = chunk;

if (debugChunk.type === "checkpoint") {
data = {
...debugChunk,
payload: _serializeCheckpoint(debugChunk.payload),
};
}
}

if (mode === "checkpoints") {
data = _serializeCheckpoint(chunk);
}

const event = ns?.length ? `${mode}|${ns.join("|")}` : mode;
enqueueChunk({ event, data });
}
} catch (error) {
enqueueChunk({ event: "error", data: _serializeError(error) });
}

controller.close();
},
});
}
Loading
Loading