Skip to content

Fix restored runner ID and various small enhancements #1880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 4, 2025
2 changes: 2 additions & 0 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const Env = z.object({
// Dequeue settings (provider mode)
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),

// Optional services
TRIGGER_WARM_START_URL: z.string().optional(),
Expand All @@ -50,6 +51,7 @@ const Env = z.object({
// Kubernetes specific settings
KUBERNETES_FORCE_ENABLED: BoolEnv.default(false),
KUBERNETES_NAMESPACE: z.string().default("default"),
KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"),
EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),

Expand Down
17 changes: 11 additions & 6 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ class ManagedSupervisor {
this.logger.warn("[ManagedWorker] Failed pod handler disabled");
}

this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), "");
this.resourceMonitor = new KubernetesResourceMonitor(
createK8sApi(),
env.TRIGGER_WORKER_INSTANCE_NAME
);
this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions);
} else {
this.resourceMonitor = new DockerResourceMonitor(new Docker());
Expand All @@ -113,10 +116,11 @@ class ManagedSupervisor {
managedWorkerSecret: env.MANAGED_WORKER_SECRET,
dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS,
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
preDequeue: async () => {
if (this.isKubernetes) {
// TODO: Test k8s resource monitor and remove this
// Not used in k8s for now
return {};
}

Expand Down Expand Up @@ -234,10 +238,11 @@ class ManagedSupervisor {
snapshotFriendlyId: message.snapshot.friendlyId,
});

this.resourceMonitor.blockResources({
cpu: message.run.machine.cpu,
memory: message.run.machine.memory,
});
// Disabled for now
// this.resourceMonitor.blockResources({
// cpu: message.run.machine.cpu,
// memory: message.run.machine.memory,
// });
} catch (error) {
this.logger.error("[ManagedWorker] Failed to create workload", { error });
}
Expand Down
4 changes: 4 additions & 0 deletions apps/supervisor/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ export function getDockerHostDomain() {

return isMacOs || isWindows ? "host.docker.internal" : "localhost";
}

export function getRunnerId(runId: string) {
return `runner-${runId.replace("run_", "")}`;
}
6 changes: 3 additions & 3 deletions apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
import { RunnerId } from "@trigger.dev/core/v3/isomorphic";
import {
type WorkloadManager,
type WorkloadManagerCreateOptions,
type WorkloadManagerOptions,
} from "./types.js";
import { x } from "tinyexec";
import { env } from "../env.js";
import { getDockerHostDomain } from "../util.js";
import { getDockerHostDomain, getRunnerId } from "../util.js";

export class DockerWorkloadManager implements WorkloadManager {
private readonly logger = new SimpleStructuredLogger("docker-workload-provider");
Expand All @@ -23,7 +22,8 @@ export class DockerWorkloadManager implements WorkloadManager {
async create(opts: WorkloadManagerCreateOptions) {
this.logger.log("[DockerWorkloadProvider] Creating container", { opts });

const runnerId = RunnerId.generate();
const runnerId = getRunnerId(opts.runFriendlyId);

const runArgs = [
"run",
"--detach",
Expand Down
6 changes: 3 additions & 3 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import {
type WorkloadManagerCreateOptions,
type WorkloadManagerOptions,
} from "./types.js";
import { RunnerId } from "@trigger.dev/core/v3/isomorphic";
import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3";
import { env } from "../env.js";
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
import { getRunnerId } from "../util.js";

type ResourceQuantities = {
[K in "cpu" | "memory" | "ephemeral-storage"]?: string;
Expand All @@ -31,7 +31,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
async create(opts: WorkloadManagerCreateOptions) {
this.logger.log("[KubernetesWorkloadManager] Creating container", { opts });

const runnerId = RunnerId.generate().replace(/_/g, "-");
const runnerId = getRunnerId(opts.runFriendlyId);

try {
await this.k8s.core.createNamespacedPod({
Expand Down Expand Up @@ -217,7 +217,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
automountServiceAccountToken: false,
imagePullSecrets: this.getImagePullSecrets(),
nodeSelector: {
nodetype: "worker-re2",
nodetype: env.KUBERNETES_WORKER_NODETYPE_LABEL,
},
};
}
Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { json, TypedResponse } from "@remix-run/server-runtime";
import { WorkerApiDequeueRequestBody, WorkerApiDequeueResponseBody } from "@trigger.dev/core/v3/workers";
import {
WorkerApiDequeueRequestBody,
WorkerApiDequeueResponseBody,
} from "@trigger.dev/core/v3/workers";
import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";

export const action = createActionWorkerApiRoute(
Expand All @@ -10,6 +13,7 @@ export const action = createActionWorkerApiRoute(
return json(
await authenticatedWorker.dequeue({
maxResources: body.maxResources,
maxRunCount: body.maxRunCount,
})
);
}
Expand Down
Loading