feat: Add Batch Processor module #1317

Merged
merged 95 commits into from
Aug 4, 2023
Commits
2c20023
Starting to sketch out shape of API for batch processor
scottgerring Jun 21, 2023
63ebcab
Merge branch 'main' into rfc-batch-processor
scottgerring Jun 22, 2023
c34f5d9
Merge branch 'main' into rfc-batch-processor
Jun 25, 2023
870ec47
Variant 1
Jun 25, 2023
5469287
Some more examples
Jun 25, 2023
0435d18
Add extra bit for handling message-specific mutation
Jun 25, 2023
6241014
Make clear what's not public
Jun 25, 2023
8168719
test with interfaces
jeromevdl Jun 28, 2023
b0675b7
move tests
jeromevdl Jun 28, 2023
5ae16f9
refactoring a bit
jeromevdl Jun 29, 2023
3127096
refactoring and adding FIFO
jeromevdl Jun 30, 2023
9b8a310
refactoring and adding FIFO
jeromevdl Jun 30, 2023
12cb97f
adding FIFO management
jeromevdl Jul 1, 2023
061bfb1
cleanup
jeromevdl Jul 1, 2023
6b07f78
add javadoc
jeromevdl Jul 3, 2023
7d2e1fb
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Jul 3, 2023
b69c09b
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 11, 2023
cc28ce5
Flesh out builder option a bit
scottgerring Jul 11, 2023
916c26f
Flesh out a bit more
scottgerring Jul 11, 2023
96c30ff
more changes
scottgerring Jul 11, 2023
ee64d62
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 19, 2023
26d8da5
Leaning into the builder style. needs some more thought
scottgerring Jul 19, 2023
a9517f6
The shape of it is rightish
scottgerring Jul 20, 2023
d3ad219
Working working
scottgerring Jul 20, 2023
418e32b
Work
scottgerring Jul 20, 2023
a1e441c
Work on kinesis batch handler
scottgerring Jul 24, 2023
28d1f8d
More tests
scottgerring Jul 24, 2023
a660a8b
More tests and starting to add an example
scottgerring Jul 24, 2023
2332e2d
Working on batch
scottgerring Jul 25, 2023
4af594c
feat(batch): initial DdbBatchMessageHandler implementation
mriccia Jul 25, 2023
34a58e6
more
scottgerring Jul 26, 2023
ed161e9
fix pom.xml for powertools-examples-batch
mriccia Jul 26, 2023
f8812be
Add dynamodb example
mriccia Jul 26, 2023
53a5abe
Move template into subdir
scottgerring Jul 26, 2023
9872410
Better structure
scottgerring Jul 26, 2023
e53c1e9
tidy up
mriccia Jul 26, 2023
4ca1726
Trying to get kinesis going
scottgerring Jul 26, 2023
fb422da
Kinesis demo working
scottgerring Jul 26, 2023
a862daf
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Jul 26, 2023
b8af8de
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 26, 2023
4118768
Updated readme
scottgerring Jul 26, 2023
e75de66
Deprecated everywhere
scottgerring Jul 26, 2023
1e7d305
Address initial review comments
scottgerring Jul 27, 2023
6ad7e7d
Add success tests for Kinesis/S3
scottgerring Jul 27, 2023
8f69551
Increase DDB coverage
scottgerring Jul 27, 2023
672ba51
Tell sonar to ignore dupes in examples
scottgerring Jul 27, 2023
70a08ba
Add docs
scottgerring Jul 27, 2023
dae6131
Add warning
scottgerring Jul 27, 2023
5e4d709
Merge remote-tracking branch 'origin/main' into rfc-batch-processor
scottgerring Jul 27, 2023
6d842da
More doco
scottgerring Jul 27, 2023
9de7d0a
Format
scottgerring Jul 27, 2023
0e95238
Docs good
scottgerring Jul 27, 2023
ba17efd
Disabling formatting check for now as its breaking the build and I ca…
scottgerring Jul 27, 2023
e87e6dc
Make checkstyle happy
scottgerring Jul 27, 2023
5c046a6
Add docs from heitor
scottgerring Jul 27, 2023
56af4d0
More docs changes
scottgerring Jul 28, 2023
e0fd524
Merge branch 'main' into rfc-batch-processor
scottgerring Jul 28, 2023
c43df12
Merge branch 'main' into rfc-batch-processor
scottgerring Aug 2, 2023
5d8be7a
Merged
scottgerring Aug 2, 2023
033a922
move ddb template in the right folder
mriccia Aug 2, 2023
13baa6a
Changes
scottgerring Aug 2, 2023
ca6cd09
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Aug 2, 2023
00fe0b0
add items updates and deletions to ddb example
mriccia Aug 2, 2023
1f65ceb
Will it blend?
scottgerring Aug 2, 2023
ebc7630
Merge remote-tracking branch 'refs/remotes/origin/rfc-batch-processor…
scottgerring Aug 2, 2023
bb74fe4
More changes
scottgerring Aug 2, 2023
76e51a6
e2e test handler
mriccia Aug 2, 2023
0e4b018
Try work for SQS only
scottgerring Aug 2, 2023
f8c9802
Merged
scottgerring Aug 2, 2023
c641d4b
More greatness
scottgerring Aug 2, 2023
0dbf1a3
Almost good
scottgerring Aug 2, 2023
7834a6e
SQS works
scottgerring Aug 2, 2023
062870f
Also kinesis e2e
scottgerring Aug 2, 2023
5548dc0
Lets try doing it with streams
scottgerring Aug 2, 2023
3cf034a
Try make it work with streams
scottgerring Aug 2, 2023
55cbbf4
Streams?
scottgerring Aug 3, 2023
6e44fe6
Make SQS test work
scottgerring Aug 3, 2023
0c9f8b8
SQS and Kinesis work
scottgerring Aug 3, 2023
4b13467
Merge branch 'main' into rfc-batch-processor
scottgerring Aug 3, 2023
006b9e0
DynamoDB E2E works
scottgerring Aug 3, 2023
c3ae363
Formatting
scottgerring Aug 3, 2023
570d051
Try exclude e2e-tests from dupe checking
scottgerring Aug 3, 2023
6303a75
Rename sonar file
scottgerring Aug 3, 2023
f91c2c1
Formatting
scottgerring Aug 3, 2023
db0e7f1
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
fa9aa11
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
710a94c
Address review comments
scottgerring Aug 4, 2023
06373a7
Merge
scottgerring Aug 4, 2023
294313f
Missed one
scottgerring Aug 4, 2023
c63fb28
Formatting
scottgerring Aug 4, 2023
e27a31f
Cleanup doc linking
scottgerring Aug 4, 2023
0929fb1
More doco
scottgerring Aug 4, 2023
3d8aca8
Update docs/utilities/batch.md
scottgerring Aug 4, 2023
1ff4658
Update batch.md
scottgerring Aug 4, 2023
83fdf12
Skip aspectj run
scottgerring Aug 4, 2023
Address initial review comments
scottgerring committed Jul 27, 2023
commit 1e7d305b2ea152c30ec9c8a6ec57ff9b871e38a8
1 change: 1 addition & 0 deletions examples/pom.xml
@@ -20,6 +20,7 @@
<module>powertools-examples-parameters</module>
<module>powertools-examples-serialization</module>
<module>powertools-examples-sqs</module>
<module>powertools-examples-batch</module>
<module>powertools-examples-validation</module>
<module>powertools-examples-cloudformation</module>
</modules>
10 changes: 10 additions & 0 deletions examples/powertools-examples-batch/pom.xml
@@ -44,6 +44,16 @@
<artifactId>sdk-core</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>url-connection-client</artifactId>
<version>${sdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb-enhanced</artifactId>
Original file line number Diff line number Diff line change
@@ -26,12 +26,9 @@ private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRec
}

@Override
public StreamsEventResponse handleRequest(DynamodbEvent kinesisEvent, Context context) {
return handler.processBatch(kinesisEvent, context);
public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
return handler.processBatch(ddbEvent, context);
}

private void processMessage(Product p, Context c) {
LOGGER.info("Processing product " + p);
}

}
Original file line number Diff line number Diff line change
@@ -26,6 +26,12 @@

import static java.util.stream.Collectors.toList;


/**
* A Lambda handler used to send message batches to Kinesis Streams. This is only here
* to produce an end-to-end demo, so that the {@link org.demo.batch.kinesis.KinesisBatchHandler}
* has some data to consume.
*/
public class KinesisBatchSender implements RequestHandler<ScheduledEvent, String> {

private static final Logger LOGGER = LogManager.getLogger(KinesisBatchSender.class);
Original file line number Diff line number Diff line change
@@ -21,6 +21,11 @@
import static java.util.stream.Collectors.toList;


/**
* A Lambda handler used to send message batches to SQS. This is only here
* to produce an end-to-end demo, so that the {@link org.demo.batch.sqs.SqsBatchHandler}
* has some data to consume.
*/
public class SqsBatchSender implements RequestHandler<ScheduledEvent, String> {

private static final Logger LOGGER = LogManager.getLogger(SqsBatchSender.class);
3 changes: 1 addition & 2 deletions pom.xml
@@ -39,9 +39,8 @@
<module>powertools-cloudformation</module>
<module>powertools-idempotency</module>
<module>powertools-e2e-tests</module>
<module>examples</module>
<module>powertools-batch</module>
<module>examples/powertools-examples-batch</module>
<module>examples</module>
</modules>

<scm>
38 changes: 0 additions & 38 deletions powertools-batch/.gitignore

This file was deleted.

9 changes: 0 additions & 9 deletions powertools-batch/pom.xml
@@ -24,11 +24,6 @@
<artifactId>powertools-serialization</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-idempotency</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
@@ -61,10 +56,6 @@
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-sqs</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -3,9 +3,13 @@
import software.amazon.lambda.powertools.batch.builder.*;

/**
* A builder-style interface we can use within an existing Lambda RequestHandler to
* deal with our batch responses. A second tier of builders is returned per-event-source
* to bind the appropriate message types and provider source-specific logic and tuneables.
* A builder-style interface we can use to build batch processing handlers for SQS, Kinesis Streams,
* and DynamoDB Streams batches. The batch processing handlers that are returned allow
* the user to easily process batches of messages, one-by-one, while offloading
* the common issues - failure handling, partial responses, deserialization -
* to the library.
*
* @see <a href="https://docs.powertools.aws.dev/lambda/java/utilities/batch/">Powertools for AWS Lambda (Java) Batch Documentation</a>
**/
public class BatchMessageHandlerBuilder {

@@ -23,8 +27,8 @@ public SqsBatchMessageHandlerBuilder withSqsBatchHandler() {
*
* @return A fluent builder interface to continue the building
*/
public DdbBatchMessageHandlerBuilder withDynamoDbBatchHandler() {
return new DdbBatchMessageHandlerBuilder();
public DynamoDbBatchMessageHandlerBuilder withDynamoDbBatchHandler() {
return new DynamoDbBatchMessageHandlerBuilder();
}

/**
Original file line number Diff line number Diff line change
@@ -25,9 +25,10 @@ abstract class AbstractBatchMessageHandlerBuilder<T, C, E, R> {
protected Consumer<T> successHandler;

/**
* Provides a success handler. A success handler is invoked once for
* each message after it has been processed by the user-provided
* Provides an (Optional!) success handler. A success handler is invoked
* once for each message after it has been processed by the user-provided
* handler.
*
* If the success handler throws, the item in the batch will be
* marked failed.
*
@@ -39,9 +40,16 @@ public C withSuccessHandler(Consumer<T> handler) {
}

/**
* Provides a failure handler. A failure handler is invoked once
* for each message after it has failed to be processed by the
* user-provided handler.
* Provides an (Optional!) failure handler. A failure handler is invoked
* once for each message after it has failed to be processed by the
* user-provided handler. This gives the user's code a useful hook to do
* anything else that might have to be done in response to a failure - for
* instance, updating a metric, or writing a detailed log.
*
* Please note that this method has nothing to do with the partial batch
* failure mechanism. Regardless of whether a failure handler is
* specified, partial batch failures and responses to the Lambda environment
* are handled by the batch utility separately.
*
* @param handler The handler to invoke on failure
*/
@@ -53,21 +61,28 @@ public C withFailureHandler(BiConsumer<T, Throwable> handler) {
/**
* Builds a BatchMessageHandler that can be used to process batches, given
* a user-defined handler to process each item in the batch. This variant
* takes a function that consumes a raw message and the Lambda context.
* takes a function that consumes a raw message and the Lambda context. This
* is useful for handlers that need access to the entire message object, not
* just the deserialized contents of the body.
*
* Note: If you don't need the Lambda context, use the variant of this function
* that does not require it.
*
* @param handler Takes a raw message - the AWS-SDK-shaped inner type of the batch - to process
* @param handler Takes a raw message - the underlying AWS Events Library event - to process.
* For instance for SQS this would be an SQSMessage.
* @return A BatchMessageHandler for processing the batch
*/
public abstract BatchMessageHandler<E, R> buildWithRawMessageHandler(BiConsumer<T, Context> handler);

/**
* Builds a BatchMessageHandler that can be used to process batches, given
* a user-defined handler to process each item in the batch. This variant
* takes a function that consumes a raw message.
* takes a function that consumes a raw message only, without the Lambda context. This
* is useful for handlers that need access to the entire message object, not
* just the deserialized contents of the body.
*
* @param handler Takes a raw message - the AWS-SDK-shaped inner type of the batch - to process
* @param handler Takes a raw message - the underlying AWS Events Library event - to process.
* For instance for SQS this would be an SQSMessage.
* @return A BatchMessageHandler for processing the batch
*/
public BatchMessageHandler<E, R> buildWithRawMessageHandler(Consumer<T> handler) {
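
The Javadoc in this file describes three behaviours: the success handler runs once per processed item, a throwing success handler marks that item failed, and failed items are tracked whether or not a failure handler is set. A minimal sketch of a loop with those properties follows. It is not the implementation from this PR: `BatchLoopSketch` and `process` are hypothetical names, and it uses only the standard library rather than the Lambda event types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for a batch processing loop; not the PR's code.
final class BatchLoopSketch {

    // Processes each item in turn and returns the items that failed.
    // successHandler and failureHandler are optional and may be null.
    static <T> List<T> process(List<T> batch,
                               Consumer<T> itemHandler,
                               Consumer<T> successHandler,
                               BiConsumer<T, Throwable> failureHandler) {
        List<T> failed = new ArrayList<>();
        for (T item : batch) {
            try {
                itemHandler.accept(item);
                if (successHandler != null) {
                    // If this throws, the catch block below marks the item failed.
                    successHandler.accept(item);
                }
            } catch (Exception e) {
                // The failure is recorded even when no failure handler is set.
                failed.add(item);
                if (failureHandler != null) {
                    // Hook for side effects only, e.g. a metric or a detailed log.
                    failureHandler.accept(item, e);
                }
            }
        }
        return failed;
    }
}
```

In this sketch the returned list plays the role of the partial batch response; the failure handler only observes failures and has no influence on which items are reported.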
Original file line number Diff line number Diff line change
@@ -5,19 +5,22 @@
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.exception.DeserializationNotSupportedException;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
import software.amazon.lambda.powertools.batch.handler.DdbBatchMessageHandler;
import software.amazon.lambda.powertools.batch.handler.DynamoDbBatchMessageHandler;

import java.util.function.BiConsumer;

public class DdbBatchMessageHandlerBuilder extends AbstractBatchMessageHandlerBuilder<DynamodbEvent.DynamodbStreamRecord,
DdbBatchMessageHandlerBuilder,
/**
* Builds a batch processor for processing DynamoDB Streams batch events
**/
public class DynamoDbBatchMessageHandlerBuilder extends AbstractBatchMessageHandlerBuilder<DynamodbEvent.DynamodbStreamRecord,
DynamoDbBatchMessageHandlerBuilder,
DynamodbEvent,
StreamsEventResponse> {


@Override
public BatchMessageHandler<DynamodbEvent, StreamsEventResponse> buildWithRawMessageHandler(BiConsumer<DynamodbEvent.DynamodbStreamRecord, Context> rawMessageHandler) {
return new DdbBatchMessageHandler(
return new DynamoDbBatchMessageHandler(
this.successHandler,
this.failureHandler,
rawMessageHandler);
@@ -30,7 +33,7 @@ public <M> BatchMessageHandler<DynamodbEvent, StreamsEventResponse> buildWithMes
}

@Override
protected DdbBatchMessageHandlerBuilder getThis() {
protected DynamoDbBatchMessageHandlerBuilder getThis() {
return this;
}
}
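
The `C` type parameter and the `getThis()` override above look like the self-typed builder idiom: shared `with...` methods on the abstract builder return the concrete builder type, so source-specific methods stay reachable later in a fluent chain. A minimal sketch of that idiom, using hypothetical names (`SketchBuilder`, `StringSketchBuilder`, `withPrefix`) that do not appear in this PR:

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the abstract builder:
// T is the record type, C is the concrete builder type.
abstract class SketchBuilder<T, C> {
    protected Consumer<T> successHandler;

    // Returning C rather than SketchBuilder keeps subclass-only
    // methods available after this call.
    public C withSuccessHandler(Consumer<T> handler) {
        this.successHandler = handler;
        return getThis();
    }

    protected abstract C getThis();
}

// Hypothetical concrete builder for String records.
class StringSketchBuilder extends SketchBuilder<String, StringSketchBuilder> {
    private String prefix = "";

    public StringSketchBuilder withPrefix(String prefix) {
        this.prefix = prefix;
        return this;
    }

    @Override
    protected StringSketchBuilder getThis() {
        return this;
    }

    // Builds a consumer that stores each prefixed record in sink,
    // then invokes the optional success handler.
    public Consumer<String> build(List<String> sink) {
        return record -> {
            sink.add(prefix + record);
            if (successHandler != null) {
                successHandler.accept(record);
            }
        };
    }
}
```

With this shape, `new StringSketchBuilder().withSuccessHandler(h).withPrefix("p-")` compiles, because `withSuccessHandler` returns `StringSketchBuilder` and not the abstract base type.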
Original file line number Diff line number Diff line change
@@ -8,6 +8,9 @@

import java.util.function.BiConsumer;

/**
* Builds a batch processor for processing Kinesis Streams batch events
*/
public class KinesisBatchMessageHandlerBuilder extends AbstractBatchMessageHandlerBuilder<KinesisEvent.KinesisEventRecord,
KinesisBatchMessageHandlerBuilder,
KinesisEvent,
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package software.amazon.lambda.powertools.batch.exception;

/**
* Thrown by message handlers that do not support deserializing arbitrary payload
* contents. This is the case for instance with DynamoDB Streams, which stream
* changesets about user-defined data, but not the user-defined data models themselves.
*/
public class DeserializationNotSupportedException extends RuntimeException {

public DeserializationNotSupportedException() {
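
The Javadoc above says some event sources cannot deserialize payloads into user-defined types. One plausible way to surface that, assumed here rather than confirmed by this diff, is to reject the typed-handler variant when the handler is built while still accepting raw-record handlers. The class and method names below are hypothetical stand-ins, and records are modelled as plain strings.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for the exception documented above.
class DeserializationUnsupportedSketch extends RuntimeException {
    DeserializationUnsupportedSketch() {
        super("Typed message handlers are not supported for this event source");
    }
}

// Hypothetical builder for a change-stream source whose records
// cannot be mapped onto a user-defined model class.
class ChangeStreamBuilderSketch {

    // Raw records are always available, so this variant succeeds.
    Consumer<String> buildWithRawMessageHandler(Consumer<String> handler) {
        return handler;
    }

    // Assumption: the typed variant fails fast at build time rather than
    // failing once per record while a batch is being processed.
    <M> Consumer<String> buildWithMessageHandler(Consumer<M> handler, Class<M> messageClass) {
        throw new DeserializationUnsupportedSketch();
    }
}
```

Failing at build time means a misconfigured handler is caught on the first invocation instead of every record in a batch being reported as failed.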
Original file line number Diff line number Diff line change
@@ -11,14 +11,20 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class DdbBatchMessageHandler implements BatchMessageHandler<DynamodbEvent, StreamsEventResponse>{
Logger LOGGER = LoggerFactory.getLogger(DdbBatchMessageHandler.class);
/**
* A batch message processor for DynamoDB Streams batches.
*
* @see <a href="https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting">DynamoDB Streams batch failure reporting</a>
*
*/
public class DynamoDbBatchMessageHandler implements BatchMessageHandler<DynamodbEvent, StreamsEventResponse>{
private final static Logger LOGGER = LoggerFactory.getLogger(DynamoDbBatchMessageHandler.class);

private final Consumer<DynamodbEvent.DynamodbStreamRecord> successHandler;
private final BiConsumer<DynamodbEvent.DynamodbStreamRecord, Throwable> failureHandler;
private final BiConsumer<DynamodbEvent.DynamodbStreamRecord, Context> rawMessageHandler;

public DdbBatchMessageHandler(Consumer<DynamodbEvent.DynamodbStreamRecord> successHandler, BiConsumer<DynamodbEvent.DynamodbStreamRecord, Throwable> failureHandler, BiConsumer<DynamodbEvent.DynamodbStreamRecord, Context> rawMessageHandler) {
public DynamoDbBatchMessageHandler(Consumer<DynamodbEvent.DynamodbStreamRecord> successHandler, BiConsumer<DynamodbEvent.DynamodbStreamRecord, Throwable> failureHandler, BiConsumer<DynamodbEvent.DynamodbStreamRecord, Context> rawMessageHandler) {
this.successHandler = successHandler;
this.failureHandler = failureHandler;
this.rawMessageHandler = rawMessageHandler;
Original file line number Diff line number Diff line change
@@ -14,13 +14,13 @@
import java.util.function.Consumer;

/**
* A batch message handler for Kinesis Streams batch processing.
* A batch message processor for Kinesis Streams batch processing.
*
* Refer to <a href="https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting">The kinesis batch processing document</a>
* @param <M>
* Refer to <a href="https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting">Kinesis Batch failure reporting</a>
* @param <M> The user-defined type of the Kinesis record payload
*/
public class KinesisStreamsBatchMessageHandler <M> implements BatchMessageHandler<KinesisEvent, StreamsEventResponse> {
Logger LOGGER = LoggerFactory.getLogger(KinesisStreamsBatchMessageHandler.class);
private final static Logger LOGGER = LoggerFactory.getLogger(KinesisStreamsBatchMessageHandler.class);

private final BiConsumer<KinesisEvent.KinesisEventRecord, Context> rawMessageHandler;
private final BiConsumer<M, Context> messageHandler;
Original file line number Diff line number Diff line change
@@ -11,14 +11,20 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* A batch message processor for SQS batches.
*
* @see <a href="https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting">SQS Batch failure reporting</a>
* @param <M> The user-defined type of the message payload
*/
public class SqsBatchMessageHandler <M> implements BatchMessageHandler<SQSEvent, SQSBatchResponse> {
private final Class<M> messageClass;
Logger LOGGER = LoggerFactory.getLogger(SqsBatchMessageHandler.class);
private final static Logger LOGGER = LoggerFactory.getLogger(SqsBatchMessageHandler.class);

// The attribute on an SQS-FIFO message used to record the message group ID
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#sample-fifo-queues-message-event
String MESSAGE_GROUP_ID_KEY = "MessageGroupId";
private final static String MESSAGE_GROUP_ID_KEY = "MessageGroupId";

private final Class<M> messageClass;
private final BiConsumer<M, Context> messageHandler;
private final BiConsumer<SQSEvent.SQSMessage, Context> rawMessageHandler;
private final Consumer<SQSEvent.SQSMessage> successHandler;
@@ -30,7 +36,6 @@ public SqsBatchMessageHandler(BiConsumer<M, Context> messageHandler, Class<M> me
this.rawMessageHandler = rawMessageHandler;
this.successHandler = successHandler;
this.failureHandler = failureHandler;

}

@Override