Merged
42 commits
9900ab3
Add initial code for KafkaJson and KafkaAvro request handlers.
phipag May 15, 2025
47cbc76
Add deserialization via @Deserialization annotation.
phipag May 16, 2025
680b979
Add TODOs in code.
phipag May 16, 2025
15f2923
Fix typos and make AbstractKafkaDeserializer package private.
phipag May 16, 2025
7c4bf30
Merge phipag/kafka-event from public repository into feature/kafka
phipag May 21, 2025
6ac583f
Remove request handler implementation in favor for @Deserialization a…
phipag May 21, 2025
92ae6ab
Parse Timestamp type correctly.
phipag May 21, 2025
8e13dd6
Remove custom RequestHandler implementation example.
phipag May 21, 2025
623585a
Merge branch 'main' into feature/kafka
phipag May 26, 2025
5ea6d49
Make AspectJ version compatible with min version Java 11.
phipag May 26, 2025
73e64e4
Clarify exception message when deserialization fails.
phipag May 27, 2025
cbe9181
Add more advanced JSON escaping to JSONSerializer in logging module.
phipag May 27, 2025
64e7080
Add protobuf deserialization logic and fully working example.
phipag May 27, 2025
f081424
Add Maven profile to compile a JAR with different dependency combinat…
phipag May 27, 2025
e11db9a
Add minimal kafka example.
phipag May 28, 2025
4282a77
Add missing copyright.
phipag May 28, 2025
64f7e18
Add unit tests for kafka utility.
phipag May 28, 2025
1191c56
Add minimal kafka example to examples module in pom.xml.
phipag May 29, 2025
db9b989
Add some comments.
phipag May 29, 2025
b64dcbd
Update powertools-examples-kafka with README and make it more minimal…
phipag Jun 6, 2025
4624c12
Implement PR feedback from Karthik.
phipag Jun 6, 2025
598cc27
Fix SAM outputs.
phipag Jun 6, 2025
92f6f8f
Do not fail on unknown properties when deserializing into KafkaEvent.
phipag Jun 6, 2025
7fcc989
Merge branch 'main' into feature/kafka
phipag Jun 16, 2025
77845af
Allow customers to bring their own kafka-clients dependency.
phipag Jun 16, 2025
e4875d8
Add Kafka utility documentation.
phipag Jun 16, 2025
767109b
Update project version consistently to 2.0.0.
phipag Jun 16, 2025
3e6a8b7
fix: Fix bug where abbreviated _HANDLER env var did not detect the De…
phipag Jun 17, 2025
ef04849
fix: Bug when trying to deserialize a type into itself for Lambda def…
phipag Jun 17, 2025
6da89a3
When falling back to Lambda default, handle conversion between InputS…
phipag Jun 17, 2025
2be14dd
Raise a runtime exception when the KafkaEvent is invalid.
phipag Jun 17, 2025
04cf14a
docs: Announce deprecation of v1
phipag Jun 16, 2025
f02c8fd
fix(metrics): Do not flush when no metrics were added to avoid printi…
phipag Jun 16, 2025
fa29f60
Merge branch 'main' into feature/kafka
phipag Jun 18, 2025
3c76357
Rename docs to Kafka Consumer and add line highlights for code examples.
phipag Jun 18, 2025
ffebe8c
Fix Spotbug issues.
phipag Jun 18, 2025
55c860a
Reduce cognitive complexity of DeserializationUtils making it more mo…
phipag Jun 18, 2025
03e5b11
Reduce cognitive complexity of AbstractKafkaDeserializer.
phipag Jun 18, 2025
a5689e9
Enable removal policy DESTROY on e2e test for kinesis streams and SQS…
phipag Jun 18, 2025
8f12c04
Replace System.out with Powertools Logging.
phipag Jun 18, 2025
335279e
Add notice about kafka-clients compatibility.
phipag Jun 18, 2025
e9654a9
Add sentence stating that Avro / Protobuf classes can be autogenerated.
phipag Jun 18, 2025
3 changes: 2 additions & 1 deletion examples/pom.xml
@@ -37,6 +37,7 @@
<module>powertools-examples-idempotency</module>
<module>powertools-examples-parameters</module>
<module>powertools-examples-serialization</module>
<module>powertools-examples-kafka</module>
<module>powertools-examples-batch</module>
<module>powertools-examples-validation</module>
<module>powertools-examples-cloudformation</module>
@@ -56,4 +57,4 @@
</plugins>
</build>

</project>
</project>
51 changes: 51 additions & 0 deletions examples/powertools-examples-kafka/events/kafka-avro-event.json
@@ -0,0 +1,51 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"mytopic-0": [
{
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "0g8MTGFwdG9wUrgehes/j0A=",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
},
{
"topic": "mytopic",
"partition": 0,
"offset": 16,
"timestamp": 1545084650988,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "1A8UU21hcnRwaG9uZVK4HoXrv4JA",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
},
{
"topic": "mytopic",
"partition": 0,
"offset": 17,
"timestamp": 1545084650989,
"timestampType": "CREATE_TIME",
"key": null,
"value": "1g8USGVhZHBob25lc0jhehSuv2JA",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
}
]
}
}
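
Note: the "value" fields above are base64-encoded Avro binary payloads, not JSON. As a point of reference only (not part of this PR), the sketch below shows how such a payload could be produced, assuming the AvroProduct class generated from the AvroProduct.avsc schema further down, its generated all-args constructor, and Avro's standard encoder API; the field values are purely illustrative.

// Sketch only: producing a base64-encoded Avro binary payload like the "value" fields above.
// Assumes the generated org.demo.kafka.avro.AvroProduct class; not part of the PR.
import java.io.ByteArrayOutputStream;
import java.util.Base64;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

import org.demo.kafka.avro.AvroProduct;

public class AvroPayloadSketch {
    public static void main(String[] args) throws Exception {
        AvroProduct product = new AvroProduct(1001, "Laptop", 999.99);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new SpecificDatumWriter<>(AvroProduct.class).write(product, encoder);
        encoder.flush();

        // Base64-encode the Avro binary so it can be placed into the Kafka event's "value" field.
        System.out.println(Base64.getEncoder().encodeToString(out.toByteArray()));
    }
}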
50 changes: 50 additions & 0 deletions examples/powertools-examples-kafka/events/kafka-json-event.json
@@ -0,0 +1,50 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"mytopic-0": [
{
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": "cmVjb3JkS2V5",
"value": "ewogICJpZCI6IDEyMzQ1LAogICJuYW1lIjogInByb2R1Y3Q1IiwKICAicHJpY2UiOiA0NQp9",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
},
{
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"value": "ewogICJpZCI6IDEyMzQ1LAogICJuYW1lIjogInByb2R1Y3Q1IiwKICAicHJpY2UiOiA0NQp9",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
},
{
"topic": "mytopic",
"partition": 0,
"offset": 15,
"timestamp": 1545084650987,
"timestampType": "CREATE_TIME",
"key": null,
"value": "ewogICJpZCI6IDEyMzQ1LAogICJuYW1lIjogInByb2R1Y3Q1IiwKICAicHJpY2UiOiA0NQp9",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
}
]
}
]
}
}
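
The "value" fields in this JSON event are base64-encoded JSON documents. Decoding one of them (sketch only, not part of the PR) yields a small product object:

// Sketch only: decoding one of the base64-encoded "value" fields above into plain JSON text.
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class JsonPayloadSketch {
    public static void main(String[] args) {
        String value = "ewogICJpZCI6IDEyMzQ1LAogICJuYW1lIjogInByb2R1Y3Q1IiwKICAicHJpY2UiOiA0NQp9";
        String json = new String(Base64.getDecoder().decode(value), StandardCharsets.UTF_8);
        // Prints a pretty-printed JSON object with fields id (12345), name ("product5"), price (45).
        System.out.println(json);
    }
}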
122 changes: 122 additions & 0 deletions examples/powertools-examples-kafka/pom.xml
@@ -0,0 +1,122 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>software.amazon.lambda.examples</groupId>
<version>2.0.0-SNAPSHOT</version>
<artifactId>powertools-examples-kafka</artifactId>
<packaging>jar</packaging>
<name>Powertools for AWS Lambda (Java) - Examples - Kafka</name>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<aspectj.version>1.9.24</aspectj.version>
<avro.version>1.12.0</avro.version>
</properties>

<dependencies>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-logging-log4j</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-metrics</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.15.0</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>${aspectj.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Don't deploy the example -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>dev.aspectj</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
<version>1.14</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<complianceLevel>${maven.compiler.target}</complianceLevel>
<aspectLibraries>
<aspectLibrary>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-logging</artifactId>
</aspectLibrary>
<aspectLibrary>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-metrics</artifactId>
</aspectLibrary>
</aspectLibraries>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjtools</artifactId>
<version>${aspectj.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

</build>

</project>
10 changes: 10 additions & 0 deletions examples/powertools-examples-kafka/src/main/avro/AvroProduct.avsc
@@ -0,0 +1,10 @@
{
"namespace": "org.demo.kafka.avro",
"type": "record",
"name": "AvroProduct",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "price", "type": "double"}
]
}
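
The avro-maven-plugin configured in the example's pom.xml generates org.demo.kafka.avro.AvroProduct from this schema during generate-sources. Below is a minimal usage sketch (not part of the PR), assuming the builder API that Avro's code generator normally emits:

// Sketch only: constructing an instance of the class generated from AvroProduct.avsc.
// Assumes the standard Avro-generated builder; values are illustrative.
import org.demo.kafka.avro.AvroProduct;

public class AvroProductUsageSketch {
    public static void main(String[] args) {
        AvroProduct product = AvroProduct.newBuilder()
                .setId(1001)
                .setName("Laptop")
                .setPrice(999.99)
                .build();
        // Generated Avro records render as a JSON-like string via toString().
        System.out.println(product);
    }
}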
53 changes: 53 additions & 0 deletions examples/powertools-examples-kafka/src/main/java/org/demo/kafka/KafkaAvroConsumerDeserializationFunction.java
@@ -0,0 +1,53 @@
package org.demo.kafka;

import static software.amazon.lambda.powertools.logging.argument.StructuredArguments.entry;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.avro.AvroProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;

import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.Unit;
import software.amazon.lambda.powertools.kafka.KafkaAvroRequestHandler;
import software.amazon.lambda.powertools.logging.Logging;
import software.amazon.lambda.powertools.metrics.Metrics;
import software.amazon.lambda.powertools.metrics.MetricsUtils;

public class KafkaAvroConsumerDeserializationFunction extends KafkaAvroRequestHandler<String, AvroProduct, String> {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAvroConsumerDeserializationFunction.class);
private static final MetricsLogger metrics = MetricsUtils.metricsLogger();

@Override
@Logging
@Metrics
public String handleRecords(ConsumerRecords<String, AvroProduct> records, Context context) {
for (ConsumerRecord<String, AvroProduct> consumerRecord : records) {
LOGGER.info("{}", consumerRecord, entry("value", avroToMap(consumerRecord.value())));
metrics.putMetric("ProcessedAvroRecord", 1, Unit.COUNT);
}

return "OK";
}

// TODO: Helper method needed because Avro objects cannot be serialized by the Jackson ObjectMapper used in the
// Logging module; entry("value", consumerRecord.value()) would fall back to a string instead of a native JSON object.
private Map<String, Object> avroToMap(AvroProduct avroProduct) {
if (avroProduct == null) {
return Collections.emptyMap();
}
Map<String, Object> map = new HashMap<>();
map.put("id", avroProduct.getId());
map.put("name", avroProduct.getName());
map.put("price", avroProduct.getPrice());
return map;
}
}
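
The avroToMap helper above is specific to AvroProduct. A possible generalization (sketch only, not part of the PR) walks the record's schema and works for any generated Avro record; whether that is preferable to the hand-written version is a design choice, since the explicit mapping keeps the example simpler.

// Sketch only: schema-driven alternative to the hand-written avroToMap helper.
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;

public final class AvroMaps {
    private AvroMaps() {
    }

    public static Map<String, Object> toMap(SpecificRecord record) {
        Map<String, Object> map = new HashMap<>();
        if (record == null) {
            return map;
        }
        // Copy every field declared in the record's schema into the map.
        for (Schema.Field field : record.getSchema().getFields()) {
            map.put(field.name(), record.get(field.pos()));
        }
        return map;
    }
}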
35 changes: 35 additions & 0 deletions examples/powertools-examples-kafka/src/main/java/org/demo/kafka/KafkaJsonConsumerDeserializationFunction.java
@@ -0,0 +1,35 @@
package org.demo.kafka;

import static software.amazon.lambda.powertools.logging.argument.StructuredArguments.entry;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;

import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.Unit;
import software.amazon.lambda.powertools.kafka.KafkaJsonRequestHandler;
import software.amazon.lambda.powertools.logging.Logging;
import software.amazon.lambda.powertools.metrics.Metrics;
import software.amazon.lambda.powertools.metrics.MetricsUtils;

public class KafkaJsonConsumerDeserializationFunction extends KafkaJsonRequestHandler<String, Product, String> {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJsonConsumerDeserializationFunction.class);
private static final MetricsLogger metrics = MetricsUtils.metricsLogger();

@Override
@Logging
@Metrics
public String handleRecords(ConsumerRecords<String, Product> records, Context context) {
for (ConsumerRecord<String, Product> consumerRecord : records) {
LOGGER.info("{}", consumerRecord, entry("value", consumerRecord.value()));
metrics.putMetric("ProcessedRecord", 1, Unit.COUNT);
}

return "OK";
}
}
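
The Product class used by this JSON handler is not shown in this part of the diff. The sketch below (not part of the PR) approximates deserializing one of the base64 JSON record values into such a POJO; the Product fields here are inferred from the sample event payload (id, name, price) and are an assumption, as is the use of Jackson.

// Sketch only: base64-decoding and Jackson-deserializing a record value into a hypothetical
// Product POJO (fields inferred from the sample event; the real example class may differ).
import java.util.Base64;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ProductDeserializationSketch {

    // Hypothetical stand-in for the example's Product class.
    public static class Product {
        public long id;
        public String name;
        public double price;
    }

    public static void main(String[] args) throws Exception {
        String value = "ewogICJpZCI6IDEyMzQ1LAogICJuYW1lIjogInByb2R1Y3Q1IiwKICAicHJpY2UiOiA0NQp9";
        Product product = new ObjectMapper().readValue(Base64.getDecoder().decode(value), Product.class);
        System.out.println(product.id + " " + product.name + " " + product.price);
    }
}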
57 changes: 57 additions & 0 deletions examples/powertools-examples-kafka/src/main/java/org/demo/kafka/NativeKafkaAvroConsumerDeserializationFunction.java
@@ -0,0 +1,57 @@
package org.demo.kafka;

import static software.amazon.lambda.powertools.logging.argument.StructuredArguments.entry;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.avro.AvroProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
import software.amazon.cloudwatchlogs.emf.model.Unit;
import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;
import software.amazon.lambda.powertools.metrics.Metrics;
import software.amazon.lambda.powertools.metrics.MetricsUtils;

public class NativeKafkaAvroConsumerDeserializationFunction
implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> {

private static final Logger LOGGER = LoggerFactory.getLogger(NativeKafkaAvroConsumerDeserializationFunction.class);
private static final MetricsLogger metrics = MetricsUtils.metricsLogger();

@Override
@Logging
@Metrics
@Deserialization(type = DeserializationType.KAFKA_AVRO)
public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) {
for (ConsumerRecord<String, AvroProduct> consumerRecord : records) {
LOGGER.info("{}", consumerRecord, entry("value", avroToMap(consumerRecord.value())));
metrics.putMetric("ProcessedAvroRecord", 1, Unit.COUNT);
}

return "OK";
}

// TODO: Helper method needed because Avro objects cannot be serialized by the Jackson ObjectMapper used in the
// Logging module; entry("value", consumerRecord.value()) would fall back to a string instead of a native JSON object.
private Map<String, Object> avroToMap(AvroProduct avroProduct) {
if (avroProduct == null) {
return Collections.emptyMap();
}
Map<String, Object> map = new HashMap<>();
map.put("id", avroProduct.getId());
map.put("name", avroProduct.getName());
map.put("price", avroProduct.getPrice());
return map;
}
}