Skip to content

Commit ef67b43

Browse files
sdangol and dreamorosi
authored
fix(batch): fixed the build issue with Batch processor due to missing dependencies (#4498)
Co-authored-by: Andrea Amorosi <dreamorosi@gmail.com>
1 parent 22b3b96 commit ef67b43

File tree

9 files changed

+309
-344
lines changed

9 files changed

+309
-344
lines changed

docs/features/batch.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,9 @@ Available transformers by event type:
358358
--8<-- "examples/snippets/batch/samples/parser_SQS.json"
359359
```
360360

361+
!!! note
362+
If `innerSchema` is used with DynamoDB streams, the schema will be applied to both the `NewImage` and the `OldImage` by default. If you want to have dedicated schemas, see the section below.
363+
361364
#### Using full event schema
362365

363366
For complete control over validation, extend the built-in schemas with your custom payload schema. This approach gives you full control over the entire event structure.

package-lock.json

Lines changed: 43 additions & 35 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/batch/package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,12 @@
8181
"nodejs"
8282
],
8383
"dependencies": {
84-
"@aws-lambda-powertools/commons": "2.26.1"
84+
"@aws-lambda-powertools/commons": "2.26.1",
85+
"@standard-schema/spec": "^1.0.0"
8586
},
8687
"devDependencies": {
88+
"@aws-lambda-powertools/parser": "2.26.1",
8789
"@aws-lambda-powertools/testing-utils": "file:../testing",
88-
"@aws-lambda-powertools/parser": "2.26.1"
90+
"zod": "^4.1.8"
8991
}
9092
}

packages/batch/src/BasePartialBatchProcessor.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import {
1313
} from './constants.js';
1414
import { FullBatchFailureError } from './errors.js';
1515
import type {
16-
BasePartialBatchProcessorParserConfig,
16+
BatchProcessorConfig,
1717
EventSourceDataClassTypes,
1818
PartialItemFailureResponse,
1919
PartialItemFailures,
@@ -55,7 +55,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
5555
/**
5656
* The configuration options for the parser integration
5757
*/
58-
protected parserConfig?: BasePartialBatchProcessorParserConfig;
58+
protected parserConfig?: BatchProcessorConfig;
5959

6060
/**
6161
* Initializes base batch processing class
@@ -64,7 +64,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
6464
*/
6565
public constructor(
6666
eventType: keyof typeof EventType,
67-
parserConfig?: BasePartialBatchProcessorParserConfig
67+
parserConfig?: BatchProcessorConfig
6868
) {
6969
super();
7070
this.eventType = eventType;

packages/batch/src/BatchProcessor.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
3333
* });
3434
* ```
3535
*
36-
* **Process batch triggered by Kinesis Data Streams*
36+
* **Process batch triggered by Kinesis Data Streams**
3737
*
3838
* @example
3939
* ```typescript
@@ -113,6 +113,40 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
113113
* });
114114
* ```
115115
*
116+
* **Process batch with inner schema validation**
117+
*
118+
* @example
119+
* ```typescript
120+
* import {
121+
* BatchProcessor,
122+
* EventType,
123+
* processPartialResponse,
124+
* } from '@aws-lambda-powertools/batch';
125+
* import { parser } from '@aws-lambda-powertools/batch/parser';
126+
* import type { SQSHandler } from 'aws-lambda';
127+
* import { z } from 'zod';
128+
*
129+
* const myItemSchema = z.object({ name: z.string(), age: z.number() });
130+
*
131+
* const processor = new BatchProcessor(EventType.SQS, {
132+
* parser,
133+
* innerSchema: myItemSchema,
134+
* transformer: 'json'
135+
* });
136+
*
137+
* const recordHandler = async (record) => {
138+
* // record is now fully typed and validated
139+
* console.log(record.body.name, record.body.age);
140+
* };
141+
*
142+
* export const handler: SQSHandler = async (event, context) =>
143+
* processPartialResponse(event, recordHandler, processor, {
144+
* context,
145+
* });
146+
* ```
147+
*
148+
* Note: If `innerSchema` is used with DynamoDB streams, the schema will be applied to both the NewImage and the OldImage by default. If you want to have separate schema for both, you will need to extend the schema and use the full schema for parsing.
149+
*
116150
* @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
117151
*/
118152
class BatchProcessor extends BasePartialBatchProcessor {

packages/batch/src/parser.ts

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,30 @@
11
import type { GenericLogger } from '@aws-lambda-powertools/commons/types';
22
import type { StandardSchemaV1 } from '@standard-schema/spec';
3-
import type { ZodType } from 'zod';
43
import { EventType, SchemaVendor } from './constants.js';
54
import { ParsingError } from './errors.js';
65
import type {
7-
BasePartialBatchProcessorParserConfig,
6+
BatchProcessorConfig,
87
EventSourceDataClassTypes,
98
} from './types.js';
109

1110
/**
1211
* Extend the schema according to the event type passed.
1312
*
14-
* If useTransformers is true, extend using opinionated transformers.
13+
* If `useTransformers` is true, extend using opinionated transformers.
1514
* Otherwise, extend without any transformers.
1615
*
16+
* The vendor is already checked at runtime to ensure Zod is being used when required using `StandardSchemaV1['~standard'].vendor`.
17+
*
1718
* @param options - The options for creating the extended schema
1819
* @param options.eventType - The type of event to process (SQS, Kinesis, DynamoDB)
19-
* @param options.schema - The StandardSchema to be used for parsing
20+
* @param options.innerSchema - The StandardSchema to be used for parsing. To avoid forcing a direct dependency on Zod, we use `unknown` here, which is not ideal but necessary.
2021
* @param options.useTransformers - Whether to use transformers for parsing
2122
* @param options.logger - A logger instance for logging
2223
*/
2324
const createExtendedSchema = async (options: {
2425
eventType: keyof typeof EventType;
25-
innerSchema: ZodType;
26-
transformer?: BasePartialBatchProcessorParserConfig['transformer'];
26+
innerSchema: unknown;
27+
transformer?: BatchProcessorConfig['transformer'];
2728
}) => {
2829
const { eventType, innerSchema, transformer } = options;
2930
let schema = innerSchema;
@@ -32,20 +33,23 @@ const createExtendedSchema = async (options: {
3233
const { JSONStringified } = await import(
3334
'@aws-lambda-powertools/parser/helpers'
3435
);
36+
// @ts-expect-error - we know it's a Zod schema due to the runtime check earlier
3537
schema = JSONStringified(innerSchema);
3638
break;
3739
}
3840
case 'base64': {
3941
const { Base64Encoded } = await import(
4042
'@aws-lambda-powertools/parser/helpers'
4143
);
44+
// @ts-expect-error - we know it's a Zod schema due to the runtime check earlier
4245
schema = Base64Encoded(innerSchema);
4346
break;
4447
}
4548
case 'unmarshall': {
4649
const { DynamoDBMarshalled } = await import(
4750
'@aws-lambda-powertools/parser/helpers/dynamodb'
4851
);
52+
// @ts-expect-error - we know it's a Zod schema due to the runtime check earlier
4953
schema = DynamoDBMarshalled(innerSchema);
5054
break;
5155
}
@@ -73,8 +77,8 @@ const createExtendedSchema = async (options: {
7377
);
7478
return DynamoDBStreamRecord.extend({
7579
dynamodb: DynamoDBStreamChangeRecordBase.extend({
76-
OldImage: schema.optional(),
77-
NewImage: schema.optional(),
80+
OldImage: schema,
81+
NewImage: schema,
7882
}),
7983
});
8084
};
@@ -101,7 +105,7 @@ const parseWithErrorHandling = async (
101105
const errorMessage = issues
102106
.map((issue) => `${issue.path?.join('.')}: ${issue.message}`)
103107
.join('; ');
104-
logger.debug(errorMessage);
108+
logger.debug(`Failed to parse record: ${errorMessage}`);
105109
throw new ParsingError(errorMessage);
106110
};
107111

@@ -111,7 +115,8 @@ const parseWithErrorHandling = async (
111115
* If the passed schema is already an extended schema,
112116
* use the schema directly to parse the record.
113117
*
114-
* Only Zod Schemas are supported for schema extension.
118+
* Parts of the parser integration within BatchProcessor rely on Zod for schema transformations,
119+
* however some other parts also support other Standard Schema-compatible libraries.
115120
*
116121
* @param record - The record to be parsed
117122
* @param eventType - The type of event to process
@@ -122,7 +127,7 @@ const parser = async (
122127
record: EventSourceDataClassTypes,
123128
eventType: keyof typeof EventType,
124129
logger: Pick<GenericLogger, 'debug' | 'warn' | 'error'>,
125-
parserConfig: BasePartialBatchProcessorParserConfig
130+
parserConfig: BatchProcessorConfig
126131
): Promise<EventSourceDataClassTypes> => {
127132
const { schema, innerSchema, transformer } = parserConfig;
128133
// If the external schema is specified, use it to parse the record

0 commit comments

Comments
 (0)