Skip to content

feat(batch): add async processor #1616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
tests: improved unit tests
  • Loading branch information
dreamorosi committed Jul 19, 2023
commit 3929c40c110ffe186e98cb52dd68b01bb520cf9c
2 changes: 1 addition & 1 deletion packages/batch/src/AsyncBatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class AsyncBatchProcessor extends BasePartialBatchProcessor {
* @returns response of success or failure
*/
public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse {
throw new Error('Not implemented. Use processAsync() instead.');
throw new Error('Not implemented. Use asyncProcess() instead.');
}
}

Expand Down
50 changes: 25 additions & 25 deletions packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,31 @@ abstract class BasePartialProcessor {
this.handler = new Function();
}

/**
* Call instance's handler for each record
* @returns List of processed records
*/
public async asyncProcess(): Promise<(SuccessResponse | FailureResponse)[]> {
/**
* If this is an sync processor, user should have called process instead,
* so we call the method early to throw the error early thus failing fast.
*/
if (this.constructor.name === 'BatchProcessor') {
await this.asyncProcessRecord(this.records[0]);
}
this.prepare();

const processingPromises: Promise<SuccessResponse | FailureResponse>[] =
this.records.map((record) => this.asyncProcessRecord(record));

const processedRecords: (SuccessResponse | FailureResponse)[] =
await Promise.all(processingPromises);

this.clean();

return processedRecords;
}

/**
* Process a record with an asyncronous handler
*
Expand Down Expand Up @@ -94,31 +119,6 @@ abstract class BasePartialProcessor {
return processedRecords;
}

/**
* Call instance's handler for each record
* @returns List of processed records
*/
public async processAsync(): Promise<(SuccessResponse | FailureResponse)[]> {
/**
* If this is an sync processor, user should have called process instead,
* so we call the method early to throw the error early thus failing fast.
*/
if (this.constructor.name === 'BatchProcessor') {
await this.asyncProcessRecord(this.records[0]);
}
this.prepare();

const processingPromises: Promise<SuccessResponse | FailureResponse>[] =
this.records.map((record) => this.asyncProcessRecord(record));

const processedRecords: (SuccessResponse | FailureResponse)[] =
await Promise.all(processingPromises);

this.clean();

return processedRecords;
}

/**
* Process a record with the handler
* @param record Record to be processed
Expand Down
2 changes: 1 addition & 1 deletion packages/batch/src/asyncProcessPartialResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const asyncProcessPartialResponse = async (

processor.register(event.Records, recordHandler, options);

await processor.processAsync();
await processor.asyncProcess();

return processor.response();
};
Expand Down
18 changes: 18 additions & 0 deletions packages/batch/tests/helpers/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,23 @@ const handlerWithContext = (
return record.body;
};

const asyncHandlerWithContext = async (
record: SQSRecord,
options: BatchProcessingOptions
): Promise<string> => {
const context = options.context;

try {
if (context.getRemainingTimeInMillis() == 0) {
throw Error('No time remaining.');
}
} catch (e) {
throw Error('Context possibly malformed. Displaying context:\n' + context);
}

return record.body;
};

export {
sqsRecordHandler,
asyncSqsRecordHandler,
Expand All @@ -88,4 +105,5 @@ export {
dynamodbRecordHandler,
asyncDynamodbRecordHandler,
handlerWithContext,
asyncHandlerWithContext,
};
85 changes: 72 additions & 13 deletions packages/batch/tests/unit/AsyncBatchProcessor.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
/**
* Test AsyncAsyncBatchProcessor class
* Test AsyncBatchProcessor class
*
* @group unit/batch/class/asyncAsyncBatchProcessor
* @group unit/batch/class/asyncBatchProcessor
*/
import type { Context } from 'aws-lambda';
import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts';
import { AsyncBatchProcessor } from '../../src/AsyncBatchProcessor';
import { EventType } from '../../src/constants';
import { BatchProcessingError } from '../../src/errors';
import type { BatchProcessingOptions } from '../../src/types';
import {
dynamodbRecordFactory,
kinesisRecordFactory,
Expand All @@ -15,10 +18,12 @@ import {
asyncDynamodbRecordHandler,
asyncKinesisRecordHandler,
asyncSqsRecordHandler,
asyncHandlerWithContext,
} from '../helpers/handlers';

describe('Class: AsyncAsyncBatchProcessor', () => {
describe('Class: AsyncBatchProcessor', () => {
const ENVIRONMENT_VARIABLES = process.env;
const options: BatchProcessingOptions = { context: dummyContext };

beforeEach(() => {
jest.clearAllMocks();
Expand All @@ -40,7 +45,7 @@ describe('Class: AsyncAsyncBatchProcessor', () => {

// Act
processor.register(records, asyncSqsRecordHandler);
const processedMessages = await processor.processAsync();
const processedMessages = await processor.asyncProcess();

// Assess
expect(processedMessages).toStrictEqual([
Expand All @@ -59,7 +64,7 @@ describe('Class: AsyncAsyncBatchProcessor', () => {

// Act
processor.register(records, asyncSqsRecordHandler);
const processedMessages = await processor.processAsync();
const processedMessages = await processor.asyncProcess();

// Assess
expect(processedMessages[1]).toStrictEqual([
Expand Down Expand Up @@ -89,7 +94,7 @@ describe('Class: AsyncAsyncBatchProcessor', () => {
processor.register(records, asyncSqsRecordHandler);

// Assess
await expect(processor.processAsync()).rejects.toThrowError(
await expect(processor.asyncProcess()).rejects.toThrowError(
BatchProcessingError
);
});
Expand All @@ -105,7 +110,7 @@ describe('Class: AsyncAsyncBatchProcessor', () => {

// Act
processor.register(records, asyncKinesisRecordHandler);
const processedMessages = await processor.processAsync();
const processedMessages = await processor.asyncProcess();

// Assess
expect(processedMessages).toStrictEqual([
Expand All @@ -124,7 +129,7 @@ describe('Class: AsyncAsyncBatchProcessor', () => {

// Act
processor.register(records, asyncKinesisRecordHandler);
const processedMessages = await processor.processAsync();
const processedMessages = await processor.asyncProcess();

// Assess
expect(processedMessages[1]).toStrictEqual([
Expand Down Expand Up @@ -154,7 +159,7 @@ describe('Class: AsyncAsyncBatchProcessor', () => {
processor.register(records, asyncKinesisRecordHandler);

// Assess
await expect(processor.processAsync()).rejects.toThrowError(
await expect(processor.asyncProcess()).rejects.toThrowError(
BatchProcessingError
);
});
Expand All @@ -170,7 +175,7 @@ describe('Class: AsyncAsyncBatchProcessor', () => {

// Act
processor.register(records, asyncDynamodbRecordHandler);
const processedMessages = await processor.processAsync();
const processedMessages = await processor.asyncProcess();

// Assess
expect(processedMessages).toStrictEqual([
Expand All @@ -189,7 +194,7 @@ describe('Class: AsyncAsyncBatchProcessor', () => {

// Act
processor.register(records, asyncDynamodbRecordHandler);
const processedMessages = await processor.processAsync();
const processedMessages = await processor.asyncProcess();

// Assess
expect(processedMessages[1]).toStrictEqual([
Expand Down Expand Up @@ -219,7 +224,61 @@ describe('Class: AsyncAsyncBatchProcessor', () => {
processor.register(records, asyncDynamodbRecordHandler);

// Assess
await expect(processor.processAsync()).rejects.toThrowError(
await expect(processor.asyncProcess()).rejects.toThrowError(
BatchProcessingError
);
});
});

describe('Batch processing with Lambda context', () => {
test('Batch processing when context is provided and handler accepts', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const records = [firstRecord, secondRecord];
const processor = new AsyncBatchProcessor(EventType.SQS);

// Act
processor.register(records, asyncHandlerWithContext, options);
const processedMessages = await processor.asyncProcess();

// Assess
expect(processedMessages).toStrictEqual([
['success', firstRecord.body, firstRecord],
['success', secondRecord.body, secondRecord],
]);
});

test('Batch processing when context is provided and handler does not accept', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const records = [firstRecord, secondRecord];
const processor = new AsyncBatchProcessor(EventType.SQS);

// Act
processor.register(records, asyncSqsRecordHandler, options);
const processedMessages = await processor.asyncProcess();

// Assess
expect(processedMessages).toStrictEqual([
['success', firstRecord.body, firstRecord],
['success', secondRecord.body, secondRecord],
]);
});

test('Batch processing when malformed context is provided and handler attempts to use', async () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const records = [firstRecord, secondRecord];
const processor = new AsyncBatchProcessor(EventType.SQS);
const badContext = { foo: 'bar' };
const badOptions = { context: badContext as unknown as Context };

// Act
processor.register(records, asyncHandlerWithContext, badOptions);
await expect(() => processor.asyncProcess()).rejects.toThrowError(
BatchProcessingError
);
});
Expand All @@ -231,7 +290,7 @@ describe('Class: AsyncAsyncBatchProcessor', () => {

// Act & Assess
expect(() => processor.process()).toThrowError(
'Not implemented. Use processAsync() instead.'
'Not implemented. Use asyncProcess() instead.'
);
});
});
2 changes: 1 addition & 1 deletion packages/batch/tests/unit/BatchProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ describe('Class: BatchProcessor', () => {
const processor = new BatchProcessor(EventType.SQS);

// Act & Assess
await expect(() => processor.processAsync()).rejects.toThrow(
await expect(() => processor.asyncProcess()).rejects.toThrow(
'Not implemented. Use process() instead.'
);
});
Expand Down
37 changes: 0 additions & 37 deletions packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,41 +56,4 @@ describe('Class: SqsFifoBatchProcessor', () => {
);
});
});

describe('hronous SQS FIFO batch processing', () => {
test('SQS FIFO Batch processor with no failures', () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('success');
const event = { Records: [firstRecord, secondRecord] };
const processor = new SqsFifoPartialProcessor();

// Act
const result = processPartialResponse(event, sqsRecordHandler, processor);

// Assess
expect(result['batchItemFailures']).toStrictEqual([]);
});

test('SQS FIFO Batch processor with failures', () => {
// Prepare
const firstRecord = sqsRecordFactory('success');
const secondRecord = sqsRecordFactory('fail');
const thirdRecord = sqsRecordFactory('success');
const event = { Records: [firstRecord, secondRecord, thirdRecord] };
const processor = new SqsFifoPartialProcessor();

// Act
const result = processPartialResponse(event, sqsRecordHandler, processor);

// Assess
expect(result['batchItemFailures'].length).toBe(2);
expect(result['batchItemFailures'][0]['itemIdentifier']).toBe(
secondRecord.messageId
);
expect(result['batchItemFailures'][1]['itemIdentifier']).toBe(
thirdRecord.messageId
);
});
});
});
Loading