Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Commit a5f8e4a

Browse files
author
Bruce Felt
authored
Allow DelaySeconds to be specified in SubmitTaskInput (#7)
SQS does provide the ability to delay visiblity of a mesage by up to 15 minutes[1]. This change exposes this capability with an optional delaySeconds argument in payloads for both submitTask and submitAllTasks. * add delaySeconds?: number to SubmitTaskInput * map delaySeconds to DelaySeconds in calls in SQS Client * provide a sanity check for delaySeconds in NoopClient [1] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
1 parent 8f7b58e commit a5f8e4a

File tree

5 files changed

+83
-9
lines changed

5 files changed

+83
-9
lines changed

src/__tests__/noop-client.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@ describe('NoopClient', () => {
5555
})
5656
await expect(response).rejects.toThrowError('Payload validation failed')
5757
})
58+
59+
it('throws a TypeError if a delaySeconds of >900 is provided', async () => {
60+
const response = client.submitTask({
61+
...exampleTaskRequest,
62+
delaySeconds: 901
63+
})
64+
await expect(response).rejects.toThrowError(/DelaySeconds too large/)
65+
})
5866
})
5967

6068
describe('submitAllTasks', () => {
@@ -71,6 +79,7 @@ describe('NoopClient', () => {
7179
expect(results[0].status).toEqual(BatchSubmitTaskStatus.SUCCESSFUL)
7280
expect(results[0].error).toEqual(undefined)
7381
})
82+
7483
it('throws the exception if validation of any task fails', async () => {
7584
const invalidTaskRequest = {
7685
...exampleTaskRequest,
@@ -83,6 +92,16 @@ describe('NoopClient', () => {
8392

8493
await expect(promise).rejects.toThrowError()
8594
})
95+
96+
it('throws a TypeError if a delaySeconds of >900 is provided', async () => {
97+
const response = client.submitAllTasks([
98+
{
99+
...exampleTaskRequest,
100+
delaySeconds: 901
101+
}
102+
])
103+
await expect(response).rejects.toThrowError(/DelaySeconds too large/)
104+
})
86105
})
87106

88107
describe('generateConsumers', () => {

src/__tests__/sqs-client.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,19 @@ describe('AsyncTasksClient', () => {
116116
)
117117
})
118118

119+
it('allows a delaySeconds to be provided', async () => {
120+
const exampleWithDelay = {
121+
...exampleTaskRequest,
122+
delaySeconds: 30
123+
}
124+
await client.submitTask(exampleWithDelay)
125+
expect(sendSpy).toBeCalledWith(
126+
expect.objectContaining({
127+
DelaySeconds: 30
128+
})
129+
)
130+
})
131+
119132
it('throws an OperationNotRegistered error if an invalid operationName is specified', async () => {
120133
const response = client.submitTask({ ...exampleTaskRequest, operationName: 'does not exist' })
121134
await expect(response).rejects.toThrowError('No handler registered for operation')
@@ -143,6 +156,12 @@ describe('AsyncTasksClient', () => {
143156
}
144157
}
145158

159+
const delayedRequest = {
160+
operationName: existingOperation.operationName,
161+
payload: examplePayload,
162+
delaySeconds: 30
163+
}
164+
146165
it('calls sendMessageBatch once for each assigned queue', async () => {
147166
const { results } = await client.submitAllTasks([exampleTaskRequest, secondTaskRequest])
148167

@@ -155,6 +174,24 @@ describe('AsyncTasksClient', () => {
155174
expect(sendSpy).not.toBeCalled()
156175
})
157176

177+
it('passes down delaySeconds for requests', async () => {
178+
await client.submitAllTasks([exampleTaskRequest, delayedRequest])
179+
180+
expect(sendBatchSpy).toBeCalledTimes(1)
181+
const input: AWS.SQS.SendMessageBatchRequest = sendBatchSpy.mock.calls[0][0]
182+
183+
expect(input.Entries[0]).toEqual(
184+
expect.objectContaining({
185+
DelaySeconds: undefined
186+
})
187+
)
188+
expect(input.Entries[1]).toEqual(
189+
expect.objectContaining({
190+
DelaySeconds: delayedRequest.delaySeconds
191+
})
192+
)
193+
})
194+
158195
it('does not call sendMessageBatch if validation of any task fails', async () => {
159196
const invalidTaskRequest = {
160197
...exampleTaskRequest,

src/client.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,15 @@ export interface GetConsumersInput<TContext = DefaultTaskContext> {
1010
export interface SubmitTaskInput<T> {
1111
operationName: string
1212
payload: T
13+
14+
/**
15+
* Number of seconds to wait before making the message visible. Defaults to 0
16+
*
17+
* Max allowed value by SQS is 900 (15 minutes)
18+
*
19+
* More Info: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
20+
*/
21+
delaySeconds?: number
1322
}
1423

1524
export interface SubmitTaskResponse {

src/noop-client.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import { OperationConfiguration, OperationName, OperationRouter, QueueName } fro
1111
import { DefaultTaskContext } from './context'
1212
import { InvalidPayloadError, OperationNotRegistered } from './errors'
1313

14+
const MAX_SQS_VISIBILITY_DELAY_SECS = 900
15+
1416
export class NoopClient<TContext = DefaultTaskContext> implements TaskClient<TContext> {
1517
private _routes: OperationRouter
1618

@@ -68,6 +70,11 @@ export class NoopClient<TContext = DefaultTaskContext> implements TaskClient<TCo
6870
throw new InvalidPayloadError(operationName, payload, error)
6971
}
7072

73+
// Basic sanity test to help developers out if they are using the NoopClient in development
74+
if (input.delaySeconds && input.delaySeconds > MAX_SQS_VISIBILITY_DELAY_SECS) {
75+
throw new TypeError('DelaySeconds too large. See SQS SendMessage documentation')
76+
}
77+
7178
return 'not-a-valid-task-id'
7279
}
7380
}

src/sqs-client.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,15 @@ export class AsyncTasksClient<TContext = DefaultTaskContext> implements TaskClie
114114
*/
115115
public async submitTask<T>(input: SubmitTaskInput<T>): Promise<SubmitTaskResponse> {
116116
const { queueName, task } = await this._routeToTask<T>(input)
117-
118117
const queue = this.queues[queueName]
119-
const sqsResponse = await this.sqsClient
120-
.sendMessage({
121-
QueueUrl: queue.queueUrl,
122-
MessageBody: JSON.stringify(task)
123-
})
124-
.promise()
118+
119+
const message: SQS.Types.SendMessageRequest = {
120+
QueueUrl: queue.queueUrl,
121+
MessageBody: JSON.stringify(task),
122+
DelaySeconds: input.delaySeconds
123+
}
124+
125+
const sqsResponse = await this.sqsClient.sendMessage(message).promise()
125126

126127
return {
127128
messageId: sqsResponse.MessageId || null,
@@ -139,14 +140,15 @@ export class AsyncTasksClient<TContext = DefaultTaskContext> implements TaskClie
139140

140141
// Group tasks as messages assigned to their target queue
141142
const messagesByQueue: Record<QueueName, SQS.SendMessageBatchRequestEntryList> = {}
142-
routableTasks.forEach((routableTask): void => {
143+
routableTasks.forEach((routableTask, i): void => {
143144
const { queueName, task } = routableTask
144145
if (!messagesByQueue[queueName]) {
145146
messagesByQueue[queueName] = []
146147
}
147148
messagesByQueue[queueName].push({
148149
Id: task.taskId,
149-
MessageBody: JSON.stringify(task)
150+
MessageBody: JSON.stringify(task),
151+
DelaySeconds: input[i].delaySeconds
150152
})
151153
})
152154

0 commit comments

Comments
 (0)