diff --git a/docs/core_docs/docs/versions/v0_2.mdx b/docs/core_docs/docs/versions/v0_2/index.mdx similarity index 97% rename from docs/core_docs/docs/versions/v0_2.mdx rename to docs/core_docs/docs/versions/v0_2/index.mdx index a7a170e1ae97..141252301449 100644 --- a/docs/core_docs/docs/versions/v0_2.mdx +++ b/docs/core_docs/docs/versions/v0_2/index.mdx @@ -7,6 +7,12 @@ sidebar_label: v0.2 LangChain v0.2 was released in May 2024. This release includes a number of breaking changes and deprecations. This document contains a guide on upgrading to 0.2.x, as well as a list of deprecations and breaking changes. +:::note Reference + +- [Migrating to streamEvents v2](/docs/versions/v0_2/migrating_astream_events) + +::: + ## Migration This documentation will help you upgrade your code to LangChain `0.2.x.`. To prepare for migration, we first recommend you take the following steps: diff --git a/docs/core_docs/docs/versions/v0_2/migrating_astream_events.mdx b/docs/core_docs/docs/versions/v0_2/migrating_astream_events.mdx new file mode 100644 index 000000000000..023c6ad234bd --- /dev/null +++ b/docs/core_docs/docs/versions/v0_2/migrating_astream_events.mdx @@ -0,0 +1,125 @@ +--- +sidebar_position: 2 +sidebar_label: streamEvents v2 +--- + +# Migrating to streamEvents v2 + +:::danger + +This migration guide is a work in progress and is not complete. + +::: + +We've added a `v2` of the [`streamEvents`](/docs/how_to/streaming#using-stream-events) API with the release of `0.2.0`. You can see this [PR](https://github.com/langchain-ai/langchainjs/pull/5539/) for more details. + +The `v2` version is a re-write of the `v1` version, and should be more efficient, with more consistent output for the events. The `v1` version of the API will be deprecated in favor of the `v2` version and will be removed in `0.4.0`. + +Below is a list of changes between the `v1` and `v2` versions of the API. 
+ +### Output for `on_chat_model_end` + +In `v1`, the outputs associated with `on_chat_model_end` changed depending on whether the +chat model was run as a root-level runnable or as part of a chain. + +As a root-level runnable the output was: + +```ts +{ + data: { + output: AIMessageChunk((content = "hello world!"), (id = "some id")); + } +} +``` + +As part of a chain the output was: + +``` +{ + data: { + output: { + generations: [ + [ + { + generation_info: None, + message: AIMessageChunk( + content="hello world!", id="some id" + ), + text: "hello world!", + } + ] + ], + } + }, +} +``` + +As of `v2`, the output will always be the simpler representation: + +```ts +{ + data: { + output: AIMessageChunk((content = "hello world!"), (id = "some id")); + } +} +``` + +:::note +Non-chat models (i.e., regular LLMs) will be consistently associated with the more verbose format for now. +::: + +### Output for `on_retriever_end` + +The `on_retriever_end` output will always be a list of `Document` objects. + +This was the output in `v1`: + +```ts +{ + data: { + output: { + documents: [ + Document(...), + Document(...), + ... + ] + } + } +} +``` + +And here is the new output for `v2`: + +```ts +{ + data: { + output: [ + Document(...), + Document(...), + ... + ] + } +} +``` + +### Removed `on_retriever_stream` + +The `on_retriever_stream` event was an artifact of the implementation and has been removed. + +Full information associated with the event is already available in the `on_retriever_end` event. + +Please use `on_retriever_end` instead. + +### Removed `on_tool_stream` + +The `on_tool_stream` event was an artifact of the implementation and has been removed. + +Full information associated with the event is already available in the `on_tool_end` event. + +Please use `on_tool_end` instead. + +### Propagating Names + +Names of runnables have been updated to be more consistent. + +If you're filtering by event names, check if you need to update your filters. 
diff --git a/langchain-core/.eslintrc.cjs b/langchain-core/.eslintrc.cjs index 29b23f23cd0a..39b49268e8ee 100644 --- a/langchain-core/.eslintrc.cjs +++ b/langchain-core/.eslintrc.cjs @@ -37,6 +37,7 @@ module.exports = { "@typescript-eslint/no-unused-vars": ["warn", { args: "none" }], "@typescript-eslint/no-floating-promises": "error", "@typescript-eslint/no-misused-promises": "error", + "@typescript-eslint/no-this-alias": 0, camelcase: 0, "class-methods-use-this": 0, "import/extensions": [2, "ignorePackages"], diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index acea927f1053..d9501e5ccc80 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -1,5 +1,6 @@ import { z } from "zod"; import pRetry from "p-retry"; +import { v4 as uuidv4 } from "uuid"; import type { RunnableInterface, RunnableBatchOptions } from "./types.js"; import { @@ -11,9 +12,13 @@ import { LogStreamCallbackHandlerInput, RunLog, RunLogPatch, +} from "../tracers/log_stream.js"; +import { + EventStreamCallbackHandler, + EventStreamCallbackHandlerInput, StreamEvent, StreamEventData, -} from "../tracers/log_stream.js"; +} from "../tracers/event_stream.js"; import { Serializable } from "../load/serializable.js"; import { IterableReadableStream, @@ -497,18 +502,34 @@ export abstract class Runnable< ); delete config.runId; runManager = pipe.setup; + + const isStreamEventsHandler = ( + handler: BaseCallbackHandler + ): handler is EventStreamCallbackHandler => + handler.name === "event_stream_tracer"; + const streamEventsHandler = runManager?.handlers.find( + isStreamEventsHandler + ); + let iterator = pipe.output; + if (streamEventsHandler !== undefined && runManager !== undefined) { + iterator = streamEventsHandler.tapOutputIterable( + runManager.runId, + iterator + ); + } + const isLogStreamHandler = ( handler: BaseCallbackHandler ): handler is LogStreamCallbackHandler => handler.name === "log_stream_tracer"; const 
streamLogHandler = runManager?.handlers.find(isLogStreamHandler); - let iterator = pipe.output; if (streamLogHandler !== undefined && runManager !== undefined) { - iterator = await streamLogHandler.tapOutputIterable( + iterator = streamLogHandler.tapOutputIterable( runManager.runId, - pipe.output + iterator ); } + for await (const chunk of iterator) { yield chunk; if (finalOutputSupported) { @@ -721,47 +742,75 @@ export abstract class Runnable< * chains. Metadata fields have been omitted from the table for brevity. * Chain definitions have been included after the table. * - * | event | name | chunk | input | output | - * |----------------------|------------------|------------------------------------|-----------------------------------------------|-------------------------------------------------| - * | on_llm_start | [model name] | | {'input': 'hello'} | | - * | on_llm_stream | [model name] | 'Hello' OR AIMessageChunk("hello") | | | - * | on_llm_end | [model name] | | 'Hello human!' | - * | on_chain_start | format_docs | | | | - * | on_chain_stream | format_docs | "hello world!, goodbye world!" | | | - * | on_chain_end | format_docs | | [Document(...)] | "hello world!, goodbye world!" | - * | on_tool_start | some_tool | | {"x": 1, "y": "2"} | | - * | on_tool_stream | some_tool | {"x": 1, "y": "2"} | | | - * | on_tool_end | some_tool | | | {"x": 1, "y": "2"} | - * | on_retriever_start | [retriever name] | | {"query": "hello"} | | - * | on_retriever_chunk | [retriever name] | {documents: [...]} | | | - * | on_retriever_end | [retriever name] | | {"query": "hello"} | {documents: [...]} | - * | on_prompt_start | [template_name] | | {"question": "hello"} | | - * | on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) | + * **ATTENTION** This reference table is for the V2 version of the schema. 
+ * + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | event | name | chunk | input | output | + * +======================+==================+=================================+===============================================+=================================================+ + * | on_chat_model_start | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_chat_model_stream | [model name] | AIMessageChunk(content="hello") | | | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_chat_model_end | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_llm_start | [model name] | | {'input': 'hello'} | | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_llm_stream | [model name] | 'Hello' | | | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_llm_end | [model name] | | 'Hello human!' 
| | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_chain_start | format_docs | | | | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_chain_stream | format_docs | "hello world!, goodbye world!" | | | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_chain_end | format_docs | | [Document(...)] | "hello world!, goodbye world!" | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_tool_start | some_tool | | {"x": 1, "y": "2"} | | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_tool_end | some_tool | | | {"x": 1, "y": "2"} | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_retriever_start | [retriever name] | | {"query": "hello"} | | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_retriever_end | [retriever name] | | {"query": "hello"} | [Document(...), ..] 
| + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_prompt_start | [template_name] | | {"question": "hello"} | | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ + * | on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) | + * +----------------------+------------------+---------------------------------+-----------------------------------------------+-------------------------------------------------+ */ streamEvents( input: RunInput, - options: Partial & { version: "v1" }, - streamOptions?: Omit + options: Partial & { version: "v1" | "v2" }, + streamOptions?: Omit ): IterableReadableStream; streamEvents( input: RunInput, options: Partial & { - version: "v1"; + version: "v1" | "v2"; encoding: "text/event-stream"; }, - streamOptions?: Omit + streamOptions?: Omit ): IterableReadableStream; streamEvents( input: RunInput, options: Partial & { - version: "v1"; + version: "v1" | "v2"; encoding?: "text/event-stream" | undefined; }, - streamOptions?: Omit + streamOptions?: Omit ): IterableReadableStream { - const stream = this._streamEvents(input, options, streamOptions); + let stream; + if (options.version === "v1") { + stream = this._streamEventsV1(input, options, streamOptions); + } else if (options.version === "v2") { + stream = this._streamEventsV2(input, options, streamOptions); + } else { + throw new Error( + `Only versions "v1" and "v2" of the schema are currently supported.` + ); + } if (options.encoding === "text/event-stream") { return convertToHttpEventStream(stream); } else { @@ -769,16 +818,83 @@ export abstract class Runnable< } } - async *_streamEvents( + private async *_streamEventsV2( input: RunInput, - options: Partial & { version: "v1" 
}, - streamOptions?: Omit + options: Partial & { version: "v1" | "v2" }, + streamOptions?: Omit ): AsyncGenerator { - if (options.version !== "v1") { - throw new Error( - `Only version "v1" of the events schema is currently supported.` - ); + const eventStreamer = new EventStreamCallbackHandler({ + ...streamOptions, + autoClose: false, + }); + const config = ensureConfig(options); + const runId = config.runId ?? uuidv4(); + config.runId = runId; + const callbacks = config.callbacks; + if (callbacks === undefined) { + config.callbacks = [eventStreamer]; + } else if (Array.isArray(callbacks)) { + config.callbacks = callbacks.concat(eventStreamer); + } else { + const copiedCallbacks = callbacks.copy(); + copiedCallbacks.inheritableHandlers.push(eventStreamer); + // eslint-disable-next-line no-param-reassign + config.callbacks = copiedCallbacks; + } + // Call the runnable in streaming mode, + // add each chunk to the output stream + const outerThis = this; + async function consumeRunnableStream() { + try { + const runnableStream = await outerThis.stream(input, config); + const tappedStream = eventStreamer.tapOutputIterable( + runId, + runnableStream + ); + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _ of tappedStream) { + // Just iterate so that the callback handler picks up events + } + } finally { + await eventStreamer.writer.close(); + } + } + const runnableStreamConsumePromise = consumeRunnableStream(); + let firstEventSent = false; + let firstEventRunId; + try { + for await (const event of eventStreamer) { + // This is a work-around an issue where the inputs into the + // chain are not available until the entire input is consumed. + // As a temporary solution, we'll modify the input to be the input + // that was passed into the chain. 
+ if (!firstEventSent) { + event.data.input = input; + firstEventSent = true; + firstEventRunId = event.run_id; + yield event; + continue; + } + if (event.run_id === firstEventRunId && event.event.endsWith("_end")) { + // If it's the end event corresponding to the root runnable + // we dont include the input in the event since it's guaranteed + // to be included in the first event. + if (event.data?.input) { + delete event.data.input; + } + } + yield event; + } + } finally { + await runnableStreamConsumePromise; } + } + + private async *_streamEventsV1( + input: RunInput, + options: Partial & { version: "v1" | "v2" }, + streamOptions?: Omit + ): AsyncGenerator { let runLog; let hasEncounteredStartEvent = false; const config = ensureConfig(options); @@ -1138,14 +1254,14 @@ export class RunnableBinding< streamEvents( input: RunInput, - options: Partial & { version: "v1" }, + options: Partial & { version: "v1" | "v2" }, streamOptions?: Omit ): IterableReadableStream; streamEvents( input: RunInput, options: Partial & { - version: "v1"; + version: "v1" | "v2"; encoding: "text/event-stream"; }, streamOptions?: Omit @@ -1154,7 +1270,7 @@ export class RunnableBinding< streamEvents( input: RunInput, options: Partial & { - version: "v1"; + version: "v1" | "v2"; encoding?: "text/event-stream" | undefined; }, streamOptions?: Omit diff --git a/langchain-core/src/runnables/remote.ts b/langchain-core/src/runnables/remote.ts index d5a19afdad1b..4abe506f8227 100644 --- a/langchain-core/src/runnables/remote.ts +++ b/langchain-core/src/runnables/remote.ts @@ -535,7 +535,7 @@ export class RemoteRunnable< _streamEvents( input: RunInput, - options: Partial & { version: "v1" }, + options: Partial & { version: "v1" | "v2" }, streamOptions?: Omit | undefined ): AsyncGenerator { // eslint-disable-next-line @typescript-eslint/no-this-alias @@ -611,14 +611,14 @@ export class RemoteRunnable< streamEvents( input: RunInput, - options: Partial & { version: "v1" }, + options: Partial & { version: 
"v1" | "v2" }, streamOptions?: Omit ): IterableReadableStream; streamEvents( input: RunInput, options: Partial & { - version: "v1"; + version: "v1" | "v2"; encoding: "text/event-stream"; }, streamOptions?: Omit @@ -627,14 +627,14 @@ export class RemoteRunnable< streamEvents( input: RunInput, options: Partial & { - version: "v1"; + version: "v1" | "v2"; encoding?: "text/event-stream" | undefined; }, streamOptions?: Omit ): IterableReadableStream { - if (options?.version !== "v1") { + if (options.version !== "v1" && options.version !== "v2") { throw new Error( - `Only version "v1" of the events schema is currently supported.` + `Only versions "v1" and "v2" of the events schema is currently supported.` ); } if (options.encoding !== undefined) { diff --git a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts new file mode 100644 index 000000000000..963bde3690c9 --- /dev/null +++ b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts @@ -0,0 +1,1030 @@ +/* eslint-disable no-promise-executor-return */ +/* eslint-disable no-process-env */ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { test } from "@jest/globals"; +import { z } from "zod"; +import { + RunnableLambda, + RunnableMap, + RunnablePassthrough, + RunnablePick, +} from "../index.js"; +import { ChatPromptTemplate } from "../../prompts/chat.js"; +import { + FakeListChatModel, + FakeRetriever, + FakeStreamingLLM, +} from "../../utils/testing/index.js"; +import { + AIMessageChunk, + HumanMessage, + SystemMessage, +} from "../../messages/index.js"; +import { DynamicStructuredTool, DynamicTool } from "../../tools.js"; +import { Document } from "../../documents/document.js"; + +function reverse(s: string) { + // Reverse a string. 
+ return s.split("").reverse().join(""); +} + +const originalCallbackValue = process.env.LANGCHAIN_CALLBACKS_BACKGROUND; + +afterEach(() => { + process.env.LANGCHAIN_CALLBACKS_BACKGROUND = originalCallbackValue; +}); + +test("Runnable streamEvents method", async () => { + const chain = RunnableLambda.from(reverse).withConfig({ + runName: "reverse", + }); + + const events = []; + const eventStream = await chain.streamEvents("hello", { version: "v2" }); + for await (const event of eventStream) { + events.push(event); + } + expect(events).toEqual([ + { + data: { input: "hello" }, + event: "on_chain_start", + metadata: {}, + name: "reverse", + run_id: expect.any(String), + tags: [], + }, + { + data: { chunk: "olleh" }, + event: "on_chain_stream", + metadata: {}, + name: "reverse", + run_id: expect.any(String), + tags: [], + }, + { + data: { output: "olleh" }, + event: "on_chain_end", + metadata: {}, + name: "reverse", + run_id: expect.any(String), + tags: [], + }, + ]); +}); + +test("Runnable streamEvents method with three runnables", async () => { + const r = RunnableLambda.from(reverse); + + const chain = r + .withConfig({ runName: "1" }) + .pipe(r.withConfig({ runName: "2" })) + .pipe(r.withConfig({ runName: "3" })); + + const events = []; + const eventStream = await chain.streamEvents("hello", { version: "v2" }); + for await (const event of eventStream) { + events.push(event); + } + expect(events).toEqual([ + { + data: { input: "hello" }, + event: "on_chain_start", + metadata: {}, + name: "RunnableSequence", + run_id: expect.any(String), + tags: [], + }, + { + data: {}, + event: "on_chain_start", + metadata: {}, + name: "1", + run_id: expect.any(String), + tags: ["seq:step:1"], + }, + { + data: { chunk: "olleh" }, + event: "on_chain_stream", + metadata: {}, + name: "1", + run_id: expect.any(String), + tags: ["seq:step:1"], + }, + { + data: {}, + event: "on_chain_start", + metadata: {}, + name: "2", + run_id: expect.any(String), + tags: ["seq:step:2"], + }, + { + 
data: { input: "hello", output: "olleh" }, + event: "on_chain_end", + metadata: {}, + name: "1", + run_id: expect.any(String), + tags: ["seq:step:1"], + }, + { + data: { chunk: "hello" }, + event: "on_chain_stream", + metadata: {}, + name: "2", + run_id: expect.any(String), + tags: ["seq:step:2"], + }, + { + data: {}, + event: "on_chain_start", + metadata: {}, + name: "3", + run_id: expect.any(String), + tags: ["seq:step:3"], + }, + { + data: { input: "olleh", output: "hello" }, + event: "on_chain_end", + metadata: {}, + name: "2", + run_id: expect.any(String), + tags: ["seq:step:2"], + }, + { + data: { chunk: "olleh" }, + event: "on_chain_stream", + metadata: {}, + name: "3", + run_id: expect.any(String), + tags: ["seq:step:3"], + }, + { + data: { chunk: "olleh" }, + event: "on_chain_stream", + metadata: {}, + name: "RunnableSequence", + run_id: expect.any(String), + tags: [], + }, + { + data: { input: "hello", output: "olleh" }, + event: "on_chain_end", + metadata: {}, + name: "3", + run_id: expect.any(String), + tags: ["seq:step:3"], + }, + { + data: { output: "olleh" }, + event: "on_chain_end", + metadata: {}, + name: "RunnableSequence", + run_id: expect.any(String), + tags: [], + }, + ]); +}); + +test("Runnable streamEvents method with three runnables with backgrounded callbacks set to true", async () => { + process.env.LANGCHAIN_CALLBACKS_BACKGROUND = "true"; + const r = RunnableLambda.from(reverse); + + const chain = r + .withConfig({ runName: "1" }) + .pipe(r.withConfig({ runName: "2" })) + .pipe(r.withConfig({ runName: "3" })); + + const events = []; + const eventStream = await chain.streamEvents("hello", { version: "v2" }); + for await (const event of eventStream) { + events.push(event); + } + expect(events).toEqual([ + { + data: { input: "hello" }, + event: "on_chain_start", + metadata: {}, + name: "RunnableSequence", + run_id: expect.any(String), + tags: [], + }, + { + data: {}, + event: "on_chain_start", + metadata: {}, + name: "1", + run_id: 
expect.any(String), + tags: ["seq:step:1"], + }, + { + data: { chunk: "olleh" }, + event: "on_chain_stream", + metadata: {}, + name: "1", + run_id: expect.any(String), + tags: ["seq:step:1"], + }, + { + data: {}, + event: "on_chain_start", + metadata: {}, + name: "2", + run_id: expect.any(String), + tags: ["seq:step:2"], + }, + { + data: { input: "hello", output: "olleh" }, + event: "on_chain_end", + metadata: {}, + name: "1", + run_id: expect.any(String), + tags: ["seq:step:1"], + }, + { + data: { chunk: "hello" }, + event: "on_chain_stream", + metadata: {}, + name: "2", + run_id: expect.any(String), + tags: ["seq:step:2"], + }, + { + data: {}, + event: "on_chain_start", + metadata: {}, + name: "3", + run_id: expect.any(String), + tags: ["seq:step:3"], + }, + { + data: { input: "olleh", output: "hello" }, + event: "on_chain_end", + metadata: {}, + name: "2", + run_id: expect.any(String), + tags: ["seq:step:2"], + }, + { + data: { chunk: "olleh" }, + event: "on_chain_stream", + metadata: {}, + name: "3", + run_id: expect.any(String), + tags: ["seq:step:3"], + }, + { + data: { chunk: "olleh" }, + event: "on_chain_stream", + metadata: {}, + name: "RunnableSequence", + run_id: expect.any(String), + tags: [], + }, + { + data: { input: "hello", output: "olleh" }, + event: "on_chain_end", + metadata: {}, + name: "3", + run_id: expect.any(String), + tags: ["seq:step:3"], + }, + { + data: { output: "olleh" }, + event: "on_chain_end", + metadata: {}, + name: "RunnableSequence", + run_id: expect.any(String), + tags: [], + }, + ]); +}); + +test("Runnable streamEvents method with three runnables with filtering", async () => { + const r = RunnableLambda.from(reverse); + + const chain = r + .withConfig({ runName: "1" }) + .pipe(r.withConfig({ runName: "2", tags: ["my_tag"] })) + .pipe(r.withConfig({ runName: "3", tags: ["my_tag"] })); + + const events = []; + const eventStream = await chain.streamEvents( + "hello", + { version: "v2" }, + { + includeNames: ["1"], + } + ); + for 
await (const event of eventStream) { + events.push(event); + } + expect(events).toEqual([ + { + data: { input: "hello" }, + event: "on_chain_start", + metadata: {}, + name: "1", + run_id: expect.any(String), + tags: ["seq:step:1"], + }, + { + data: { chunk: "olleh" }, + event: "on_chain_stream", + metadata: {}, + name: "1", + run_id: expect.any(String), + tags: ["seq:step:1"], + }, + { + data: { output: "olleh" }, + event: "on_chain_end", + metadata: {}, + name: "1", + run_id: expect.any(String), + tags: ["seq:step:1"], + }, + ]); + const events2 = []; + const eventStream2 = await chain.streamEvents( + "hello", + { version: "v2" }, + { + excludeNames: ["2"], + includeTags: ["my_tag"], + } + ); + for await (const event of eventStream2) { + events2.push(event); + } + expect(events2).toEqual([ + { + data: { + input: "hello", + }, + event: "on_chain_start", + metadata: {}, + name: "3", + run_id: expect.any(String), + tags: expect.arrayContaining(["seq:step:3", "my_tag"]), + }, + { + data: { chunk: "olleh" }, + event: "on_chain_stream", + metadata: {}, + name: "3", + run_id: expect.any(String), + tags: expect.arrayContaining(["seq:step:3", "my_tag"]), + }, + { + data: { output: "olleh" }, + event: "on_chain_end", + metadata: {}, + name: "3", + run_id: expect.any(String), + tags: expect.arrayContaining(["seq:step:3", "my_tag"]), + }, + ]); +}); + +test("Runnable streamEvents method with a runnable map", async () => { + const r = RunnableLambda.from(reverse); + + const chain = RunnableMap.from({ + reversed: r, + original: new RunnablePassthrough(), + }).pipe(new RunnablePick("reversed")); + + const events = []; + const eventStream = await chain.streamEvents("hello", { version: "v2" }); + for await (const event of eventStream) { + events.push(event); + } + expect(events).toEqual([ + { + run_id: expect.any(String), + event: "on_chain_start", + name: "RunnableSequence", + tags: [], + metadata: {}, + data: { input: "hello" }, + }, + { + event: "on_chain_start", + name: 
"RunnableMap", + run_id: expect.any(String), + tags: ["seq:step:1"], + metadata: {}, + data: {}, + }, + { + event: "on_chain_start", + name: "RunnableLambda", + run_id: expect.any(String), + tags: ["map:key:reversed"], + metadata: {}, + data: {}, + }, + { + event: "on_chain_start", + name: "RunnablePassthrough", + run_id: expect.any(String), + tags: ["map:key:original"], + metadata: {}, + data: {}, + }, + { + event: "on_chain_stream", + name: "RunnablePassthrough", + run_id: expect.any(String), + tags: ["map:key:original"], + metadata: {}, + data: { chunk: "hello" }, + }, + { + event: "on_chain_stream", + name: "RunnableLambda", + run_id: expect.any(String), + tags: ["map:key:reversed"], + metadata: {}, + data: { chunk: "olleh" }, + }, + { + event: "on_chain_stream", + name: "RunnableMap", + run_id: expect.any(String), + tags: ["seq:step:1"], + metadata: {}, + data: { + chunk: { + original: "hello", + }, + }, + }, + { + event: "on_chain_start", + name: "RunnablePick", + run_id: expect.any(String), + tags: ["seq:step:2"], + metadata: {}, + data: {}, + }, + { + event: "on_chain_stream", + name: "RunnableMap", + run_id: expect.any(String), + tags: ["seq:step:1"], + metadata: {}, + data: { + chunk: { + reversed: "olleh", + }, + }, + }, + { + event: "on_chain_end", + name: "RunnablePassthrough", + run_id: expect.any(String), + tags: ["map:key:original"], + metadata: {}, + data: { input: "hello", output: "hello" }, + }, + { + event: "on_chain_stream", + name: "RunnablePick", + run_id: expect.any(String), + tags: ["seq:step:2"], + metadata: {}, + data: { chunk: "olleh" }, + }, + { + event: "on_chain_stream", + run_id: expect.any(String), + tags: [], + metadata: {}, + name: "RunnableSequence", + data: { chunk: "olleh" }, + }, + { + event: "on_chain_end", + name: "RunnableLambda", + run_id: expect.any(String), + tags: ["map:key:reversed"], + metadata: {}, + data: { input: "hello", output: "olleh" }, + }, + { + event: "on_chain_end", + name: "RunnableMap", + run_id: 
expect.any(String), + tags: ["seq:step:1"], + metadata: {}, + data: { + input: "hello", + output: { + original: "hello", + reversed: "olleh", + }, + }, + }, + { + event: "on_chain_end", + name: "RunnablePick", + run_id: expect.any(String), + tags: ["seq:step:2"], + metadata: {}, + data: { + input: { + original: "hello", + reversed: "olleh", + }, + output: "olleh", + }, + }, + { + event: "on_chain_end", + name: "RunnableSequence", + run_id: expect.any(String), + tags: [], + metadata: {}, + data: { output: "olleh" }, + }, + ]); +}); + +test("Runnable streamEvents method with llm", async () => { + const model = new FakeStreamingLLM({ + responses: ["hey!"], + }).withConfig({ + metadata: { a: "b" }, + tags: ["my_model"], + runName: "my_model", + }); + const events = []; + const eventStream = await model.streamEvents("hello", { version: "v2" }); + for await (const event of eventStream) { + events.push(event); + } + expect(events).toEqual([ + { + event: "on_llm_start", + name: "my_model", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_model"]), + metadata: { + a: "b", + }, + data: { + input: "hello", + }, + }, + { + event: "on_llm_stream", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_model"]), + metadata: { + a: "b", + }, + name: "my_model", + data: { chunk: "h" }, + }, + + { + event: "on_llm_stream", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_model"]), + metadata: { + a: "b", + }, + name: "my_model", + data: { chunk: "e" }, + }, + { + event: "on_llm_stream", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_model"]), + metadata: { + a: "b", + }, + name: "my_model", + data: { chunk: "y" }, + }, + { + event: "on_llm_stream", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_model"]), + metadata: { + a: "b", + }, + name: "my_model", + data: { chunk: "!" 
}, + }, + { + event: "on_llm_end", + name: "my_model", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_model"]), + metadata: { + a: "b", + }, + data: { + output: { + generations: [ + [ + { + generationInfo: {}, + text: "hey!", + }, + ], + ], + llmOutput: {}, + }, + }, + }, + ]); +}); + +test("Runnable streamEvents method with chat model chain", async () => { + const template = ChatPromptTemplate.fromMessages([ + ["system", "You are Godzilla"], + ["human", "{question}"], + ]).withConfig({ + runName: "my_template", + tags: ["my_template"], + }); + const model = new FakeListChatModel({ + responses: ["ROAR"], + }).withConfig({ + metadata: { a: "b" }, + tags: ["my_model"], + runName: "my_model", + }); + const chain = template.pipe(model).withConfig({ + metadata: { foo: "bar" }, + tags: ["my_chain"], + runName: "my_chain", + }); + const events = []; + const eventStream = await chain.streamEvents( + { question: "hello" }, + { version: "v2" } + ); + for await (const event of eventStream) { + events.push(event); + } + expect(events).toEqual([ + { + run_id: expect.any(String), + event: "on_chain_start", + name: "my_chain", + tags: ["my_chain"], + metadata: { + foo: "bar", + }, + data: { + input: { + question: "hello", + }, + }, + }, + { + data: { input: { question: "hello" } }, + event: "on_prompt_start", + metadata: { foo: "bar" }, + name: "my_template", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_chain", "seq:step:1", "my_template"]), + }, + { + event: "on_prompt_end", + name: "my_template", + run_id: expect.any(String), + tags: expect.arrayContaining(["seq:step:1", "my_template", "my_chain"]), + metadata: { + foo: "bar", + }, + data: { + input: { + question: "hello", + }, + output: await template.invoke({ question: "hello" }), + }, + }, + { + event: "on_chat_model_start", + name: "my_model", + run_id: expect.any(String), + tags: expect.arrayContaining(["seq:step:2", "my_model", "my_chain"]), + metadata: { + foo: "bar", + a: "b", + 
ls_model_type: "chat", + ls_stop: undefined, + }, + data: { + input: { + messages: [ + [new SystemMessage("You are Godzilla"), new HumanMessage("hello")], + ], + }, + }, + }, + { + event: "on_chat_model_stream", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]), + metadata: { + a: "b", + foo: "bar", + ls_model_type: "chat", + ls_stop: undefined, + }, + name: "my_model", + data: { chunk: new AIMessageChunk("R") }, + }, + { + event: "on_chain_stream", + run_id: expect.any(String), + tags: ["my_chain"], + metadata: { + foo: "bar", + }, + name: "my_chain", + data: { chunk: new AIMessageChunk("R") }, + }, + { + event: "on_chat_model_stream", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]), + metadata: { + a: "b", + foo: "bar", + ls_model_type: "chat", + ls_stop: undefined, + }, + name: "my_model", + data: { chunk: new AIMessageChunk("O") }, + }, + { + event: "on_chain_stream", + run_id: expect.any(String), + tags: ["my_chain"], + metadata: { + foo: "bar", + }, + name: "my_chain", + data: { chunk: new AIMessageChunk("O") }, + }, + { + event: "on_chat_model_stream", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]), + metadata: { + a: "b", + foo: "bar", + ls_model_type: "chat", + ls_stop: undefined, + }, + name: "my_model", + data: { chunk: new AIMessageChunk("A") }, + }, + { + event: "on_chain_stream", + run_id: expect.any(String), + tags: ["my_chain"], + metadata: { + foo: "bar", + }, + name: "my_chain", + data: { chunk: new AIMessageChunk("A") }, + }, + { + event: "on_chat_model_stream", + run_id: expect.any(String), + tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]), + metadata: { + a: "b", + foo: "bar", + ls_model_type: "chat", + ls_stop: undefined, + }, + name: "my_model", + data: { chunk: new AIMessageChunk("R") }, + }, + { + event: "on_chain_stream", + run_id: expect.any(String), + tags: 
["my_chain"], + metadata: { + foo: "bar", + }, + name: "my_chain", + data: { chunk: new AIMessageChunk("R") }, + }, + { + event: "on_chat_model_end", + name: "my_model", + run_id: expect.any(String), + tags: expect.arrayContaining(["seq:step:2", "my_model", "my_chain"]), + metadata: { + foo: "bar", + a: "b", + ls_model_type: "chat", + ls_stop: undefined, + }, + data: { + input: { + messages: [ + [new SystemMessage("You are Godzilla"), new HumanMessage("hello")], + ], + }, + output: new AIMessageChunk("ROAR"), + }, + }, + { + event: "on_chain_end", + name: "my_chain", + run_id: expect.any(String), + tags: ["my_chain"], + metadata: { + foo: "bar", + }, + data: { + output: new AIMessageChunk("ROAR"), + }, + }, + ]); +}); + +test("Runnable streamEvents method with simple tools", async () => { + const tool = new DynamicTool({ + func: async () => "hello", + name: "parameterless", + description: "A tool that does nothing", + }); + const events = []; + const eventStream = await tool.streamEvents({}, { version: "v2" }); + for await (const event of eventStream) { + events.push(event); + } + + expect(events).toEqual([ + { + data: { input: {} }, + event: "on_tool_start", + metadata: {}, + name: "parameterless", + run_id: expect.any(String), + tags: [], + }, + { + data: { output: "hello" }, + event: "on_tool_end", + metadata: {}, + name: "parameterless", + run_id: expect.any(String), + tags: [], + }, + ]); + + const toolWithParams = new DynamicStructuredTool({ + func: async (params: { x: number; y: string }) => + JSON.stringify({ x: params.x, y: params.y }), + schema: z.object({ + x: z.number(), + y: z.string(), + }), + name: "with_parameters", + description: "A tool that does nothing", + }); + const events2 = []; + const eventStream2 = await toolWithParams.streamEvents( + { x: 1, y: "2" }, + { version: "v2" } + ); + for await (const event of eventStream2) { + events2.push(event); + } + expect(events2).toEqual([ + { + data: { input: { x: 1, y: "2" } }, + event: "on_tool_start", 
+ metadata: {}, + name: "with_parameters", + run_id: expect.any(String), + tags: [], + }, + { + data: { output: JSON.stringify({ x: 1, y: "2" }) }, + event: "on_tool_end", + metadata: {}, + name: "with_parameters", + run_id: expect.any(String), + tags: [], + }, + ]); +}); + +test("Runnable streamEvents method with a retriever", async () => { + const retriever = new FakeRetriever({ + output: [ + new Document({ pageContent: "hello world!", metadata: { foo: "bar" } }), + new Document({ + pageContent: "goodbye world!", + metadata: { food: "spare" }, + }), + ], + }); + const events = []; + const eventStream = await retriever.streamEvents("hello", { + version: "v2", + }); + for await (const event of eventStream) { + events.push(event); + } + expect(events).toEqual([ + { + data: { + input: "hello", + }, + event: "on_retriever_start", + metadata: {}, + name: "FakeRetriever", + run_id: expect.any(String), + tags: [], + }, + { + data: { + output: [ + new Document({ + pageContent: "hello world!", + metadata: { foo: "bar" }, + }), + new Document({ + pageContent: "goodbye world!", + metadata: { food: "spare" }, + }), + ], + }, + event: "on_retriever_end", + metadata: {}, + name: "FakeRetriever", + run_id: expect.any(String), + tags: [], + }, + ]); +}); + +test("Runnable streamEvents method with text/event-stream encoding", async () => { + const chain = RunnableLambda.from(reverse).withConfig({ + runName: "reverse", + }); + const events = []; + const eventStream = await chain.streamEvents("hello", { + version: "v2", + encoding: "text/event-stream", + runId: "1234", + }); + for await (const event of eventStream) { + events.push(event); + } + const decoder = new TextDecoder(); + expect(events.length).toEqual(4); + const dataEvents = events + .slice(0, 3) + .map((event) => decoder.decode(event).split("event: data\ndata: ")[1]); + const expectedPayloads = [ + { + data: { input: "hello" }, + event: "on_chain_start", + metadata: {}, + name: "reverse", + run_id: "1234", + tags: [], + 
}, + { + data: { chunk: "olleh" }, + event: "on_chain_stream", + metadata: {}, + name: "reverse", + run_id: "1234", + tags: [], + }, + { + data: { output: "olleh" }, + event: "on_chain_end", + metadata: {}, + name: "reverse", + run_id: "1234", + tags: [], + }, + ]; + for (let i = 0; i < dataEvents.length; i += 1) { + expect(dataEvents[i].endsWith("\n\n")).toBe(true); + expect(JSON.parse(dataEvents[i].replace("\n\n", ""))).toEqual( + expectedPayloads[i] + ); + } + + expect(decoder.decode(events[3])).toEqual("event: end\n\n"); +}); diff --git a/langchain-core/src/runnables/utils.ts b/langchain-core/src/runnables/utils.ts index 5ef3c8c6b273..2193fff6694d 100644 --- a/langchain-core/src/runnables/utils.ts +++ b/langchain-core/src/runnables/utils.ts @@ -1,4 +1,4 @@ -import { StreamEvent } from "../tracers/log_stream.js"; +import { StreamEvent } from "../tracers/event_stream.js"; import type { RunnableInterface } from "./types.js"; // eslint-disable-next-line @typescript-eslint/no-explicit-any diff --git a/langchain-core/src/tracers/event_stream.ts b/langchain-core/src/tracers/event_stream.ts new file mode 100644 index 000000000000..dd812633869b --- /dev/null +++ b/langchain-core/src/tracers/event_stream.ts @@ -0,0 +1,599 @@ +import { BaseTracer, type Run } from "./base.js"; +import { BaseCallbackHandlerInput } from "../callbacks/base.js"; +import { IterableReadableStream } from "../utils/stream.js"; +import { AIMessageChunk } from "../messages/ai.js"; +import { ChatGeneration, Generation, GenerationChunk } from "../outputs.js"; +import { BaseMessage } from "../messages/base.js"; + +/** + * Data associated with a StreamEvent. + */ +export type StreamEventData = { + /** + * The input passed to the runnable that generated the event. + * Inputs will sometimes be available at the *START* of the runnable, and + * sometimes at the *END* of the runnable. 
+ * If a runnable is able to stream its inputs, then its input by definition + * won't be known until the *END* of the runnable when it has finished streaming + * its inputs. + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + input?: any; + + /** + * The output of the runnable that generated the event. + * Outputs will only be available at the *END* of the runnable. + * For most runnables, this field can be inferred from the `chunk` field, + * though there might be some exceptions for special cased runnables (e.g., like + * chat models), which may return more information. + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + output?: any; + + /** + * A streaming chunk from the output that generated the event. + * chunks support addition in general, and adding them up should result + * in the output of the runnable that generated the event. + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + chunk?: any; +}; + +/** + * A streaming event. + * + * Schema of a streaming event which is produced from the streamEvents method. + */ +export type StreamEvent = { + /** + * Event names are of the format: on_[runnable_type]_(start|stream|end). + * + * Runnable types are one of: + * - llm - used by non chat models + * - chat_model - used by chat models + * - prompt -- e.g., ChatPromptTemplate + * - tool -- LangChain tools + * - chain - most Runnables are of this type + * + * Further, the events are categorized as one of: + * - start - when the runnable starts + * - stream - when the runnable is streaming + * - end - when the runnable ends + * + * start, stream and end are associated with slightly different `data` payload. + * + * Please see the documentation for `StreamEventData` for more details. + */ + event: string; + /** The name of the runnable that generated the event. */ + name: string; + /** + * A randomly generated ID to keep track of the execution of the given runnable. 
+ * + * Each child runnable that gets invoked as part of the execution of a parent runnable + * is assigned its own unique ID. + */ + run_id: string; + /** + * Tags associated with the runnable that generated this event. + * Tags are always inherited from parent runnables. + */ + tags?: string[]; + /** Metadata associated with the runnable that generated this event. */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + metadata: Record; + /** + * Event data. + * + * The contents of the event data depend on the event type. + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + data: StreamEventData; +}; + +type RunInfo = { + name: string; + tags: string[]; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + metadata: Record; + runType: string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + inputs?: Record; +}; + +export interface EventStreamCallbackHandlerInput + extends BaseCallbackHandlerInput { + autoClose?: boolean; + includeNames?: string[]; + includeTypes?: string[]; + includeTags?: string[]; + excludeNames?: string[]; + excludeTypes?: string[]; + excludeTags?: string[]; +} + +function assignName({ + name, + serialized, +}: { + name?: string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + serialized?: Record; +}): string { + if (name !== undefined) { + return name; + } + if (serialized?.name !== undefined) { + return serialized.name; + } else if (serialized?.id !== undefined && Array.isArray(serialized?.id)) { + return serialized.id[serialized.id.length - 1]; + } + return "Unnamed"; +} +/** + * Class that extends the `BaseTracer` class from the + * `langchain.callbacks.tracers.base` module. It represents a callback + * handler that tracks the execution of runs and emits `StreamEvent` objects + * to its `receiveStream`. 
+ */ +export class EventStreamCallbackHandler extends BaseTracer { + protected autoClose = true; + + protected includeNames?: string[]; + + protected includeTypes?: string[]; + + protected includeTags?: string[]; + + protected excludeNames?: string[]; + + protected excludeTypes?: string[]; + + protected excludeTags?: string[]; + + protected rootId?: string; + + private runInfoMap: Map = new Map(); + + private tappedPromises: Map> = new Map(); + + protected transformStream: TransformStream; + + public writer: WritableStreamDefaultWriter; + + public receiveStream: IterableReadableStream; + + name = "event_stream_tracer"; + + constructor(fields?: EventStreamCallbackHandlerInput) { + super({ _awaitHandler: true, ...fields }); + this.autoClose = fields?.autoClose ?? true; + this.includeNames = fields?.includeNames; + this.includeTypes = fields?.includeTypes; + this.includeTags = fields?.includeTags; + this.excludeNames = fields?.excludeNames; + this.excludeTypes = fields?.excludeTypes; + this.excludeTags = fields?.excludeTags; + this.transformStream = new TransformStream(); + this.writer = this.transformStream.writable.getWriter(); + this.receiveStream = IterableReadableStream.fromReadableStream( + this.transformStream.readable + ); + } + + [Symbol.asyncIterator]() { + return this.receiveStream; + } + + protected async persistRun(_run: Run): Promise { + // This is a legacy method only called once for an entire run tree + // and is therefore not useful here + } + + _includeRun(run: RunInfo): boolean { + const runTags = run.tags ?? 
[]; + let include = + this.includeNames === undefined && + this.includeTags === undefined && + this.includeTypes === undefined; + if (this.includeNames !== undefined) { + include = include || this.includeNames.includes(run.name); + } + if (this.includeTypes !== undefined) { + include = include || this.includeTypes.includes(run.runType); + } + if (this.includeTags !== undefined) { + include = + include || + runTags.find((tag) => this.includeTags?.includes(tag)) !== undefined; + } + if (this.excludeNames !== undefined) { + include = include && !this.excludeNames.includes(run.name); + } + if (this.excludeTypes !== undefined) { + include = include && !this.excludeTypes.includes(run.runType); + } + if (this.excludeTags !== undefined) { + include = + include && runTags.every((tag) => !this.excludeTags?.includes(tag)); + } + return include; + } + + async *tapOutputIterable( + runId: string, + outputStream: AsyncGenerator + ): AsyncGenerator { + const firstChunk = await outputStream.next(); + if (firstChunk.done) { + return; + } + const runInfo = this.runInfoMap.get(runId); + // run has finished, don't issue any stream events + if (runInfo === undefined) { + yield firstChunk.value; + return; + } + let tappedPromise = this.tappedPromises.get(runId); + // if we are the first to tap, issue stream events + if (tappedPromise === undefined) { + let tappedPromiseResolver; + tappedPromise = new Promise((resolve) => { + tappedPromiseResolver = resolve; + }); + this.tappedPromises.set(runId, tappedPromise); + try { + const event: StreamEvent = { + event: `on_${runInfo.runType}_stream`, + run_id: runId, + name: runInfo.name, + tags: runInfo.tags, + metadata: runInfo.metadata, + data: {}, + }; + await this.send( + { + ...event, + data: { chunk: firstChunk.value }, + }, + runInfo + ); + yield firstChunk.value; + for await (const chunk of outputStream) { + // Don't yield tool and retriever stream events + if (runInfo.runType !== "tool" && runInfo.runType !== "retriever") { + await 
this.send( + { + ...event, + data: { + chunk, + }, + }, + runInfo + ); + } + yield chunk; + } + } finally { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + tappedPromiseResolver!(); + } + } else { + // otherwise just pass through + yield firstChunk.value; + for await (const chunk of outputStream) { + yield chunk; + } + } + } + + async send(payload: StreamEvent, run: RunInfo) { + if (this._includeRun(run)) { + await this.writer.write(payload); + } + } + + async sendEndEvent(payload: StreamEvent, run: RunInfo) { + const tappedPromise = this.tappedPromises.get(payload.run_id); + if (tappedPromise !== undefined) { + void tappedPromise.then(() => { + void this.send(payload, run); + }); + } else { + await this.send(payload, run); + } + } + + async onLLMStart(run: Run): Promise { + const runName = assignName(run); + const runType = run.inputs.messages !== undefined ? "chat_model" : "llm"; + const runInfo = { + tags: run.tags ?? [], + metadata: run.extra?.metadata ?? {}, + name: runName, + runType, + inputs: run.inputs, + }; + this.runInfoMap.set(run.id, runInfo); + const eventName = `on_${runType}_start`; + await this.send( + { + event: eventName, + data: { + input: run.inputs, + }, + name: runName, + tags: run.tags ?? [], + run_id: run.id, + metadata: run.extra?.metadata ?? 
{}, + }, + runInfo + ); + } + + async onLLMNewToken( + run: Run, + token: string, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + kwargs?: { chunk: any } + ): Promise { + const runInfo = this.runInfoMap.get(run.id); + let chunk; + let eventName; + if (runInfo === undefined) { + throw new Error(`onLLMNewToken: Run ID ${run.id} not found in run map.`); + } + if (runInfo.runType === "chat_model") { + eventName = "on_chat_model_stream"; + if (kwargs?.chunk === undefined) { + chunk = new AIMessageChunk({ content: token }); + } else { + chunk = kwargs.chunk.message; + } + } else if (runInfo.runType === "llm") { + eventName = "on_llm_stream"; + if (kwargs?.chunk === undefined) { + chunk = new GenerationChunk({ text: token }); + } else { + chunk = kwargs.chunk; + } + } else { + throw new Error(`Unexpected run type ${runInfo.runType}`); + } + await this.send( + { + event: eventName, + data: { + chunk, + }, + run_id: run.id, + name: runInfo.name, + tags: runInfo.tags, + metadata: runInfo.metadata, + }, + runInfo + ); + } + + async onLLMEnd(run: Run): Promise { + const runInfo = this.runInfoMap.get(run.id); + this.runInfoMap.delete(run.id); + let eventName: string; + if (runInfo === undefined) { + throw new Error(`onLLMEnd: Run ID ${run.id} not found in run map.`); + } + const generations: ChatGeneration[][] | Generation[][] | undefined = + run.outputs?.generations; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let output: BaseMessage | Record | undefined; + if (runInfo.runType === "chat_model") { + for (const generation of generations ?? 
[]) { + if (output !== undefined) { + break; + } + output = (generation[0] as ChatGeneration | undefined)?.message; + } + eventName = "on_chat_model_end"; + } else if (runInfo.runType === "llm") { + output = { + generations: generations?.map((generation) => { + return generation.map((chunk) => { + return { + text: chunk.text, + generationInfo: chunk.generationInfo, + }; + }); + }), + llmOutput: run.outputs?.llmOutput ?? {}, + }; + eventName = "on_llm_end"; + } else { + throw new Error(`onLLMEnd: Unexpected run type: ${runInfo.runType}`); + } + await this.sendEndEvent( + { + event: eventName, + data: { + output, + input: runInfo.inputs, + }, + run_id: run.id, + name: runInfo.name, + tags: runInfo.tags, + metadata: runInfo.metadata, + }, + runInfo + ); + } + + async onChainStart(run: Run): Promise { + const runName = assignName(run); + const runType = run.run_type ?? "chain"; + const runInfo: RunInfo = { + tags: run.tags ?? [], + metadata: run.extra?.metadata ?? {}, + name: runName, + runType: run.run_type, + }; + let eventData: StreamEventData = {}; + // Workaround Runnable core code not sending input when transform streaming. + if (run.inputs.input === "" && Object.keys(run.inputs).length === 1) { + eventData = {}; + runInfo.inputs = {}; + } else if (run.inputs.input !== undefined) { + eventData.input = run.inputs.input; + runInfo.inputs = run.inputs.input; + } else { + eventData.input = run.inputs; + runInfo.inputs = run.inputs; + } + this.runInfoMap.set(run.id, runInfo); + await this.send( + { + event: `on_${runType}_start`, + data: eventData, + name: runName, + tags: run.tags ?? [], + run_id: run.id, + metadata: run.extra?.metadata ?? 
{}, + }, + runInfo + ); + } + + async onChainEnd(run: Run): Promise { + const runInfo = this.runInfoMap.get(run.id); + this.runInfoMap.delete(run.id); + if (runInfo === undefined) { + throw new Error(`onChainEnd: Run ID ${run.id} not found in run map.`); + } + const eventName = `on_${run.run_type}_end`; + const inputs = run.inputs ?? runInfo.inputs ?? {}; + const outputs = run.outputs?.output ?? run.outputs; + const data: StreamEventData = { + output: outputs, + input: inputs, + }; + if (inputs.input && Object.keys(inputs).length === 1) { + data.input = inputs.input; + runInfo.inputs = inputs.input; + } + await this.sendEndEvent( + { + event: eventName, + data, + run_id: run.id, + name: runInfo.name, + tags: runInfo.tags, + metadata: runInfo.metadata ?? {}, + }, + runInfo + ); + } + + async onToolStart(run: Run): Promise { + const runName = assignName(run); + const runInfo = { + tags: run.tags ?? [], + metadata: run.extra?.metadata ?? {}, + name: runName, + runType: "tool", + inputs: run.inputs ?? {}, + }; + this.runInfoMap.set(run.id, runInfo); + await this.send( + { + event: "on_tool_start", + data: { + input: run.inputs ?? {}, + }, + name: runName, + run_id: run.id, + tags: run.tags ?? [], + metadata: run.extra?.metadata ?? {}, + }, + runInfo + ); + } + + async onToolEnd(run: Run): Promise { + const runInfo = this.runInfoMap.get(run.id); + this.runInfoMap.delete(run.id); + if (runInfo === undefined) { + throw new Error(`onToolEnd: Run ID ${run.id} not found in run map.`); + } + if (runInfo.inputs === undefined) { + throw new Error( + `onToolEnd: Run ID ${run.id} is a tool call, and is expected to have traced inputs.` + ); + } + const output = + run.outputs?.output === undefined ? 
run.outputs : run.outputs.output; + await this.sendEndEvent( + { + event: "on_tool_end", + data: { + output, + input: runInfo.inputs, + }, + run_id: run.id, + name: runInfo.name, + tags: runInfo.tags, + metadata: runInfo.metadata, + }, + runInfo + ); + } + + async onRetrieverStart(run: Run): Promise { + const runName = assignName(run); + const runType = "retriever"; + const runInfo = { + tags: run.tags ?? [], + metadata: run.extra?.metadata ?? {}, + name: runName, + runType, + inputs: { + query: run.inputs.query, + }, + }; + this.runInfoMap.set(run.id, runInfo); + await this.send( + { + event: "on_retriever_start", + data: { + input: { + query: run.inputs.query, + }, + }, + name: runName, + tags: run.tags ?? [], + run_id: run.id, + metadata: run.extra?.metadata ?? {}, + }, + runInfo + ); + } + + async onRetrieverEnd(run: Run): Promise { + const runInfo = this.runInfoMap.get(run.id); + this.runInfoMap.delete(run.id); + if (runInfo === undefined) { + throw new Error(`onRetrieverEnd: Run ID ${run.id} not found in run map.`); + } + await this.sendEndEvent( + { + event: "on_retriever_end", + data: { + output: run.outputs?.documents ?? 
run.outputs, + input: runInfo.inputs, + }, + run_id: run.id, + name: runInfo.name, + tags: runInfo.tags, + metadata: runInfo.metadata, + }, + runInfo + ); + } +} diff --git a/langchain-core/src/tracers/log_stream.ts b/langchain-core/src/tracers/log_stream.ts index c62707e087e6..cc4f2a1cc301 100644 --- a/langchain-core/src/tracers/log_stream.ts +++ b/langchain-core/src/tracers/log_stream.ts @@ -10,6 +10,9 @@ import { import { IterableReadableStream } from "../utils/stream.js"; import { ChatGenerationChunk, GenerationChunk } from "../outputs.js"; import { AIMessageChunk } from "../messages/index.js"; +import type { StreamEvent, StreamEventData } from "./event_stream.js"; + +export type { StreamEvent, StreamEventData }; /** * Interface that represents the structure of a log entry in the @@ -113,92 +116,6 @@ export class RunLog extends RunLogPatch { } } -/** - * Data associated with a StreamEvent. - */ -export type StreamEventData = { - /** - * The input passed to the runnable that generated the event. - * Inputs will sometimes be available at the *START* of the runnable, and - * sometimes at the *END* of the runnable. - * If a runnable is able to stream its inputs, then its input by definition - * won't be known until the *END* of the runnable when it has finished streaming - * its inputs. - */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - input?: any; - - /** - * The output of the runnable that generated the event. - * Outputs will only be available at the *END* of the runnable. - * For most runnables, this field can be inferred from the `chunk` field, - * though there might be some exceptions for special cased runnables (e.g., like - * chat models), which may return more information. - */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - output?: any; - - /** - * A streaming chunk from the output that generated the event. 
- * chunks support addition in general, and adding them up should result - * in the output of the runnable that generated the event. - */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - chunk?: any; -}; - -/** - * A streaming event. - * - * Schema of a streaming event which is produced from the streamEvents method. - */ -export type StreamEvent = { - /** - * Event names are of the format: on_[runnable_type]_(start|stream|end). - * - * Runnable types are one of: - * - llm - used by non chat models - * - chat_model - used by chat models - * - prompt -- e.g., ChatPromptTemplate - * - tool -- from tools defined via @tool decorator or inheriting from Tool/BaseTool - * - chain - most Runnables are of this type - * - * Further, the events are categorized as one of: - * - start - when the runnable starts - * - stream - when the runnable is streaming - * - end - when the runnable ends - * - * start, stream and end are associated with slightly different `data` payload. - * - * Please see the documentation for `EventData` for more details. - */ - event: string; - /** The name of the runnable that generated the event. */ - name: string; - /** - * An randomly generated ID to keep track of the execution of the given runnable. - * - * Each child runnable that gets invoked as part of the execution of a parent runnable - * is assigned its own unique ID. - */ - run_id: string; - /** - * Tags associated with the runnable that generated this event. - * Tags are always inherited from parent runnables. - */ - tags?: string[]; - /** Metadata associated with the runnable that generated this event. */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - metadata: Record; - /** - * Event data. - * - * The contents of the event data depend on the event type. 
- */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - data: StreamEventData; -}; - export type SchemaFormat = "original" | "streaming_events"; export interface LogStreamCallbackHandlerInput