Skip to content

Commit

Permalink
fix: message visibility timeout (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
aleortega authored Jul 1, 2024
1 parent acc1ecf commit 9f1b9d9
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 13 deletions.
2 changes: 1 addition & 1 deletion consumer-server/src/adapters/memory-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ export function createMemoryQueueAdapter(): QueueComponent {
queue.delete(receiptHandle)
}

return { send, receiveSingleMessage, deleteMessage }
return { send, receiveSingleMessage, deleteMessage, increaseMessageVisibility: async () => {} }
}
14 changes: 13 additions & 1 deletion consumer-server/src/adapters/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
ChangeMessageVisibilityCommand,
DeleteMessageCommand,
Message,
ReceiveMessageCommand,
Expand Down Expand Up @@ -49,9 +50,20 @@ export async function createSqsAdapter(endpoint: string): Promise<QueueComponent
await client.send(deleteCommand)
}

async function increaseMessageVisibility(receiptHandle: string): Promise<void> {
await client.send(
new ChangeMessageVisibilityCommand({
QueueUrl: endpoint,
ReceiptHandle: receiptHandle,
VisibilityTimeout: 120
})
)
}

return {
send,
receiveSingleMessage,
deleteMessage
deleteMessage,
increaseMessageVisibility
}
}
27 changes: 18 additions & 9 deletions consumer-server/src/logic/message-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ export async function createMessageProcesorComponent({
const logger = logs.getLogger('message-procesor')
const abServers = (await config.requireString('AB_SERVERS')).split(';')

function isRelatedToVisibilityTimeout(errorMessage: string | undefined): boolean {
const visibilityTimeoutError = 'The receipt handle has expired'
return !!errorMessage && errorMessage.includes(visibilityTimeoutError)
}

function isRelatedToAssetBundlePublish(errorMessage: string | undefined): boolean {
return !!errorMessage && abServers.some((abServer) => errorMessage.includes(abServer))
}
Expand Down Expand Up @@ -159,18 +164,22 @@ export async function createMessageProcesorComponent({
error: error.message
})

if (isRelatedToAssetBundlePublish(error?.message)) {
await reQueue(message)
if(isRelatedToVisibilityTimeout(error?.message)) {
await queue.increaseMessageVisibility(receiptMessageHandle)
} else {
if (retry < 3) {
if (isRelatedToAssetBundlePublish(error?.message)) {
await reQueue(message)
} else {
logger.warn('Max attempts reached, message will not be retried', {
entityId: message.entity.entityId,
base: message.entity.metadata.scene.base,
attempt: retry
})
metrics.increment('lod_generation_count', { status: 'failed' }, 1)
if (retry < 3) {
await reQueue(message)
} else {
logger.warn('Max attempts reached, message will not be retried', {
entityId: message.entity.entityId,
base: message.entity.metadata.scene.base,
attempt: retry
})
metrics.increment('lod_generation_count', { status: 'failed' }, 1)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions consumer-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export type QueueComponent = {
send(message: QueueMessage): Promise<void>
receiveSingleMessage(): Promise<Message[]>
deleteMessage(receiptHandle: string): Promise<void>
increaseMessageVisibility(receiptHandle: string): Promise<void>
}

export type QueueWorker = IBaseComponent
Expand Down
3 changes: 2 additions & 1 deletion consumer-server/test/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ async function initComponents(): Promise<TestComponents> {
const queue: QueueComponent = {
deleteMessage: jest.fn(),
receiveSingleMessage: jest.fn(),
send: jest.fn()
send: jest.fn(),
increaseMessageVisibility: jest.fn()
}
const messageConsumer: QueueWorker = {
start: jest.fn(),
Expand Down
3 changes: 2 additions & 1 deletion consumer-server/test/unit/message-processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ function getMessageProcessorMockComponents() {
queue: {
send: jest.fn(),
receiveSingleMessage: jest.fn(),
deleteMessage: jest.fn()
deleteMessage: jest.fn(),
increaseMessageVisibility: jest.fn()
},
lodGenerator: {
generate: jest.fn()
Expand Down

0 comments on commit 9f1b9d9

Please sign in to comment.