Skip to content

Commit

Permalink
core[patch]: Allow any module to emit a custom event (#6282)
Browse files Browse the repository at this point in the history
* Allow any module to emit a custom event

* Remove focus
  • Loading branch information
jacoblee93 authored Aug 1, 2024
1 parent d75537d commit 41a653f
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 2 deletions.
36 changes: 36 additions & 0 deletions langchain-core/src/callbacks/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,42 @@ export class BaseRunManager {
)
);
}

async handleCustomEvent(
eventName: string,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
data: any,
_runId?: string,
_tags?: string[],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
_metadata?: Record<string, any>
): Promise<void> {
await Promise.all(
this.handlers.map((handler) =>
consumeCallback(async () => {
try {
await handler.handleCustomEvent?.(
eventName,
data,
this.runId,
this.tags,
this.metadata
);
} catch (err) {
const logFunction = handler.raiseError
? console.error
: console.warn;
logFunction(
`Error in handler ${handler.constructor.name}, handleCustomEvent: ${err}`
);
if (handler.raiseError) {
throw err;
}
}
}, handler.awaitHandlers)
)
);
}
}

/**
Expand Down
37 changes: 37 additions & 0 deletions langchain-core/src/language_models/tests/chat_models.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,40 @@ test("Test ChatModel can cache complex messages", async () => {
const cachedMsg = value[0].message as AIMessage;
expect(cachedMsg.content).toEqual(JSON.stringify(contentToCache, null, 2));
});

test("Test ChatModel can emit a custom event", async () => {
const model = new FakeListChatModel({
responses: ["hi"],
emitCustomEvent: true,
});
let customEvent;
const response = await model.invoke([["human", "Hello there!"]], {
callbacks: [
{
handleCustomEvent(_, data) {
customEvent = data;
},
},
],
});
await new Promise((resolve) => setTimeout(resolve, 100));
expect(response.content).toEqual("hi");
expect(customEvent).toBeDefined();
});

test("Test ChatModel can stream back a custom event", async () => {
const model = new FakeListChatModel({
responses: ["hi"],
emitCustomEvent: true,
});
let customEvent;
const eventStream = await model.streamEvents([["human", "Hello there!"]], {
version: "v2",
});
for await (const event of eventStream) {
if (event.event === "on_custom_event") {
customEvent = event;
}
}
expect(customEvent).toBeDefined();
});
20 changes: 18 additions & 2 deletions langchain-core/src/utils/testing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ export interface FakeChatInput extends BaseChatModelParams {

/** Time to sleep in milliseconds between responses */
sleep?: number;

emitCustomEvent?: boolean;
}

/**
Expand Down Expand Up @@ -353,10 +355,13 @@ export class FakeListChatModel extends BaseChatModel {

sleep?: number;

constructor({ responses, sleep }: FakeChatInput) {
emitCustomEvent = false;

constructor({ responses, sleep, emitCustomEvent }: FakeChatInput) {
super({});
this.responses = responses;
this.sleep = sleep;
this.emitCustomEvent = emitCustomEvent ?? this.emitCustomEvent;
}

_combineLLMOutput() {
Expand All @@ -369,9 +374,15 @@ export class FakeListChatModel extends BaseChatModel {

async _generate(
_messages: BaseMessage[],
options?: this["ParsedCallOptions"]
options?: this["ParsedCallOptions"],
runManager?: CallbackManagerForLLMRun
): Promise<ChatResult> {
await this._sleepIfRequested();
if (this.emitCustomEvent) {
await runManager?.handleCustomEvent("some_test_event", {
someval: true,
});
}

if (options?.stop?.length) {
return {
Expand Down Expand Up @@ -402,6 +413,11 @@ export class FakeListChatModel extends BaseChatModel {
): AsyncGenerator<ChatGenerationChunk> {
const response = this._currentResponse();
this._incrementResponse();
if (this.emitCustomEvent) {
await runManager?.handleCustomEvent("some_test_event", {
someval: true,
});
}

for await (const text of response) {
await this._sleepIfRequested();
Expand Down

0 comments on commit 41a653f

Please sign in to comment.