Skip to content

fix metadata system not updating because of dual package hazard #1428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions packages/cli-v3/src/entryPoints/deploy-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
TracingSDK,
usage,
UsageTimeoutManager,
StandardMetadataManager,
} from "@trigger.dev/core/v3/workers";
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
import { readFile } from "node:fs/promises";
Expand Down Expand Up @@ -99,6 +100,8 @@ timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
taskCatalog.setGlobalTaskCatalog(new StandardTaskCatalog());
const durableClock = new DurableClock();
clock.setGlobalClock(durableClock);
const runMetadataManager = new StandardMetadataManager();
runMetadata.setGlobalManager(runMetadataManager);

const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL");

Expand Down Expand Up @@ -305,7 +308,7 @@ const zodIpc = new ZodIpcConnection({
_execution = execution;
_isRunning = true;

runMetadata.startPeriodicFlush(
runMetadataManager.startPeriodicFlush(
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
);

Expand Down Expand Up @@ -437,7 +440,7 @@ async function flushTracingSDK(timeoutInMs: number = 10_000) {
async function flushMetadata(timeoutInMs: number = 10_000) {
const now = performance.now();

await Promise.race([runMetadata.flush(), setTimeout(timeoutInMs)]);
await Promise.race([runMetadataManager.flush(), setTimeout(timeoutInMs)]);

const duration = performance.now() - now;

Expand Down
7 changes: 5 additions & 2 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
TracingSDK,
usage,
getNumberEnvVar,
StandardMetadataManager,
} from "@trigger.dev/core/v3/workers";
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
import { readFile } from "node:fs/promises";
Expand Down Expand Up @@ -81,6 +82,8 @@ usage.setGlobalUsageManager(devUsageManager);
const devRuntimeManager = new DevRuntimeManager();
runtime.setGlobalRuntimeManager(devRuntimeManager);
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
const runMetadataManager = new StandardMetadataManager();
runMetadata.setGlobalManager(runMetadataManager);

const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL");

Expand Down Expand Up @@ -275,7 +278,7 @@ const zodIpc = new ZodIpcConnection({
_execution = execution;
_isRunning = true;

runMetadata.startPeriodicFlush(
runMetadataManager.startPeriodicFlush(
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
);
const measurement = usage.start();
Expand Down Expand Up @@ -350,7 +353,7 @@ const zodIpc = new ZodIpcConnection({
}
},
FLUSH: async ({ timeoutInMs }, sender) => {
await Promise.allSettled([_tracingSDK?.flush(), runMetadata.flush()]);
await Promise.allSettled([_tracingSDK?.flush(), runMetadataManager.flush()]);
},
},
});
Expand Down
135 changes: 17 additions & 118 deletions packages/core/src/v3/runMetadata/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import { dequal } from "dequal/lite";
import { DeserializedJson } from "../../schemas/json.js";
import { apiClientManager } from "../apiClientManager-api.js";
import { taskContext } from "../task-context-api.js";
import { getGlobal, registerGlobal } from "../utils/globals.js";
import { ApiRequestOptions } from "../zodfetch.js";
import { JSONHeroPath } from "@jsonhero/path";
import { NoopRunMetadataManager } from "./noopManager.js";
import { RunMetadataManager } from "./types.js";

const API_NAME = "run-metadata";

export class RunMetadataAPI {
const NOOP_MANAGER = new NoopRunMetadataManager();

export class RunMetadataAPI implements RunMetadataManager {
private static _instance?: RunMetadataAPI;
private flushTimeoutId: NodeJS.Timeout | null = null;
private hasChanges: boolean = false;

private constructor() {}

Expand All @@ -23,138 +21,39 @@ export class RunMetadataAPI {
return this._instance;
}

get store(): Record<string, DeserializedJson> | undefined {
return getGlobal(API_NAME);
setGlobalManager(manager: RunMetadataManager): boolean {
return registerGlobal(API_NAME, manager);
}

set store(value: Record<string, DeserializedJson> | undefined) {
registerGlobal(API_NAME, value, true);
#getManager(): RunMetadataManager {
return getGlobal(API_NAME) ?? NOOP_MANAGER;
}

public enterWithMetadata(metadata: Record<string, DeserializedJson>): void {
registerGlobal(API_NAME, metadata);
this.#getManager().enterWithMetadata(metadata);
}

public current(): Record<string, DeserializedJson> | undefined {
return this.store;
return this.#getManager().current();
}

public getKey(key: string): DeserializedJson | undefined {
return this.store?.[key];
return this.#getManager().getKey(key);
}

public setKey(key: string, value: DeserializedJson) {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

let nextStore: Record<string, DeserializedJson> | undefined = this.store
? structuredClone(this.store)
: undefined;

if (key.startsWith("$.")) {
const path = new JSONHeroPath(key);
path.set(nextStore, value);
} else {
nextStore = {
...(nextStore ?? {}),
[key]: value,
};
}

if (!nextStore) {
return;
}

if (!dequal(this.store, nextStore)) {
this.hasChanges = true;
}

this.store = nextStore;
return this.#getManager().setKey(key, value);
}

public deleteKey(key: string) {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

const nextStore = { ...(this.store ?? {}) };
delete nextStore[key];

if (!dequal(this.store, nextStore)) {
this.hasChanges = true;
}

this.store = nextStore;
return this.#getManager().deleteKey(key);
}

public update(metadata: Record<string, DeserializedJson>): void {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

if (!dequal(this.store, metadata)) {
this.hasChanges = true;
}

this.store = metadata;
return this.#getManager().update(metadata);
}

public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

if (!this.store) {
return;
}

if (!this.hasChanges) {
return;
}

const apiClient = apiClientManager.clientOrThrow();

try {
this.hasChanges = false;
await apiClient.updateRunMetadata(runId, { metadata: this.store }, requestOptions);
} catch (error) {
this.hasChanges = true;
throw error;
}
}

public startPeriodicFlush(intervalMs: number = 1000) {
const periodicFlush = async (intervalMs: number) => {
try {
await this.flush();
} catch (error) {
console.error("Failed to flush metadata", error);
throw error;
} finally {
scheduleNext();
}
};

const scheduleNext = () => {
this.flushTimeoutId = setTimeout(() => periodicFlush(intervalMs), intervalMs);
};

scheduleNext();
}

stopPeriodicFlush(): void {
if (this.flushTimeoutId) {
clearTimeout(this.flushTimeoutId);
this.flushTimeoutId = null;
}
flush(requestOptions?: ApiRequestOptions): Promise<void> {
return this.#getManager().flush(requestOptions);
}
}
140 changes: 140 additions & 0 deletions packages/core/src/v3/runMetadata/manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import { JSONHeroPath } from "@jsonhero/path";
import { dequal } from "dequal/lite";
import { DeserializedJson } from "../../schemas/json.js";
import { apiClientManager } from "../apiClientManager-api.js";
import { taskContext } from "../task-context-api.js";
import { ApiRequestOptions } from "../zodfetch.js";
import { RunMetadataManager } from "./types.js";

export class StandardMetadataManager implements RunMetadataManager {
private flushTimeoutId: NodeJS.Timeout | null = null;
private hasChanges: boolean = false;
private store: Record<string, DeserializedJson> | undefined;

public enterWithMetadata(metadata: Record<string, DeserializedJson>): void {
this.store = metadata ?? {};
}

public current(): Record<string, DeserializedJson> | undefined {
return this.store;
}

public getKey(key: string): DeserializedJson | undefined {
return this.store?.[key];
}

public setKey(key: string, value: DeserializedJson) {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

let nextStore: Record<string, DeserializedJson> | undefined = this.store
? structuredClone(this.store)
: undefined;

if (key.startsWith("$.")) {
const path = new JSONHeroPath(key);
path.set(nextStore, value);
} else {
nextStore = {
...(nextStore ?? {}),
[key]: value,
};
}

if (!nextStore) {
return;
}

if (!dequal(this.store, nextStore)) {
this.hasChanges = true;
}

this.store = nextStore;
}

public deleteKey(key: string) {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

const nextStore = { ...(this.store ?? {}) };
delete nextStore[key];

if (!dequal(this.store, nextStore)) {
this.hasChanges = true;
}

this.store = nextStore;
}

public update(metadata: Record<string, DeserializedJson>): void {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

if (!dequal(this.store, metadata)) {
this.hasChanges = true;
}

this.store = metadata;
}

public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

if (!this.store) {
return;
}

if (!this.hasChanges) {
return;
}

const apiClient = apiClientManager.clientOrThrow();

try {
this.hasChanges = false;
await apiClient.updateRunMetadata(runId, { metadata: this.store }, requestOptions);
} catch (error) {
this.hasChanges = true;
throw error;
}
}

public startPeriodicFlush(intervalMs: number = 1000) {
const periodicFlush = async (intervalMs: number) => {
try {
await this.flush();
} catch (error) {
console.error("Failed to flush metadata", error);
throw error;
} finally {
scheduleNext();
}
};

const scheduleNext = () => {
this.flushTimeoutId = setTimeout(() => periodicFlush(intervalMs), intervalMs);
};

scheduleNext();
}

stopPeriodicFlush(): void {
if (this.flushTimeoutId) {
clearTimeout(this.flushTimeoutId);
this.flushTimeoutId = null;
}
}
}
Loading
Loading