Skip to content

Commit d35398d

Browse files
committed
Removed all completions/executions logic from the shared queue consumer
1 parent e2585c0 commit d35398d

File tree

1 file changed

+2
-61
lines changed

1 file changed

+2
-61
lines changed

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 2 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -617,16 +617,6 @@ export class SharedQueueConsumer {
617617
return;
618618
}
619619

620-
// if (messageBody.data.completedAttemptIds.length < 1) {
621-
// logger.error("No attempt IDs provided", {
622-
// queueMessage: message.data,
623-
// messageId: message.messageId,
624-
// });
625-
626-
// await this.#ackAndDoMoreWork(message.messageId);
627-
// return;
628-
// }
629-
630620
const resumableRun = await prisma.taskRun.findUnique({
631621
where: {
632622
id: message.messageId,
@@ -691,55 +681,6 @@ export class SharedQueueConsumer {
691681
return;
692682
}
693683

694-
const completions: TaskRunExecutionResult[] = [];
695-
const executions: TaskRunExecution[] = [];
696-
697-
for (const completedAttemptId of messageBody.data.completedAttemptIds) {
698-
const completedAttempt = await prisma.taskRunAttempt.findUnique({
699-
where: {
700-
id: completedAttemptId,
701-
taskRun: {
702-
lockedAt: {
703-
not: null,
704-
},
705-
lockedById: {
706-
not: null,
707-
},
708-
},
709-
},
710-
});
711-
712-
if (!completedAttempt) {
713-
logger.error("Completed attempt not found", {
714-
queueMessage: message.data,
715-
messageId: message.messageId,
716-
});
717-
718-
await this.#ackAndDoMoreWork(message.messageId);
719-
return;
720-
}
721-
722-
const completion = await this._tasks.getCompletionPayloadFromAttempt(completedAttempt.id);
723-
724-
if (!completion) {
725-
await this.#ackAndDoMoreWork(message.messageId);
726-
return;
727-
}
728-
729-
completions.push(completion);
730-
731-
const executionPayload = await this._tasks.getExecutionPayloadFromAttempt(
732-
completedAttempt.id
733-
);
734-
735-
if (!executionPayload) {
736-
await this.#ackAndDoMoreWork(message.messageId);
737-
return;
738-
}
739-
740-
executions.push(executionPayload.execution);
741-
}
742-
743684
try {
744685
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY", {
745686
runId: resumableAttempt.taskRunId,
@@ -752,8 +693,8 @@ export class SharedQueueConsumer {
752693
runId: resumableAttempt.taskRunId,
753694
attemptId: resumableAttempt.id,
754695
attemptFriendlyId: resumableAttempt.friendlyId,
755-
completions,
756-
executions,
696+
completions: [],
697+
executions: [],
757698
});
758699
} catch (e) {
759700
if (e instanceof Error) {

0 commit comments

Comments
 (0)