Skip to content

Commit

Permalink
core[patch]: Fix remote runnable streamLog type, update docs (#4353)
Browse files Browse the repository at this point in the history
* Fix remote runnable streamLog type, update docs

* Fix lint

* Remove bad test
  • Loading branch information
jacoblee93 authored Feb 9, 2024
1 parent 9a47e4e commit 1b1ea2f
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 63 deletions.
10 changes: 4 additions & 6 deletions docs/core_docs/docs/ecosystem/langserve.mdx
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# LangServe
# Integrating with LangServe

[LangServe](https://python.langchain.com/docs/langserve) is a Python framework that helps developers deploy LangChain [runnables and chains](/docs/expression_language/)
as REST APIs.

If you have a deployed LangServe route, you can use the [RemoteRunnable](https://api.js.langchain.com/classes/langchain_runnables_remote.RemoteRunnable.html) class to interact
with it as if it were a local chain. This allows you to more easily call hosted LangServe instances from JavaScript environments (like in the browser on the frontend).

You'll need to install or package LangChain into your frontend:
You'll need to install or package LangChain core into your frontend:

```bash npm2yarn
npm install langchain
npm install @langchain/core
```

## Usage
Expand All @@ -23,9 +23,7 @@ import Example from "@examples/ecosystem/langsmith.ts";
<CodeBlock language="typescript">{Example}</CodeBlock>

[`streamLog`](/docs/expression_language/interface) is a lower-level method for streaming chain intermediate steps as partial JSONPatch chunks.
This method allows for a few extra options as well to only include or exclude certain named steps.

`@langchain/core` also provides an `applyPatch` utility for aggregating these chunks into a full output:
This method also accepts a few extra options for including or excluding only certain named steps:

import StreamLogExample from "@examples/ecosystem/langsmith_stream_log.ts";

Expand Down
2 changes: 1 addition & 1 deletion examples/src/ecosystem/langsmith.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { RemoteRunnable } from "langchain/runnables/remote";
import { RemoteRunnable } from "@langchain/core/runnables/remote";

const remoteChain = new RemoteRunnable({
url: "https://your_hostname.com/path",
Expand Down
119 changes: 103 additions & 16 deletions examples/src/ecosystem/langsmith_stream_log.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,127 @@
import { RemoteRunnable } from "langchain/runnables/remote";
import { applyPatch } from "@langchain/core/utils/json_patch";
import { RemoteRunnable } from "@langchain/core/runnables/remote";

const remoteChain = new RemoteRunnable({
url: "https://your_hostname.com/path",
// url: "https://your_hostname.com/path",
url: "https://chat-langchain-backend.langchain.dev/chat",
});

const logStream = await remoteChain.streamLog(
{
param1: "param1",
param2: "param2",
question: "What is a document loader?",
},
// LangChain runnable config properties
{
configurable: {
llm: "some_property",
llm: "openai_gpt_3_5_turbo",
},
metadata: {
conversation_id: "other_metadata",
},
},
// Optional additional streamLog properties for filtering outputs
{
includeNames: [],
includeTags: [],
includeTypes: [],
excludeNames: [],
excludeTags: [],
excludeTypes: [],
// includeNames: [],
// includeTags: [],
// includeTypes: [],
// excludeNames: [],
// excludeTags: [],
// excludeTypes: [],
}
);

let streamedResponse: Record<string, any> = {};
let currentState;

for await (const chunk of logStream) {
console.log(chunk);
streamedResponse = applyPatch(streamedResponse, chunk.ops).newDocument;
if (!currentState) {
currentState = chunk;
} else {
currentState = currentState.concat(chunk);
}
}

console.log(streamedResponse);
console.log(currentState);

/*
RunLog {
ops: [
{ op: 'replace', path: '', value: [Object] },
{
op: 'add',
path: '/logs/RunnableParallel<question,chat_history>',
value: [Object]
},
{ op: 'add', path: '/logs/Itemgetter:question', value: [Object] },
{ op: 'add', path: '/logs/SerializeHistory', value: [Object] },
{
op: 'add',
path: '/logs/Itemgetter:question/streamed_output/-',
value: 'What is a document loader?'
},
{
op: 'add',
path: '/logs/SerializeHistory/streamed_output/-',
value: []
},
{
op: 'add',
path: '/logs/RunnableParallel<question,chat_history>/streamed_output/-',
value: [Object]
},
{ op: 'add', path: '/logs/RetrieveDocs', value: [Object] },
{ op: 'add', path: '/logs/RunnableSequence', value: [Object] },
{
op: 'add',
path: '/logs/RunnableParallel<question,chat_history>/streamed_output/-',
value: [Object]
},
... 558 more items
],
state: {
id: '415ba646-a3e0-4c76-bff6-4f5f34305244',
streamed_output: [
'', 'A', ' document', ' loader', ' is',
' a', ' tool', ' used', ' to', ' load',
' data', ' from', ' a', ' source', ' as',
' `', 'Document', '`', "'", 's',
',', ' which', ' are', ' pieces', ' of',
' text', ' with', ' associated', ' metadata', '.',
' It', ' can', ' load', ' data', ' from',
' various', ' sources', ',', ' such', ' as',
' a', ' simple', ' `.', 'txt', '`',
' file', ',', ' the', ' text', ' contents',
' of', ' a', ' web', ' page', ',',
' or', ' a', ' transcript', ' of', ' a',
' YouTube', ' video', '.', ' Document', ' loaders',
' provide', ' a', ' "', 'load', '"',
' method', ' for', ' loading', ' data', ' as',
' documents', ' from', ' a', ' configured', ' source',
' and', ' can', ' optionally', ' implement', ' a',
' "', 'lazy', ' load', '"', ' for',
' loading', ' data', ' into', ' memory', '.',
' [', '1', ']', ''
],
final_output: 'A document loader is a tool used to load data from a source as `Document`\'s, which are pieces of text with associated metadata. It can load data from various sources, such as a simple `.txt` file, the text contents of a web page, or a transcript of a YouTube video. Document loaders provide a "load" method for loading data as documents from a configured source and can optionally implement a "lazy load" for loading data into memory. [1]',
logs: {
'RunnableParallel<question,chat_history>': [Object],
'Itemgetter:question': [Object],
SerializeHistory: [Object],
RetrieveDocs: [Object],
RunnableSequence: [Object],
RunnableLambda: [Object],
'RunnableLambda:2': [Object],
FindDocs: [Object],
HasChatHistoryCheck: [Object],
GenerateResponse: [Object],
RetrievalChainWithNoHistory: [Object],
'Itemgetter:question:2': [Object],
Retriever: [Object],
format_docs: [Object],
ChatPromptTemplate: [Object],
ChatOpenAI: [Object],
StrOutputParser: [Object]
},
name: '/chat',
type: 'chain'
}
}
*/
22 changes: 12 additions & 10 deletions examples/src/guides/expression_language/interface_stream_log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
HumanMessagePromptTemplate,
SystemMessagePromptTemplate,
} from "@langchain/core/prompts";
import { applyPatch } from "@langchain/core/utils/json_patch";

// Initialize the LLM to use to answer the question.
const model = new ChatOpenAI({});
Expand Down Expand Up @@ -49,18 +48,21 @@ const chain = RunnableSequence.from([
new StringOutputParser(),
]);

const stream = await chain.streamLog("What is the powerhouse of the cell?");
const logStream = await chain.streamLog("What is the powerhouse of the cell?");

// eslint-disable-next-line @typescript-eslint/no-explicit-any
let aggregate: any = {};
let state;

for await (const chunk of stream) {
console.log(JSON.stringify(chunk));
// You can also reconstruct the log using the `applyPatch` function.
aggregate = applyPatch(aggregate, chunk.ops).newDocument;
console.log();
for await (const logPatch of logStream) {
console.log(JSON.stringify(logPatch));
if (!state) {
state = logPatch;
} else {
state = state.concat(logPatch);
}
}
console.log("aggregate", aggregate);

console.log("aggregate", state);

/*
{"ops":[{"op":"replace","path":"","value":{"id":"5a79d2e7-171a-4034-9faa-63af88e5a451","streamed_output":[],"logs":{}}}]}
Expand Down
5 changes: 3 additions & 2 deletions langchain-core/src/runnables/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { CallbackManagerForChainRun } from "../callbacks/manager.js";
import { ChatPromptValue, StringPromptValue } from "../prompt_values.js";
import {
LogStreamCallbackHandler,
RunLogPatch,
type LogStreamCallbackHandlerInput,
type RunLogPatch,
} from "../tracers/log_stream.js";
import {
AIMessage,
Expand Down Expand Up @@ -485,7 +485,8 @@ export class RemoteRunnable<
}
const runnableStream = convertEventStreamToIterableReadableDataStream(body);
for await (const log of runnableStream) {
yield revive(JSON.parse(log));
const chunk = revive(JSON.parse(log));
yield new RunLogPatch({ ops: chunk.ops });
}
}
}
24 changes: 24 additions & 0 deletions langchain-core/src/runnables/tests/runnable_remote.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,30 @@ test("streamLog hosted langserve", async () => {
console.log("totalByteSize", totalByteSize);
});

test("streamLog hosted langserve with concat syntax", async () => {
const remote = new RemoteRunnable({
url: `https://chat-langchain-backend.langchain.dev/chat`,
});
const result = await remote.streamLog({
question: "What is a document loader?",
});
let totalByteSize = 0;
let state;

for await (const chunk of result) {
if (!state) {
state = chunk;
} else {
state = state.concat(chunk);
}
const jsonString = JSON.stringify(chunk);
const byteSize = Buffer.byteLength(jsonString, "utf-8");
totalByteSize += byteSize;
}
console.log("final state", state);
console.log("totalByteSize", totalByteSize);
});

test("streamLog with raw messages", async () => {
const chain = new RemoteRunnable({
url: "https://aimor-deployment-bf1e4ebc87365334b3b8a6b175fb4151-ffoprvkqsa-uc.a.run.app/",
Expand Down
14 changes: 0 additions & 14 deletions langchain-core/src/runnables/tests/runnable_remote.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ describe("RemoteRunnable", () => {
}),
"/a/stream": respToStream(aResp),
"/b/stream": respToStream(bResp),
"/a/stream_log": respToStream(aResp),
"/strange_types/stream": respToStream(strangeTypesResp),
};

Expand Down Expand Up @@ -199,19 +198,6 @@ describe("RemoteRunnable", () => {
);
});

test("Stream log local langserve", async () => {
const remote = new RemoteRunnable({ url: `${BASE_URL}/a` });
const stream = await remote.streamLog({
question: "What are the 5 best apples?",
});
let chunkCount = 0;
for await (const chunk of stream) {
expect(chunk).toEqual(["a", "b", "c", "d"]);
chunkCount += 1;
}
expect(chunkCount).toBe(1);
});

test("Stream legacy data type formats", async () => {
const remote = new RemoteRunnable({ url: `${BASE_URL}/strange_types` });
const stream = await remote.stream({ text: "What are the 5 best apples?" });
Expand Down
14 changes: 0 additions & 14 deletions langchain/src/runnables/tests/runnable_remote.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ describe("RemoteRunnable", () => {
}),
"/a/stream": respToStream(aResp),
"/b/stream": respToStream(bResp),
"/a/stream_log": respToStream(aResp),
"/strange_types/stream": respToStream(strangeTypesResp),
};

Expand Down Expand Up @@ -199,19 +198,6 @@ describe("RemoteRunnable", () => {
);
});

test("Stream log local langserve", async () => {
const remote = new RemoteRunnable({ url: `${BASE_URL}/a` });
const stream = await remote.streamLog({
question: "What are the 5 best apples?",
});
let chunkCount = 0;
for await (const chunk of stream) {
expect(chunk).toEqual(["a", "b", "c", "d"]);
chunkCount += 1;
}
expect(chunkCount).toBe(1);
});

test("Stream legacy data type formats", async () => {
const remote = new RemoteRunnable({ url: `${BASE_URL}/strange_types` });
const stream = await remote.stream({ text: "What are the 5 best apples?" });
Expand Down

0 comments on commit 1b1ea2f

Please sign in to comment.