Skip to content
This repository was archived by the owner on Sep 16, 2024. It is now read-only.

Allow DelaySeconds to be specified in SubmitTaskInput #7

Merged
merged 1 commit into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/__tests__/noop-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ describe('NoopClient', () => {
})
await expect(response).rejects.toThrowError('Payload validation failed')
})

it('throws a TypeError if a delaySeconds of >900 is provided', async () => {
const response = client.submitTask({
...exampleTaskRequest,
delaySeconds: 901
})
await expect(response).rejects.toThrowError(/DelaySeconds too large/)
})
})

describe('submitAllTasks', () => {
Expand All @@ -71,6 +79,7 @@ describe('NoopClient', () => {
expect(results[0].status).toEqual(BatchSubmitTaskStatus.SUCCESSFUL)
expect(results[0].error).toEqual(undefined)
})

it('throws the exception if validation of any task fails', async () => {
const invalidTaskRequest = {
...exampleTaskRequest,
Expand All @@ -83,6 +92,16 @@ describe('NoopClient', () => {

await expect(promise).rejects.toThrowError()
})

it('throws a TypeError if a delaySeconds of >900 is provided', async () => {
const response = client.submitAllTasks([
{
...exampleTaskRequest,
delaySeconds: 901
}
])
await expect(response).rejects.toThrowError(/DelaySeconds too large/)
})
})

describe('generateConsumers', () => {
Expand Down
37 changes: 37 additions & 0 deletions src/__tests__/sqs-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,19 @@ describe('AsyncTasksClient', () => {
)
})

it('allows a delaySeconds to be provided', async () => {
const exampleWithDelay = {
...exampleTaskRequest,
delaySeconds: 30
}
await client.submitTask(exampleWithDelay)
expect(sendSpy).toBeCalledWith(
expect.objectContaining({
DelaySeconds: 30
})
)
})

it('throws an OperationNotRegistered error if an invalid operationName is specified', async () => {
const response = client.submitTask({ ...exampleTaskRequest, operationName: 'does not exist' })
await expect(response).rejects.toThrowError('No handler registered for operation')
Expand Down Expand Up @@ -143,6 +156,12 @@ describe('AsyncTasksClient', () => {
}
}

const delayedRequest = {
operationName: existingOperation.operationName,
payload: examplePayload,
delaySeconds: 30
}

it('calls sendMessageBatch once for each assigned queue', async () => {
const { results } = await client.submitAllTasks([exampleTaskRequest, secondTaskRequest])

Expand All @@ -155,6 +174,24 @@ describe('AsyncTasksClient', () => {
expect(sendSpy).not.toBeCalled()
})

it('passes down delaySeconds for requests', async () => {
await client.submitAllTasks([exampleTaskRequest, delayedRequest])

expect(sendBatchSpy).toBeCalledTimes(1)
const input: AWS.SQS.SendMessageBatchRequest = sendBatchSpy.mock.calls[0][0]

expect(input.Entries[0]).toEqual(
expect.objectContaining({
DelaySeconds: undefined
})
)
expect(input.Entries[1]).toEqual(
expect.objectContaining({
DelaySeconds: delayedRequest.delaySeconds
})
)
})

it('does not call sendMessageBatch if validation of any task fails', async () => {
const invalidTaskRequest = {
...exampleTaskRequest,
Expand Down
9 changes: 9 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ export interface GetConsumersInput<TContext = DefaultTaskContext> {
export interface SubmitTaskInput<T> {
operationName: string
payload: T

/**
* Number of seconds to wait before making the message visible. Defaults to 0
*
* Max allowed value by SQS is 900 (15 minutes)
*
* More Info: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
*/
delaySeconds?: number
}

export interface SubmitTaskResponse {
Expand Down
7 changes: 7 additions & 0 deletions src/noop-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import { OperationConfiguration, OperationName, OperationRouter, QueueName } fro
import { DefaultTaskContext } from './context'
import { InvalidPayloadError, OperationNotRegistered } from './errors'

const MAX_SQS_VISIBILITY_DELAY_SECS = 900

export class NoopClient<TContext = DefaultTaskContext> implements TaskClient<TContext> {
private _routes: OperationRouter

Expand Down Expand Up @@ -68,6 +70,11 @@ export class NoopClient<TContext = DefaultTaskContext> implements TaskClient<TCo
throw new InvalidPayloadError(operationName, payload, error)
}

// Basic sanity test to help developers out if they are using the NoopClient in development
if (input.delaySeconds && input.delaySeconds > MAX_SQS_VISIBILITY_DELAY_SECS) {
throw new TypeError('DelaySeconds too large. See SQS SendMessage documentation')
}

return 'not-a-valid-task-id'
}
}
20 changes: 11 additions & 9 deletions src/sqs-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ export class AsyncTasksClient<TContext = DefaultTaskContext> implements TaskClie
*/
public async submitTask<T>(input: SubmitTaskInput<T>): Promise<SubmitTaskResponse> {
const { queueName, task } = await this._routeToTask<T>(input)

const queue = this.queues[queueName]
const sqsResponse = await this.sqsClient
.sendMessage({
QueueUrl: queue.queueUrl,
MessageBody: JSON.stringify(task)
})
.promise()

const message: SQS.Types.SendMessageRequest = {
QueueUrl: queue.queueUrl,
MessageBody: JSON.stringify(task),
DelaySeconds: input.delaySeconds
}

const sqsResponse = await this.sqsClient.sendMessage(message).promise()

return {
messageId: sqsResponse.MessageId || null,
Expand All @@ -139,14 +140,15 @@ export class AsyncTasksClient<TContext = DefaultTaskContext> implements TaskClie

// Group tasks as messages assigned to their target queue
const messagesByQueue: Record<QueueName, SQS.SendMessageBatchRequestEntryList> = {}
routableTasks.forEach((routableTask): void => {
routableTasks.forEach((routableTask, i): void => {
const { queueName, task } = routableTask
if (!messagesByQueue[queueName]) {
messagesByQueue[queueName] = []
}
messagesByQueue[queueName].push({
Id: task.taskId,
MessageBody: JSON.stringify(task)
MessageBody: JSON.stringify(task),
DelaySeconds: input[i].delaySeconds
})
})

Expand Down