Commit 96977ff

sdangol and dreamorosi authored
refactor(batch): simplified the parser integration api with batch processor (#4465)
Co-authored-by: Andrea Amorosi <dreamorosi@gmail.com>
1 parent b050235 · commit 96977ff

10 files changed: 617 additions, 551 deletions

packages/batch/package.json

Lines changed: 9 additions & 1 deletion

@@ -38,13 +38,21 @@
         "default": "./lib/esm/index.js"
       }
     },
+    "./parser": {
+      "import": "./lib/esm/parser.js",
+      "require": "./lib/cjs/parser.js"
+    },
     "./types": {
       "import": "./lib/esm/types.js",
       "require": "./lib/cjs/types.js"
     }
   },
   "typesVersions": {
     "*": {
+      "parser": [
+        "lib/cjs/parser.d.ts",
+        "lib/esm/parser.d.ts"
+      ],
       "types": [
         "lib/cjs/types.d.ts",
         "lib/esm/types.d.ts"
@@ -76,4 +84,4 @@
     "@aws-lambda-powertools/testing-utils": "file:../testing",
     "@aws-lambda-powertools/parser": "2.25.2"
   }
-}
+}
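The new "./parser" subpath keeps the parser integration opt-in at import time, so bundles that never parse records do not pull in the parsing code. A minimal sketch of consuming it, assuming lib/esm/parser.js exposes a `parser` function (the module itself is not shown in this commit, so the export name is an assumption):

// Hypothetical import through the new subpath export added above; the
// `parser` export name is an assumption, since parser.js is not part of
// this excerpt.
import { parser } from '@aws-lambda-powertools/batch/parser';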

packages/batch/src/BasePartialBatchProcessor.ts

Lines changed: 23 additions & 8 deletions

@@ -1,9 +1,10 @@
-import type { StandardSchemaV1 } from '@standard-schema/spec';
+import { getStringFromEnv } from '@aws-lambda-powertools/commons/utils/env';
 import type {
   DynamoDBRecord,
   KinesisStreamRecord,
   SQSRecord,
 } from 'aws-lambda';
+import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js';
 import { BasePartialProcessor } from './BasePartialProcessor.js';
 import {
   DATA_CLASS_MAPPING,
@@ -12,7 +13,7 @@ import {
 } from './constants.js';
 import { FullBatchFailureError } from './errors.js';
 import type {
-  BasePartialBatchProcessorConfig,
+  BasePartialBatchProcessorParserConfig,
   EventSourceDataClassTypes,
   PartialItemFailureResponse,
   PartialItemFailures,
@@ -45,9 +46,16 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
   public eventType: keyof typeof EventType;
 
   /**
-   * The schema of the body of the event record for parsing
+   * A logger instance to be used for logging debug, warning, and error messages.
+   *
+   * When no logger is provided, we'll only log warnings and errors using the global `console` object.
+   */
+  protected readonly logger: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;
+
+  /**
+   * The configuration options for the parser integration
    */
-  protected schema?: StandardSchemaV1;
+  protected parserConfig?: BasePartialBatchProcessorParserConfig;
 
   /**
    * Initializes base batch processing class
@@ -56,7 +64,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
    */
   public constructor(
     eventType: keyof typeof EventType,
-    config?: BasePartialBatchProcessorConfig
+    parserConfig?: BasePartialBatchProcessorParserConfig
   ) {
     super();
     this.eventType = eventType;
@@ -66,9 +74,16 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
       [EventType.KinesisDataStreams]: () => this.collectKinesisFailures(),
       [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(),
     };
-    if (config) {
-      this.schema = config.schema;
-    }
+    this.parserConfig = parserConfig;
+    const alcLogLevel = getStringFromEnv({
+      key: 'AWS_LAMBDA_LOG_LEVEL',
+      defaultValue: '',
+    });
+    this.logger = parserConfig?.logger ?? {
+      debug: alcLogLevel === 'DEBUG' ? console.debug : () => undefined,
+      error: console.error,
+      warn: console.warn,
+    };
   }
 
   /**
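The constructor now resolves a logger once: `parserConfig.logger` wins when provided; otherwise warnings and errors fall back to the global `console`, and debug output is emitted only when `AWS_LAMBDA_LOG_LEVEL` is set to `DEBUG`. A sketch of wiring in a structured logger, with the config shape inferred from this diff (treat the `parser` and `schema` fields as assumptions, as above):

import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
// Assumed export, as noted earlier; parser.js is not shown in this commit.
import { parser } from '@aws-lambda-powertools/batch/parser';
import { Logger } from '@aws-lambda-powertools/logger';
import { z } from 'zod';

// Parser diagnostics go through the supplied logger instead of the console
// fallback gated on AWS_LAMBDA_LOG_LEVEL.
const processor = new BatchProcessor(EventType.SQS, {
  parser,
  schema: z.object({ orderId: z.string() }),
  logger: new Logger({ serviceName: 'checkout' }),
});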
packages/batch/src/BatchProcessor.ts

Lines changed: 9 additions & 171 deletions

@@ -1,14 +1,6 @@
-import type { StandardSchemaV1 } from '@standard-schema/spec';
-import type { StreamRecord } from 'aws-lambda';
 import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
-import { EventType, SchemaVendor } from './constants.js';
 import { BatchProcessingError } from './errors.js';
-import type {
-  BaseRecord,
-  EventSourceDataClassTypes,
-  FailureResponse,
-  SuccessResponse,
-} from './types.js';
+import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
 
 /**
  * Process records in a batch asynchronously and handle partial failure cases.
@@ -108,13 +100,16 @@ class BatchProcessor extends BasePartialBatchProcessor {
     record: BaseRecord
   ): Promise<SuccessResponse | FailureResponse> {
     try {
-      const recordToProcess =
-        this.schema == null
-          ? record
-          : await this.#parseRecord(record, this.eventType, this.schema);
+      const recordToProcess = this.parserConfig?.parser
+        ? await this.parserConfig.parser(
+            record,
+            this.eventType,
+            this.logger,
+            this.parserConfig
+          )
+        : record;
       const data = this.toBatchType(recordToProcess, this.eventType);
       const result = await this.handler(data, this.options?.context);
-
       return this.successHandler(record, result);
     } catch (error) {
       return this.failureHandler(record, error as Error);
@@ -133,163 +128,6 @@ class BatchProcessor extends BasePartialBatchProcessor {
       'Not implemented. Use asyncProcess() instead.'
     );
   }
-
-  /**
-   * Extend the schema according to the event type passed.
-   *
-   * If useTransformers is true, extend using opinionated transformers.
-   * Otherwise, extend without any transformers.
-   *
-   * @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
-   * @param schema - The StandardSchema to be used for parsing
-   * @param useTransformers - Whether to use transformers for parsing
-   */
-  async #createExtendedSchema(options: {
-    eventType: keyof typeof EventType;
-    schema: StandardSchemaV1;
-    useTransformers: boolean;
-  }) {
-    const { eventType, schema, useTransformers } = options;
-    switch (eventType) {
-      case EventType.SQS: {
-        if (useTransformers) {
-          const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
-            import('@aws-lambda-powertools/parser/helpers'),
-            import('@aws-lambda-powertools/parser/schemas/sqs'),
-          ]);
-          return SqsRecordSchema.extend({
-            body: JSONStringified(schema as any),
-          });
-        }
-        const { SqsRecordSchema } = await import(
-          '@aws-lambda-powertools/parser/schemas/sqs'
-        );
-        return SqsRecordSchema.extend({ body: schema });
-      }
-
-      case EventType.KinesisDataStreams: {
-        if (useTransformers) {
-          const [
-            { Base64Encoded },
-            { KinesisDataStreamRecord, KinesisDataStreamRecordPayload },
-          ] = await Promise.all([
-            import('@aws-lambda-powertools/parser/helpers'),
-            import('@aws-lambda-powertools/parser/schemas/kinesis'),
-          ]);
-          return KinesisDataStreamRecord.extend({
-            kinesis: KinesisDataStreamRecordPayload.extend({
-              data: Base64Encoded(schema as any),
-            }),
-          });
-        }
-        const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
-          await import('@aws-lambda-powertools/parser/schemas/kinesis');
-        return KinesisDataStreamRecord.extend({
-          kinesis: KinesisDataStreamRecordPayload.extend({ data: schema }),
-        });
-      }
-
-      case EventType.DynamoDBStreams: {
-        if (useTransformers) {
-          const [
-            { DynamoDBMarshalled },
-            { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase },
-          ] = await Promise.all([
-            import('@aws-lambda-powertools/parser/helpers/dynamodb'),
-            import('@aws-lambda-powertools/parser/schemas/dynamodb'),
-          ]);
-          return DynamoDBStreamRecord.extend({
-            dynamodb: DynamoDBStreamChangeRecordBase.extend({
-              OldImage: DynamoDBMarshalled<StreamRecord['OldImage']>(
-                schema as any
-              ).optional(),
-              NewImage: DynamoDBMarshalled<StreamRecord['NewImage']>(
-                schema as any
-              ).optional(),
-            }),
-          });
-        }
-        const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
-          await import('@aws-lambda-powertools/parser/schemas/dynamodb');
-        return DynamoDBStreamRecord.extend({
-          dynamodb: DynamoDBStreamChangeRecordBase.extend({
-            OldImage: (schema as any).optional(),
-            NewImage: (schema as any).optional(),
-          }),
-        });
-      }
-
-      default: {
-        console.warn(
-          `The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}`
-        );
-        throw new Error('Unsupported event type');
-      }
-    }
-  }
-
-  /**
-   * Parse the record according to the schema and event type passed.
-   *
-   * If the passed schema is already an extended schema,
-   * use the schema directly to parse the record.
-   *
-   * Only Zod Schemas are supported for schema extension.
-   *
-   * @param record - The record to be parsed
-   * @param eventType - The type of event to process
-   * @param schema - The StandardSchema to be used for parsing
-   */
-  async #parseRecord(
-    record: EventSourceDataClassTypes,
-    eventType: keyof typeof EventType,
-    schema: StandardSchemaV1
-  ): Promise<EventSourceDataClassTypes> {
-    const { parse } = await import('@aws-lambda-powertools/parser');
-    // Try parsing with the original schema first
-    const extendedSchemaParsing = parse(record, undefined, schema, true);
-    if (extendedSchemaParsing.success) {
-      return extendedSchemaParsing.data as EventSourceDataClassTypes;
-    }
-    // Only proceed with schema extension if it's a Zod schema
-    if (schema['~standard'].vendor !== SchemaVendor.Zod) {
-      console.warn(
-        'The schema provided is not supported. Only Zod schemas are supported for extension.'
-      );
-      throw new Error('Unsupported schema type');
-    }
-    // Handle schema extension based on event type
-    // Try without transformers first, then with transformers
-    const schemaWithoutTransformers = await this.#createExtendedSchema({
-      eventType,
-      schema,
-      useTransformers: false,
-    });
-    const schemaWithoutTransformersParsing = parse(
-      record,
-      undefined,
-      schemaWithoutTransformers,
-      true
-    );
-    if (schemaWithoutTransformersParsing.success) {
-      return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes;
-    }
-    const schemaWithTransformers = await this.#createExtendedSchema({
-      eventType,
-      schema,
-      useTransformers: true,
-    });
-    const schemaWithTransformersParsing = parse(
-      record,
-      undefined,
-      schemaWithTransformers,
-      true
-    );
-    if (schemaWithTransformersParsing.success) {
-      return schemaWithTransformersParsing.data as EventSourceDataClassTypes;
-    }
-    throw new Error('Failed to parse record');
-  }
 }
 
 export { BatchProcessor };
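`BatchProcessor` no longer extends or parses schemas itself; it awaits whatever function arrives as `parserConfig.parser`, passing the record, event type, logger, and the config object. A minimal custom parser matching that call site, with the signature inferred from the diff (the published types may differ):

import type { EventType } from '@aws-lambda-powertools/batch';

// Signature inferred from the call site above:
// parser(record, eventType, logger, parserConfig). Names are illustrative.
const passthroughParser = async (
  record: unknown,
  eventType: keyof typeof EventType,
  logger: { debug: (...args: unknown[]) => void },
  _parserConfig: unknown
): Promise<unknown> => {
  logger.debug(`passthrough parse for ${eventType} record`);
  // A real parser would validate the record against a schema and return the
  // parsed result, throwing (e.g. a ParsingError) on failure.
  return record;
};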

packages/batch/src/errors.ts

Lines changed: 11 additions & 0 deletions

@@ -63,10 +63,21 @@ class UnexpectedBatchTypeError extends BatchProcessingError {
   }
 }
 
+/**
+ * Error thrown by the Batch Processing utility when a record fails to be parsed.
+ */
+class ParsingError extends BatchProcessingError {
+  public constructor(message: string) {
+    super(message);
+    this.name = 'ParsingError';
+  }
+}
+
 export {
   BatchProcessingError,
   FullBatchFailureError,
   SqsFifoShortCircuitError,
   SqsFifoMessageGroupShortCircuitError,
   UnexpectedBatchTypeError,
+  ParsingError,
 };
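A dedicated error class lets callers tell records that failed schema validation apart from records whose handler threw; it is also re-exported from the package root in index.ts below. An illustrative type guard:

import { ParsingError } from '@aws-lambda-powertools/batch';

// Illustrative: classify errors surfaced by the batch utility so parse
// failures can be routed (e.g. to a dead-letter queue) separately from
// handler bugs.
const isParseFailure = (error: unknown): error is ParsingError =>
  error instanceof ParsingError;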

packages/batch/src/index.ts

Lines changed: 1 addition & 0 deletions

@@ -5,6 +5,7 @@ export { EventType } from './constants.js';
 export {
   BatchProcessingError,
   FullBatchFailureError,
+  ParsingError,
   SqsFifoMessageGroupShortCircuitError,
   SqsFifoShortCircuitError,
   UnexpectedBatchTypeError,
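Taken together, an end-to-end sketch under the assumptions above; `processPartialResponse` is the package's existing entry point for reporting partial batch failures:

import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

// Records that throw here, or that fail parsing when a parser is configured,
// are reported as batch item failures instead of failing the whole batch.
const recordHandler = async (record: SQSRecord): Promise<void> => {
  // business logic for a single record
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });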
