Skip to content

Retry heartbeat timeouts by putting back in the queue #1689

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 10, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Previous commit
Next commit
Try and nack, if it fails then fail the run
  • Loading branch information
matt-aitken committed Feb 10, 2025
commit bec58eb034e9f2bbda0be93d35f062573513a865
130 changes: 79 additions & 51 deletions apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,85 +39,113 @@ export class TaskRunHeartbeatFailedService extends BaseService {
});

if (!taskRun) {
logger.error("[RequeueTaskRunService] Task run not found", {
logger.error("[TaskRunHeartbeatFailedService] Task run not found", {
runId,
});

return;
}

const service = new FailedTaskRunService();

switch (taskRun.status) {
case "PENDING": {
if (taskRun.lockedAt) {
if (taskRun._count.attempts === 0) {
//no attempts, so we can requeue
logger.debug("[RequeueTaskRunService] Requeueing task run, there were no attempts.", {
const backInQueue = await marqs?.nackMessage(taskRun.id);

if (backInQueue) {
logger.debug(
`[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`,
{
taskRun,
});

await marqs?.nackMessage(taskRun.id);
} else {
logger.debug(
"[RequeueTaskRunService] Failing task run because the heartbeat failed, it's PENDING, locked, and has attempts",
{ taskRun }
);

const service = new FailedTaskRunService();

await service.call(taskRun.friendlyId, {
ok: false,
id: taskRun.friendlyId,
retry: undefined,
error: {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
message: "Did not receive a heartbeat from the worker in time",
},
});
}
}
);
} else {
logger.debug("[RequeueTaskRunService] Nacking task run", { taskRun });

await marqs?.nackMessage(taskRun.id);
logger.debug(
`[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`,
{ taskRun }
);
await service.call(taskRun.friendlyId, {
ok: false,
id: taskRun.friendlyId,
retry: undefined,
error: {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
message: "Did not receive a heartbeat from the worker in time",
},
});
}

break;
}
case "EXECUTING":
case "RETRYING_AFTER_FAILURE": {
logger.debug("[RequeueTaskRunService] Failing task run", { taskRun });

const service = new FailedTaskRunService();

await service.call(taskRun.friendlyId, {
ok: false,
id: taskRun.friendlyId,
retry: undefined,
error: {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
message: "Did not receive a heartbeat from the worker in time",
},
});
const backInQueue = await marqs?.nackMessage(taskRun.id);

if (backInQueue) {
logger.debug(
`[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`,
{
taskRun,
}
);
} else {
logger.debug(
`[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`,
{ taskRun }
);
await service.call(taskRun.friendlyId, {
ok: false,
id: taskRun.friendlyId,
retry: undefined,
error: {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
message: "Did not receive a heartbeat from the worker in time",
},
});
}

break;
}
case "DELAYED":
case "WAITING_FOR_DEPLOY": {
logger.debug("[RequeueTaskRunService] Removing task run from queue", { taskRun });
logger.debug("[TaskRunHeartbeatFailedService] Removing task run from queue", { taskRun });

await marqs?.acknowledgeMessage(
taskRun.id,
"Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in RequeueTaskRunService"
"Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in TaskRunHeartbeatFailedService"
);

break;
}
case "WAITING_TO_RESUME":
case "PAUSED": {
logger.debug("[RequeueTaskRunService] Requeueing task run", { taskRun });
const backInQueue = await marqs?.nackMessage(taskRun.id);

await marqs?.nackMessage(taskRun.id);
if (backInQueue) {
logger.debug(
`[TaskRunHeartbeatFailedService] ${taskRun.status} run is back in the queue run`,
{
taskRun,
}
);
} else {
logger.debug(
`[TaskRunHeartbeatFailedService] ${taskRun.status} run not back in the queue, failing`,
{ taskRun }
);
await service.call(taskRun.friendlyId, {
ok: false,
id: taskRun.friendlyId,
retry: undefined,
error: {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
message: "Did not receive a heartbeat from the worker in time",
},
});
}

break;
}
Expand All @@ -129,11 +157,11 @@ export class TaskRunHeartbeatFailedService extends BaseService {
case "EXPIRED":
case "TIMED_OUT":
case "CANCELED": {
logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun });
logger.debug("[TaskRunHeartbeatFailedService] Task run is completed", { taskRun });

await marqs?.acknowledgeMessage(
taskRun.id,
"Task run is already completed in RequeueTaskRunService"
"Task run is already completed in TaskRunHeartbeatFailedService"
);

try {
Expand All @@ -149,7 +177,7 @@ export class TaskRunHeartbeatFailedService extends BaseService {
delayInMs: taskRun.lockedToVersion?.supportsLazyAttempts ? 5_000 : undefined,
});
} catch (error) {
logger.error("[RequeueTaskRunService] Error signaling run cancellation", {
logger.error("[TaskRunHeartbeatFailedService] Error signaling run cancellation", {
runId: taskRun.id,
error: error instanceof Error ? error.message : error,
});
Expand Down