
Commit 08b56e1

docs(kafka): add preview docs page (#4072)

1 parent ef9bb52 · commit 08b56e1

28 files changed: +1293 −7 lines changed

docs/features/index.md

Lines changed: 8 additions & 0 deletions
@@ -87,4 +87,12 @@ description: Features of Powertools for AWS Lambda

      [:octicons-arrow-right-24: Read more](./validation.md)

+ -   __Kafka__
+
+     ---
+
+     Utility to easily handle message deserialization and parsing of Kafka events in AWS Lambda functions.
+
+     [:octicons-arrow-right-24: Read more](./kafka.md)
+
  </div>

docs/features/kafka.md

Lines changed: 345 additions & 0 deletions
Large diffs are not rendered by default.

docs/index.md

Lines changed: 1 addition & 0 deletions
@@ -54,6 +54,7 @@ Powertools for AWS Lambda (TypeScript) is built as a modular toolkit, so you can

  | [JMESPath Functions](./features/jmespath.md) | Built-in JMESPath functions to easily deserialize common encoded JSON payloads in Lambda functions. |
  | [Parser](./features/parser.md) | Utility to parse and validate AWS Lambda event payloads using Zod, a TypeScript-first schema declaration and validation library. |
  | [Validation](./features/validation.md) | JSON Schema validation for events and responses, including JMESPath support to unwrap events before validation. |
+ | [Kafka](./features/kafka.md) | Utility to easily handle message deserialization and parsing of Kafka events in AWS Lambda functions. |

  ## Examples

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@

import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { type } from 'arktype';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const OrderItemSchema = type({
  productId: 'string',
  quantity: 'number.integer >= 1',
  price: 'number.integer',
});

const OrderSchema = type({
  id: 'string',
  customerId: 'string',
  items: OrderItemSchema.array().moreThanLength(0),
  createdAt: 'string.date',
  totalAmount: 'number.integer >= 0',
});

const schemaConfig = {
  value: {
    type: SchemaType.JSON,
    parserSchema: OrderSchema,
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer<unknown, typeof OrderSchema.infer>(
  async (event, _context) => {
    for (const {
      value: { id, items },
    } of event.records) {
      logger.setCorrelationId(id);
      logger.debug(`order includes ${items.length} items`);
    }
  },
  schemaConfig
);
Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@

import {
  IdempotencyConfig,
  makeIdempotent,
} from '@aws-lambda-powertools/idempotency';
import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb';
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { User } from './samples/user.es6.generated.js'; // protobuf generated class

const logger = new Logger({ serviceName: 'kafka-consumer' });
const persistenceStore = new DynamoDBPersistenceLayer({
  tableName: 'IdempotencyTable',
});

const schemaConfig = {
  value: {
    type: SchemaType.PROTOBUF,
    schema: User,
  },
} satisfies SchemaConfig;

const processRecord = makeIdempotent(
  async (user, topic, partition, offset) => {
    logger.info('processing user', {
      userId: user.id,
      meta: {
        topic,
        partition,
        offset,
      },
    });

    // ...your business logic here

    return {
      success: true,
      userId: user.id,
    };
  },
  {
    persistenceStore,
    config: new IdempotencyConfig({
      eventKeyJmesPath: `topic & '-' & partition & '-' & offset`,
    }),
  }
);

export const handler = kafkaConsumer(async (event, _context) => {
  for (const { value, topic, partition, offset } of event.records) {
    await processRecord(value, topic, partition, offset);
  }
}, schemaConfig);
Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@

import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { Logger } from '@aws-lambda-powertools/logger';
import { type IUser, User } from './samples/user.es6.generated.js'; // protobuf generated class

const logger = new Logger({ serviceName: 'kafka-consumer' });

export const handler = kafkaConsumer<unknown, IUser>(
  async (event, _context) => {
    for (const {
      value,
      topic,
      partition,
      offset,
      timestamp,
      headers,
    } of event.records) {
      logger.info(`processing message from topic ${topic}`, {
        partition,
        offset,
        timestamp,
      });

      if (headers) {
        for (const header of headers) {
          logger.debug(`Header: ${header.key}`, {
            value: header.value,
          });
        }
      }

      // Process the deserialized value
      logger.info('User data', {
        userId: value.id,
        userName: value.name,
      });
    }
  },
  {
    value: {
      type: SchemaType.PROTOBUF,
      schema: User,
    },
  }
);
Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@

import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import * as v from 'valibot';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const OrderItemSchema = v.object({
  productId: v.string(),
  quantity: v.pipe(v.number(), v.integer(), v.minValue(1)),
  price: v.pipe(v.number(), v.integer()),
});

const OrderSchema = v.object({
  id: v.string(),
  customerId: v.string(),
  items: v.pipe(
    v.array(OrderItemSchema),
    v.minLength(1, 'Order must have at least one item')
  ),
  createdAt: v.pipe(v.string(), v.isoDateTime()),
  totalAmount: v.pipe(v.number(), v.minValue(0)),
});

const schemaConfig = {
  value: {
    type: SchemaType.JSON,
    parserSchema: OrderSchema,
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer<unknown, v.InferInput<typeof OrderSchema>>(
  async (event, _context) => {
    for (const {
      value: { id, items },
    } of event.records) {
      logger.setCorrelationId(id);
      logger.debug(`order includes ${items.length} items`);
    }
  },
  schemaConfig
);
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@

import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { z } from 'zod/v4';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const OrderItemSchema = z.object({
  productId: z.string(),
  quantity: z.number().int().positive(),
  price: z.number().positive(),
});

const OrderSchema = z.object({
  id: z.string(),
  customerId: z.string(),
  items: z.array(OrderItemSchema).min(1, 'Order must have at least one item'),
  createdAt: z.iso.datetime(),
  totalAmount: z.number().positive(),
});

const schemaConfig = {
  value: {
    type: SchemaType.JSON,
    parserSchema: OrderSchema,
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer<unknown, z.infer<typeof OrderSchema>>(
  async (event, _context) => {
    for (const {
      value: { id, items },
    } of event.records) {
      logger.setCorrelationId(id);
      logger.debug(`order includes ${items.length} items`);
    }
  },
  schemaConfig
);
Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@

flowchart LR
    KafkaTopic["Kafka Topic"] --> MSK["Amazon MSK"]
    KafkaTopic --> MSKServerless["Amazon MSK Serverless"]
    KafkaTopic --> SelfHosted["Self-hosted Kafka"]
    MSK --> EventSourceMapping["Event Source Mapping"]
    MSKServerless --> EventSourceMapping
    SelfHosted --> EventSourceMapping
    EventSourceMapping --> Lambda["Lambda Function"]
    Lambda --> KafkaConsumer["Kafka Consumer Utility"]
    KafkaConsumer --> Deserialization["Deserialization"]
    Deserialization --> YourLogic["Your Business Logic"]
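
Editorial note, not part of this commit: the flowchart above ends at the Event Source Mapping that hands record batches to the Lambda function running one of the kafkaConsumer handlers. As a rough sketch of that wiring with AWS CDK (assuming an Amazon MSK cluster; the cluster ARN, topic name, and handler entry path below are placeholders), the setup might look like this:

import { Stack, type StackProps } from 'aws-cdk-lib';
import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { Runtime, StartingPosition } from 'aws-cdk-lib/aws-lambda';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import type { Construct } from 'constructs';

export class KafkaConsumerStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // Lambda function bundling one of the kafkaConsumer handlers shown in this commit
    const fn = new NodejsFunction(this, 'KafkaConsumerFn', {
      entry: 'src/index.ts', // hypothetical path to the handler file
      runtime: Runtime.NODEJS_20_X,
    });

    // Event Source Mapping: polls the MSK topic and invokes the function with batches of records
    fn.addEventSource(
      new ManagedKafkaEventSource({
        clusterArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/placeholder', // placeholder ARN
        topic: 'orders', // hypothetical topic name
        batchSize: 100,
        startingPosition: StartingPosition.TRIM_HORIZON,
      })
    );
  }
}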
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@

sequenceDiagram
    participant Kafka
    participant ESM as Event Source Mapping
    participant SchemaRegistry as Schema Registry
    participant Lambda
    participant KafkaConsumer
    participant YourCode
    Kafka->>+ESM: Send batch of records
    ESM->>+SchemaRegistry: Validate and deserialize
    SchemaRegistry->>SchemaRegistry: Deserialize records
    SchemaRegistry-->>-ESM: Return deserialized data
    ESM->>+Lambda: Invoke with pre-deserialized JSON records
    Lambda->>+KafkaConsumer: Pass Kafka event
    KafkaConsumer->>KafkaConsumer: Parse event structure
    loop For each record
        KafkaConsumer->>KafkaConsumer: Record is already deserialized
        alt Output serializer provided
            KafkaConsumer->>KafkaConsumer: Apply output serializer
        end
    end
    KafkaConsumer->>+YourCode: Provide ConsumerRecords
    YourCode->>YourCode: Process records
    YourCode-->>-KafkaConsumer: Return result
    KafkaConsumer-->>-Lambda: Pass result back
    Lambda-->>-ESM: Return response
    ESM-->>-Kafka: Acknowledge processed batch
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@

sequenceDiagram
    participant Kafka
    participant ESM as Event Source Mapping
    participant SchemaRegistry as Schema Registry
    participant Lambda
    participant KafkaConsumer
    participant YourCode
    Kafka->>+ESM: Send batch of records
    ESM->>+SchemaRegistry: Validate schema
    SchemaRegistry-->>-ESM: Confirm schema is valid
    ESM->>+Lambda: Invoke with validated records (still encoded)
    Lambda->>+KafkaConsumer: Pass Kafka event
    KafkaConsumer->>KafkaConsumer: Parse event structure
    loop For each record
        KafkaConsumer->>KafkaConsumer: Decode base64 data
        KafkaConsumer->>KafkaConsumer: Deserialize based on schema_type
        alt Output serializer provided
            KafkaConsumer->>KafkaConsumer: Apply output serializer
        end
    end
    KafkaConsumer->>+YourCode: Provide ConsumerRecords
    YourCode->>YourCode: Process records
    YourCode-->>-KafkaConsumer: Return result
    KafkaConsumer-->>-Lambda: Pass result back
    Lambda-->>-ESM: Return response
    ESM-->>-Kafka: Acknowledge processed batch
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@

sequenceDiagram
    participant Kafka
    participant Lambda
    participant KafkaConsumer
    participant YourCode
    Kafka->>+Lambda: Invoke with batch of records (direct integration)
    Lambda->>+KafkaConsumer: Pass raw Kafka event
    KafkaConsumer->>KafkaConsumer: Parse event structure
    loop For each record
        KafkaConsumer->>KafkaConsumer: Decode base64 data
        KafkaConsumer->>KafkaConsumer: Deserialize based on schema_type
        alt Output serializer provided
            KafkaConsumer->>KafkaConsumer: Apply output serializer
        end
    end
    KafkaConsumer->>+YourCode: Provide ConsumerRecords
    YourCode->>YourCode: Process records
    YourCode-->>-KafkaConsumer: Return result
    KafkaConsumer-->>-Lambda: Pass result back
    Lambda-->>-Kafka: Acknowledge processed batch
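
Editorial note, not part of this commit: the "Decode base64 data" step in the diagrams above operates on the documented Lambda event shape for Kafka sources, where keys, values, and header bytes arrive encoded. A hand-written illustration of such a payload for a self-managed cluster follows; the topic name, offsets, broker address, and encoded strings are made up for the example.

// Illustrative self-managed Apache Kafka event as Lambda receives it.
// The consumer utility decodes `key`/`value` and deserializes them according to the SchemaConfig.
const sampleEvent = {
  eventSource: 'SelfManagedKafka',
  bootstrapServers: 'b-1.example.kafka:9092', // placeholder broker list
  records: {
    'orders-0': [
      {
        topic: 'orders',
        partition: 0,
        offset: 15,
        timestamp: 1545084650987,
        timestampType: 'CREATE_TIME',
        key: 'b3JkZXItMTIz', // base64 for "order-123"
        value: 'eyJpZCI6Im9yZGVyLTEyMyIsIml0ZW1zIjpbXX0=', // base64 for {"id":"order-123","items":[]}
        headers: [
          // header values arrive as arrays of byte values ("application/json" here)
          { 'content-type': [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110] },
        ],
      },
    ],
  },
};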
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@

import { readFileSync } from 'node:fs';
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const schemaConfig = {
  value: {
    type: SchemaType.AVRO,
    schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'),
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
  for (const { value } of event.records) {
    logger.info('received value', { value });
  }
}, schemaConfig);
Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@

import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const schemaConfig = {
  value: {
    type: SchemaType.JSON,
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
  for (const { value } of event.records) {
    logger.info('received value', { value });
  }
}, schemaConfig);
