Skip to content

Commit ae0c373

Browse files
committed
Batch queue runs that are waiting for deploy
1 parent a50b93f commit ae0c373

File tree

2 files changed

+38
-42
lines changed

2 files changed

+38
-42
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,9 @@ const EnvironmentSchema = z.object({
442442
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
443443
LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
444444

445+
LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_SIZE: z.coerce.number().int().default(100),
446+
LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_STAGGER_MS: z.coerce.number().int().default(1_000),
447+
445448
COMMON_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
446449
COMMON_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
447450
COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),

apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts

Lines changed: 35 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { workerQueue } from "~/services/worker.server";
33
import { marqs } from "~/v3/marqs/index.server";
44
import { BaseService } from "./baseService.server";
55
import { logger } from "~/services/logger.server";
6+
import { env } from "~/env.server";
67

78
export class ExecuteTasksWaitingForDeployService extends BaseService {
89
public async call(backgroundWorkerId: string) {
@@ -17,7 +18,11 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
1718
organization: true,
1819
},
1920
},
20-
tasks: true,
21+
tasks: {
22+
select: {
23+
slug: true,
24+
},
25+
},
2126
},
2227
});
2328

@@ -26,6 +31,8 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
2631
return;
2732
}
2833

34+
const maxCount = env.LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_SIZE;
35+
2936
const runsWaitingForDeploy = await this._prisma.taskRun.findMany({
3037
where: {
3138
runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId,
@@ -36,8 +43,16 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
3643
},
3744
},
3845
orderBy: {
39-
number: "asc",
46+
createdAt: "asc",
4047
},
48+
select: {
49+
id: true,
50+
status: true,
51+
taskIdentifier: true,
52+
concurrencyKey: true,
53+
queue: true,
54+
},
55+
take: maxCount + 1,
4156
});
4257

4358
if (!runsWaitingForDeploy.length) {
@@ -63,50 +78,28 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
6378
});
6479
}
6580

66-
if (!marqs) {
67-
return;
68-
}
69-
70-
const enqueues: Promise<any>[] = [];
71-
let i = 0;
72-
7381
for (const run of runsWaitingForDeploy) {
74-
enqueues.push(
75-
marqs.enqueueMessage(
76-
backgroundWorker.runtimeEnvironment,
77-
run.queue,
78-
run.id,
79-
{
80-
type: "EXECUTE",
81-
taskIdentifier: run.taskIdentifier,
82-
projectId: backgroundWorker.runtimeEnvironment.projectId,
83-
environmentId: backgroundWorker.runtimeEnvironment.id,
84-
environmentType: backgroundWorker.runtimeEnvironment.type,
85-
},
86-
run.concurrencyKey ?? undefined,
87-
Date.now() + i * 5 // slight delay to help preserve order
88-
)
82+
await marqs?.enqueueMessage(
83+
backgroundWorker.runtimeEnvironment,
84+
run.queue,
85+
run.id,
86+
{
87+
type: "EXECUTE",
88+
taskIdentifier: run.taskIdentifier,
89+
projectId: backgroundWorker.runtimeEnvironment.projectId,
90+
environmentId: backgroundWorker.runtimeEnvironment.id,
91+
environmentType: backgroundWorker.runtimeEnvironment.type,
92+
},
93+
run.concurrencyKey ?? undefined
8994
);
90-
91-
i++;
9295
}
9396

94-
const settled = await Promise.allSettled(enqueues);
95-
96-
if (settled.some((s) => s.status === "rejected")) {
97-
const rejectedRuns: { id: string; reason: any }[] = [];
98-
99-
runsWaitingForDeploy.forEach((run, i) => {
100-
if (settled[i].status === "rejected") {
101-
const rejected = settled[i] as PromiseRejectedResult;
102-
103-
rejectedRuns.push({ id: run.id, reason: rejected.reason });
104-
}
105-
});
106-
107-
logger.error("Failed to requeue task runs for immediate execution", {
108-
rejectedRuns,
109-
});
97+
if (runsWaitingForDeploy.length > maxCount) {
98+
await ExecuteTasksWaitingForDeployService.enqueue(
99+
backgroundWorkerId,
100+
this._prisma,
101+
new Date(Date.now() + env.LEGACY_RUN_ENGINE_WAITING_FOR_DEPLOY_BATCH_STAGGER_MS)
102+
);
110103
}
111104
}
112105

0 commit comments

Comments
 (0)