Skip to content

Commit

Permalink
feat(job-queue-plugin): Improve pub/sub message handling (#2561)
Browse files Browse the repository at this point in the history
Catching invalid messages
Better logging about message handling
  • Loading branch information
mcfedr authored Dec 4, 2023
1 parent bfd9281 commit 3645819
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implement
}

const subscription = this.subscription(queueName);
const listener = (message: Message) => {
Logger.debug(`Received message: ${queueName}: ${message.id}`, loggerCtx);

const processMessage = async (message: Message) => {
Logger.verbose(`Received message: ${queueName}: ${message.id}`, loggerCtx);

const job = new Job<Data>({
id: message.id,
Expand All @@ -84,12 +85,21 @@ export class PubSubJobQueueStrategy extends InjectableJobQueueStrategy implement
createdAt: message.publishTime,
});

process(job)
await process(job);
};

const listener = (message: Message) => {
processMessage(message)
.then(() => {
message.ack();
Logger.verbose(`Finished handling: ${queueName}: ${message.id}`, loggerCtx);
})
.catch(err => {
message.nack();
Logger.error(
`Error handling: ${queueName}: ${message.id}: ${String(err.message)}`,
loggerCtx,
);
});
};
this.listeners.set(queueName, process, listener);
Expand Down

0 comments on commit 3645819

Please sign in to comment.