Skip to content

Commit

Permalink
Allow full backgrounding of auto batches
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoblee93 committed Oct 7, 2024
1 parent da3c1bb commit 0b745e0
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 1 deletion.
11 changes: 10 additions & 1 deletion js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export interface ClientConfig {
hideOutputs?: boolean | ((outputs: KVMap) => KVMap);
autoBatchTracing?: boolean;
pendingAutoBatchedRunLimit?: number;
blockOnRootRunUpdates?: boolean;
fetchOptions?: RequestInit;
}

Expand Down Expand Up @@ -434,6 +435,8 @@ export class Client {

private settings: Promise<LangSmithSettings> | null;

private blockOnRootRunUpdates = true;

constructor(config: ClientConfig = {}) {
const defaultConfig = Client.getDefaultClientConfig();

Expand All @@ -460,6 +463,8 @@ export class Client {
config.hideOutputs ?? config.anonymizer ?? defaultConfig.hideOutputs;

this.autoBatchTracing = config.autoBatchTracing ?? this.autoBatchTracing;
this.blockOnRootRunUpdates =
config.blockOnRootRunUpdates ?? this.blockOnRootRunUpdates;
this.pendingAutoBatchedRunLimit =
config.pendingAutoBatchedRunLimit ?? this.pendingAutoBatchedRunLimit;
this.fetchOptions = config.fetchOptions || {};
Expand Down Expand Up @@ -966,7 +971,11 @@ export class Client {
data.trace_id !== undefined &&
data.dotted_order !== undefined
) {
if (run.end_time !== undefined && data.parent_run_id === undefined) {
if (
run.end_time !== undefined &&
data.parent_run_id === undefined &&
this.blockOnRootRunUpdates
) {
// Trigger a batch as soon as a root trace ends and block to ensure trace finishes
// in serverless environments.
await this.processRunOperation({ action: "update", item: data }, true);
Expand Down
108 changes: 108 additions & 0 deletions js/src/tests/batch_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,114 @@ describe("Batch client tracing", () => {
});
});

it("should not trigger a batch on root run end and instead batch call with previous batch if blockOnRootRunUpdates is false", async () => {
const client = new Client({
apiKey: "test-api-key",
autoBatchTracing: true,
blockOnRootRunUpdates: false,
});
const callSpy = jest
.spyOn((client as any).batchIngestCaller, "call")
.mockResolvedValue({
ok: true,
text: () => "",
});
jest
.spyOn(client as any, "batchEndpointIsSupported")
.mockResolvedValue(true);
const projectName = "__test_batch";

const runId = uuidv4();
const dottedOrder = convertToDottedOrderFormat(
new Date().getTime() / 1000,
runId
);
await client.createRun({
id: runId,
project_name: projectName,
name: "test_run",
run_type: "llm",
inputs: { text: "hello world" },
trace_id: runId,
dotted_order: dottedOrder,
});

// Wait for first batch to send
await new Promise((resolve) => setTimeout(resolve, 300));

const endTime = Math.floor(new Date().getTime() / 1000);

// A root run finishing triggers the second batch
await client.updateRun(runId, {
outputs: { output: ["Hi"] },
dotted_order: dottedOrder,
trace_id: runId,
end_time: endTime,
});

const runId2 = uuidv4();
const dottedOrder2 = convertToDottedOrderFormat(
new Date().getTime() / 1000,
runId2
);

// Will send in a third batch, even though it's triggered around the same time as the update
await client.createRun({
id: runId2,
project_name: projectName,
name: "test_run",
run_type: "llm",
inputs: { text: "hello world 2" },
trace_id: runId2,
dotted_order: dottedOrder2,
});

await new Promise((resolve) => setTimeout(resolve, 300));

expect(callSpy.mock.calls.length).toEqual(2);
const calledRequestParam: any = callSpy.mock.calls[0][2];
const calledRequestParam2: any = callSpy.mock.calls[1][2];
expect(JSON.parse(calledRequestParam?.body)).toEqual({
post: [
expect.objectContaining({
id: runId,
run_type: "llm",
inputs: {
text: "hello world",
},
trace_id: runId,
dotted_order: dottedOrder,
}),
],
patch: [],
});

expect(JSON.parse(calledRequestParam2?.body)).toEqual({
post: [
expect.objectContaining({
id: runId2,
run_type: "llm",
inputs: {
text: "hello world 2",
},
trace_id: runId2,
dotted_order: dottedOrder2,
}),
],
patch: [
expect.objectContaining({
id: runId,
dotted_order: dottedOrder,
trace_id: runId,
end_time: endTime,
outputs: {
output: ["Hi"],
},
}),
],
});
});

it("should send traces above the batch size and see even batches", async () => {
const client = new Client({
apiKey: "test-api-key",
Expand Down

0 comments on commit 0b745e0

Please sign in to comment.