Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
2c20023
Starting to sketch out shape of API for batch processor
scottgerring Jun 21, 2023
63ebcab
Merge branch 'main' into rfc-batch-processor
scottgerring Jun 22, 2023
c34f5d9
Merge branch 'main' into rfc-batch-processor
Jun 25, 2023
870ec47
Variant 1
Jun 25, 2023
5469287
Some more examples
Jun 25, 2023
0435d18
Add extra bit for handling message-specific mutation
Jun 25, 2023
6241014
Make clear what's not public
Jun 25, 2023
8168719
test with interfaces
jeromevdl Jun 28, 2023
b0675b7
move tests
jeromevdl Jun 28, 2023
5ae16f9
refactoring a bit
jeromevdl Jun 29, 2023
3127096
refactoring and adding FIFO
jeromevdl Jun 30, 2023
9b8a310
refactoring and adding FIFO
jeromevdl Jun 30, 2023
12cb97f
adding FIFO management
jeromevdl Jul 1, 2023
061bfb1
cleanup
jeromevdl Jul 1, 2023
6b07f78
add javadoc
jeromevdl Jul 3, 2023
7d2e1fb
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Jul 3, 2023
b69c09b
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 11, 2023
cc28ce5
Flesh out builder option a bit
scottgerring Jul 11, 2023
916c26f
Flesh out a bit more
scottgerring Jul 11, 2023
96c30ff
more changes
scottgerring Jul 11, 2023
ee64d62
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 19, 2023
26d8da5
Leaning into the builder style. needs some more thought
scottgerring Jul 19, 2023
a9517f6
The shape of it is rightish
scottgerring Jul 20, 2023
d3ad219
Working working
scottgerring Jul 20, 2023
418e32b
Work
scottgerring Jul 20, 2023
a1e441c
Work on kinesis batch handler
scottgerring Jul 24, 2023
28d1f8d
More tests
scottgerring Jul 24, 2023
a660a8b
More tests and starting to add an example
scottgerring Jul 24, 2023
2332e2d
Working on batch
scottgerring Jul 25, 2023
4af594c
feat(batch): initial DdbBatchMessageHandler implementation
mriccia Jul 25, 2023
34a58e6
more
scottgerring Jul 26, 2023
ed161e9
fix pom.xml for powertools-examples-batch
mriccia Jul 26, 2023
f8812be
Add dynamodb example
mriccia Jul 26, 2023
53a5abe
Move template into subdir
scottgerring Jul 26, 2023
9872410
Better structure
scottgerring Jul 26, 2023
e53c1e9
tidy up
mriccia Jul 26, 2023
4ca1726
Trying to get kinesis going
scottgerring Jul 26, 2023
fb422da
Kinesis demo working
scottgerring Jul 26, 2023
a862daf
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Jul 26, 2023
b8af8de
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 26, 2023
4118768
Updated readme
scottgerring Jul 26, 2023
e75de66
Deprecated everywhere
scottgerring Jul 26, 2023
1e7d305
Address initial review comments
scottgerring Jul 27, 2023
6ad7e7d
Add success tests for Kinesis/S3
scottgerring Jul 27, 2023
8f69551
Increase DDB coverage
scottgerring Jul 27, 2023
672ba51
Tell sonar to ignore dupes in examples
scottgerring Jul 27, 2023
70a08ba
Add docs
scottgerring Jul 27, 2023
dae6131
Add warning
scottgerring Jul 27, 2023
5e4d709
Merge remote-tracking branch 'origin/main' into rfc-batch-processor
scottgerring Jul 27, 2023
6d842da
More doco
scottgerring Jul 27, 2023
9de7d0a
Format
scottgerring Jul 27, 2023
0e95238
Docs good
scottgerring Jul 27, 2023
ba17efd
Disabling formatting check for now as its breaking the build and I ca…
scottgerring Jul 27, 2023
e87e6dc
Make checkstyle happy
scottgerring Jul 27, 2023
5c046a6
Add docs from heitor
scottgerring Jul 27, 2023
56af4d0
More docs changes
scottgerring Jul 28, 2023
e0fd524
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 28, 2023
c43df12
Merge branch 'main' into rfc-batch-processor
scottgerring Aug 2, 2023
5d8be7a
Merged
scottgerring Aug 2, 2023
033a922
move ddb template in the right folder
mriccia Aug 2, 2023
13baa6a
Changes
scottgerring Aug 2, 2023
ca6cd09
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Aug 2, 2023
00fe0b0
add items updates and deletions to ddb example
mriccia Aug 2, 2023
1f65ceb
Will it blend?
scottgerring Aug 2, 2023
ebc7630
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Aug 2, 2023
bb74fe4
More changes
scottgerring Aug 2, 2023
76e51a6
e2e test handler
mriccia Aug 2, 2023
0e4b018
Try work for SQS only
scottgerring Aug 2, 2023
f8c9802
Merged
scottgerring Aug 2, 2023
c641d4b
More greatness
scottgerring Aug 2, 2023
0dbf1a3
Almost good
scottgerring Aug 2, 2023
7834a6e
SQS works
scottgerring Aug 2, 2023
062870f
Also kinesis e2e
scottgerring Aug 2, 2023
5548dc0
Lets try doing it with streams
scottgerring Aug 2, 2023
3cf034a
Try make it work with streams
scottgerring Aug 2, 2023
55cbbf4
Streams?
scottgerring Aug 3, 2023
6e44fe6
Make SQS test work
scottgerring Aug 3, 2023
0c9f8b8
SQS and Kinesis work
scottgerring Aug 3, 2023
4b13467
Merge branch 'main' into rfc-batch-processor
scottgerring Aug 3, 2023
006b9e0
DynamoDB E2E works
scottgerring Aug 3, 2023
c3ae363
Formatting
scottgerring Aug 3, 2023
570d051
Try exclude e2e-tests from dupe checking
scottgerring Aug 3, 2023
6303a75
Rename sonar file
scottgerring Aug 3, 2023
f91c2c1
Formatting
scottgerring Aug 3, 2023
db0e7f1
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
fa9aa11
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
710a94c
Address review comments
scottgerring Aug 4, 2023
06373a7
Merge
scottgerring Aug 4, 2023
294313f
Missed one
scottgerring Aug 4, 2023
c63fb28
Formatting
scottgerring Aug 4, 2023
e27a31f
Cleanup doc linking
scottgerring Aug 4, 2023
0929fb1
More doco
scottgerring Aug 4, 2023
3d8aca8
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
1ff4658
Update batch.md
scottgerring Aug 4, 2023
83fdf12
Skip aspectj run
scottgerring Aug 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Format
  • Loading branch information
scottgerring committed Jul 27, 2023
commit 9de7d0af41c96fb806f11a85ca7c86c24c372a3f
6 changes: 4 additions & 2 deletions examples/powertools-examples-batch/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Powertools for AWS Lambda (Java) - Batch Example

This project contains examples of Lambda function using the batch processing module of Powertools for AWS Lambda (Java). For more information on this module, please refer to the
This project contains examples of Lambda function using the batch processing module of Powertools for AWS Lambda (Java).
For more information on this module, please refer to the
[documentation](https://docs.powertools.aws.dev/lambda-java/utilities/batch/).

Three different examples and SAM deployments are included, covering each of the batch sources:
Expand All @@ -17,6 +18,7 @@ started with SAM in [the examples directory](../README.md)
This sample contains three different deployments, depending on which batch processor you'd like to use, you can
change to the subdirectory containing the example SAM template, and deploy. For instance, for the SQS batch
deployment:

```bash
cd deploy/sqs
sam build
Expand All @@ -25,7 +27,7 @@ sam deploy --guided

## Test the application

Each of the examples uses a Lambda scheduled every 5 minutes to push a batch, and a separate lambda to read it. To
Each of the examples uses a Lambda scheduled every 5 minutes to push a batch, and a separate lambda to read it. To
see this in action, we can simply tail the logs of our stack:

```bash
Expand Down
14 changes: 7 additions & 7 deletions examples/powertools-examples-batch/deploy/kinesis/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@ Resources:
STREAM_NAME: !Ref DemoKinesisStream
Policies:
- Statement:
- Sid: WriteToKinesis
Effect: Allow
Action:
- kinesis:PutRecords
- kinesis:DescribeStream
Resource: !GetAtt DemoKinesisStream.Arn
- Sid: WriteToKinesis
Effect: Allow
Action:
- kinesis:PutRecords
- kinesis:DescribeStream
Resource: !GetAtt DemoKinesisStream.Arn
Events:
CWSchedule:
Type: Schedule
Properties:
Schedule: 'rate(5 minutes)'
Name: !Join ["-", ["message-producer-schedule", !Select [0, !Split [-, !Select [2, !Split [/, !Ref AWS::StackId ]]]]]]
Name: !Join [ "-", [ "message-producer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
Description: Produce message to Kinesis via a Lambda function
Enabled: true

Expand Down
26 changes: 13 additions & 13 deletions examples/powertools-examples-batch/deploy/sqs/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,24 @@ Resources:
QUEUE_URL: !Ref DemoSqsQueue
Policies:
- Statement:
- Sid: SQSSendMessageBatch
Effect: Allow
Action:
- sqs:SendMessageBatch
- sqs:SendMessage
Resource: !GetAtt DemoSqsQueue.Arn
- Sid: SQSKMSKey
Effect: Allow
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource: !GetAtt CustomerKey.Arn
- Sid: SQSSendMessageBatch
Effect: Allow
Action:
- sqs:SendMessageBatch
- sqs:SendMessage
Resource: !GetAtt DemoSqsQueue.Arn
- Sid: SQSKMSKey
Effect: Allow
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource: !GetAtt CustomerKey.Arn
Events:
CWSchedule:
Type: Schedule
Properties:
Schedule: 'rate(5 minutes)'
Name: !Join ["-", ["message-producer-schedule", !Select [0, !Split [-, !Select [2, !Split [/, !Ref AWS::StackId ]]]]]]
Name: !Join [ "-", [ "message-producer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
Description: Produce message to SQS via a Lambda function
Enabled: true

Expand Down
4 changes: 2 additions & 2 deletions examples/powertools-examples-batch/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.demo.batch.model.Product;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import java.security.SecureRandom;
import java.util.UUID;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.demo.batch.model.DdbProduct;
Expand All @@ -14,10 +17,6 @@
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

import java.security.SecureRandom;
import java.util.UUID;
import java.util.stream.IntStream;

public class DynamoDBWriter implements RequestHandler<ScheduledEvent, String> {

private static final Logger LOGGER = LogManager.getLogger(DynamoDBWriter.class);
Expand Down Expand Up @@ -54,9 +53,10 @@ public String handleRequest(ScheduledEvent scheduledEvent, Context context) {
productBuilder.addPutItem(product);
});

BatchWriteItemEnhancedRequest batchWriteItemEnhancedRequest = BatchWriteItemEnhancedRequest.builder().writeBatches(
productBuilder.build())
.build();
BatchWriteItemEnhancedRequest batchWriteItemEnhancedRequest =
BatchWriteItemEnhancedRequest.builder().writeBatches(
productBuilder.build())
.build();

BatchWriteResult batchWriteResult = enhancedClient.batchWriteItem(batchWriteItemEnhancedRequest);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
package org.demo.batch.kinesis;

import static java.util.stream.Collectors.toList;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.security.SecureRandom;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.demo.batch.model.Product;
import software.amazon.awssdk.core.BytesWrapper;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;

import java.security.SecureRandom;
import java.util.List;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
* limitations under the License.
*
*/

package org.demo.batch.model;

import java.util.Objects;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;

import java.util.Objects;

@DynamoDbBean
public class DdbProduct {
private String id;
Expand Down Expand Up @@ -62,8 +62,12 @@ public void setPrice(double price) {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DdbProduct that = (DdbProduct) o;
return Double.compare(that.price, price) == 0 && Objects.equals(id, that.id) && Objects.equals(name, that.name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* limitations under the License.
*
*/

package org.demo.batch.model;

import java.util.Objects;
Expand Down Expand Up @@ -57,8 +58,12 @@ public void setPrice(double price) {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Product product = (Product) o;
return id == product.id && Double.compare(product.price, price) == 0 && Objects.equals(name, product.name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import org.demo.batch.model.Product;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.demo.batch.model.Product;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
private final static Logger LOGGER = LogManager.getLogger(SqsBatchHandler.class);
private final static Logger LOGGER = LogManager.getLogger(SqsBatchHandler.class);
private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;

public SqsBatchHandler() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package org.demo.batch.sqs;

import static java.util.stream.Collectors.toList;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.security.SecureRandom;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.demo.batch.model.Product;
Expand All @@ -14,12 +19,6 @@
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;

import java.security.SecureRandom;
import java.util.List;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;


/**
* A Lambda handler used to send message batches to SQS. This is only here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<Configuration>
<Appenders>
<Console name="JsonAppender" target="SYSTEM_OUT">
<JsonTemplateLayout eventTemplateUri="classpath:LambdaJsonLayout.json" />
<JsonTemplateLayout eventTemplateUri="classpath:LambdaJsonLayout.json"/>
</Console>
</Appenders>
<Loggers>
Expand Down
2 changes: 1 addition & 1 deletion examples/powertools-examples-batch/template-dynamodb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Resources:
Type: Schedule
Properties:
Schedule: 'rate(5 minutes)'
Name: !Join ["-", ["ddb-writer-schedule", !Select [0, !Split [-, !Select [2, !Split [/, !Ref AWS::StackId ]]]]]]
Name: !Join [ "-", [ "ddb-writer-schedule", !Select [ 0, !Split [ -, !Select [ 2, !Split [ /, !Ref AWS::StackId ] ] ] ] ] ]
Description: Produce message to SQS via a Lambda function
Enabled: true

4 changes: 2 additions & 2 deletions powertools-batch/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package software.amazon.lambda.powertools.batch;

import software.amazon.lambda.powertools.batch.builder.*;
import software.amazon.lambda.powertools.batch.builder.DynamoDbBatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.builder.KinesisBatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.builder.SqsBatchMessageHandlerBuilder;

/**
* A builder-style interface we can use to build batch processing handlers for SQS, Kinesis Streams,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package software.amazon.lambda.powertools.batch.builder;

import com.amazonaws.services.lambda.runtime.Context;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

import java.util.function.BiConsumer;
import java.util.function.Consumer;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

/**
*
* An abstract class to capture common arguments used across all the message-binding-specific batch processing
* builders. The builders provide a fluent interface to configure the batch processors. Any arguments specific
* to a particular batch binding can be added to the child builder.
*
* <p>
* We capture types for the various messages involved, so that we can provide an interface that makes
* sense for the concrete child.
*
Expand All @@ -28,7 +26,7 @@ abstract class AbstractBatchMessageHandlerBuilder<T, C, E, R> {
* Provides an (Optional!) success handler. A success handler is invoked
* once for each message after it has been processed by the user-provided
* handler.
*
* <p>
* If the success handler throws, the item in the batch will be
* marked failed.
*
Expand All @@ -45,7 +43,7 @@ public C withSuccessHandler(Consumer<T> handler) {
* user-provided handler. This gives the user's code a useful hook to do
* anything else that might have to be done in response to a failure - for
* instance, updating a metric, or writing a detailed log.
*
* <p>
* Please note that this method has nothing to do with the partial batch
* failure mechanism. Regardless of whether a failure handler is
* specified, partial batch failures and responses to the Lambda environment
Expand All @@ -64,12 +62,12 @@ public C withFailureHandler(BiConsumer<T, Throwable> handler) {
* takes a function that consumes a raw message and the Lambda context. This
* is useful for handlers that need access to the entire message object, not
* just the deserialized contents of the body.
*
* <p>
* Note: If you don't need the Lambda context, use the variant of this function
* that does not require it.
*
* @param handler Takes a raw message - the underlying AWS Events Library event - to process.
* For instance for SQS this would be an SQSMessage.
* For instance for SQS this would be an SQSMessage.
* @return A BatchMessageHandler for processing the batch
*/
public abstract BatchMessageHandler<E, R> buildWithRawMessageHandler(BiConsumer<T, Context> handler);
Expand All @@ -82,7 +80,7 @@ public C withFailureHandler(BiConsumer<T, Throwable> handler) {
* just the deserialized contents of the body.
*
* @param handler Takes a raw message - the underlying AWS Events Library event - to process.
* For instance for SQS this would be an SQSMessage.
* For instance for SQS this would be an SQSMessage.
* @return A BatchMessageHandler for processing the batch
*/
public BatchMessageHandler<E, R> buildWithRawMessageHandler(Consumer<T> handler) {
Expand All @@ -101,13 +99,14 @@ public BatchMessageHandler<E, R> buildWithRawMessageHandler(Consumer<T> handler)
* @param handler Processes the deserialized body of the message
* @return A BatchMessageHandler for processing the batch
*/
public abstract <M> BatchMessageHandler<E, R> buildWithMessageHandler(BiConsumer<M, Context> handler, Class<M> messageClass);
public abstract <M> BatchMessageHandler<E, R> buildWithMessageHandler(BiConsumer<M, Context> handler,
Class<M> messageClass);

/**
* Builds a BatchMessageHandler that can be used to process batches, given
* a user-defined handler to process each item in the batch. This variant
* takes a function that consumes the deserialized body of the given message
* If deserialization fails, it will be treated as
* If deserialization fails, it will be treated as
* failure of the processing of that item in the batch.
* Note: If you don't need the Lambda context, use the variant of this function
* that does not require it.
Expand Down
Loading