Powerful, config-driven SDK to easily consume events from one or multiple Kafka topics in both standard and streaming modes. It seamlessly converts an existing app into a Kafka Subscriber, or builds a new one from scratch, with very little code.
- Standard Consumer SDK: suitable for an application which subscribes to an event in Kafka and only needs to read and process the data
- KStreams Consumer SDK: suitable for an application which subscribes to an event in Kafka and needs to read, process, and write the processed data back to another Kafka topic
If you are consuming from multiple topics, you can use a combination of Consumer/KStreams to fit the needs of each topic.
- Removes the overhead of setting up a Kafka consumer and connecting to a cluster
- Develop with Kafka regardless of experience level
- Consume from multiple Kafka topics in parallel and process them independently
- Optional configuration for flexible development
- Works with Kafka over SSL or non-SSL connections
- Convert an existing app into a Kafka Subscriber or build a new one from scratch while writing very little code
- The provided sample app can simply be copied to run a Kafka Subscriber out of the box
- Provides a replay/recovery mechanism to re-read the same message from a Kafka topic at the same offset if message processing fails
- Java
- Spring Framework
- Kafka
- If using SSL, consumers need to procure the SSL certificates for their Kafka topics and add them to the configuration file. They also need to procure their own certificate and add it to the JKS file.
- Create a Maven project and add the Jakasu SDK dependency to your project (check out the latest version):
```xml
<dependency>
    <groupId>com.americanexpress.jakasu</groupId>
    <artifactId>jakasu-subscriber-core</artifactId>
    <!-- check out the latest released version -->
    <version>1.0.0</version>
</dependency>
```
- Create a Subscriber by implementing the Subscriber interface. The Subscriber interface consists of 3 methods:
  - subscribe(String payload, Map headers): the main method, triggered when an event/message is received on the subscribed Kafka topic (configured in the property file)
  - handleError(SubscriberException ex): invoked automatically by the framework when the subscribe method throws a SubscriberException. You can log or otherwise handle the error; the offset is automatically acknowledged so that you are not stuck and can keep processing the next message
  - recoveryHandler(String payload, Map headers): an alternate processing path, triggered after all the configured retries have been exhausted. If this method succeeds, it automatically acknowledges the offset so that processing of other messages continues via the normal flow. If this method also fails, the Kafka listener will shut down
```java
public class TestSubscriber implements Subscriber {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestSubscriber.class);

    @Override
    public boolean subscribe(String payLoad, Map<String, String> headers) {
        LOGGER.info(payLoad);
        // Your processing here
        return true;
    }

    @Override
    public void handleError(SubscriberException ex) {
    }

    @Override
    public boolean recoveryHandler(String payLoad, Map<String, String> headers) {
        // Alternate path
        return true;
    }
}
```
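If processing fails, subscribe can throw a SubscriberException to hand the message to handleError and the retry/recovery flow described above. A minimal sketch, assuming the Subscriber interface declares SubscriberException on subscribe and that the exception offers a String-message constructor (check the SDK source for the exact contract); the empty-payload check stands in for real business logic:

```java
public class ValidatingSubscriber implements Subscriber {

    private static final Logger LOGGER = LoggerFactory.getLogger(ValidatingSubscriber.class);

    @Override
    public boolean subscribe(String payLoad, Map<String, String> headers) throws SubscriberException {
        if (payLoad == null || payLoad.isEmpty()) {
            // Hands the failure to handleError and the configured retries
            throw new SubscriberException("Empty payload received");
        }
        LOGGER.info("Processing {}", payLoad);
        return true;
    }

    @Override
    public void handleError(SubscriberException ex) {
        // Log so failed messages can be traced; the offset is acknowledged automatically
        LOGGER.error("Processing failed", ex);
    }

    @Override
    public boolean recoveryHandler(String payLoad, Map<String, String> headers) {
        // Alternate path once retries are exhausted, e.g. stash the payload for later inspection
        LOGGER.warn("Recovered message: {}", payLoad);
        return true; // success acknowledges the offset and resumes the normal flow
    }
}
```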
Add the annotation @EnableJakasuConsumer to the Spring Boot application main class to add the Consumer SDK to your project:
```java
@SpringBootApplication
@EnableJakasuConsumer
public class JakasuSubscriberApp {

    public static void main(String[] args) {
        new SpringApplicationBuilder()
                .environment(new StandardEncryptableEnvironment())
                .sources(JakasuSubscriberApp.class)
                .run(args);
    }
}
```
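Note: StandardEncryptableEnvironment comes from the jasypt-spring-boot library and resolves ENC(...) encrypted properties like those in the security configuration below; if you do not use encrypted properties, a plain SpringApplication.run(JakasuSubscriberApp.class, args) should work as well.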
All Kafka-related generic properties, like broker, topic, and groupid, can be configured as follows.
Minimum configuration needed (example):
```yaml
jakasu:
  subs:
    example1:
      classname: com.americanexpress.jakasu.subscriber.examples.subscriber.TestSubscriber
      topic:
        name: "Input-Topic"
      groupid: "Consumer-Group"
      bootstrap-servers: yourhost:port
```
Multiple topics and custom configuration (example):
```yaml
jakasu:
  subs:
    example1:
      classname: com.americanexpress.jakasu.examples.subscriber.TestSubscriber
      topic:
        name: "Input-Topic"
      groupid: "Consumer-Group"
      bootstrap-servers: yourhost:port
    ex2: #this label can be changed to give a more descriptive name to each
      classname: com.americanexpress.jakasu.examples.subscriber.ExampleSubscriber #can be the same class or different
      groupid: "Consumer-Group-2"
      bootstrap-servers: yourhost:port
      topic:
        name: "Input-Topic-2" #can be the same topic or different
        # OPTIONAL CONFIG:
        session-timeout: 30000
        request-timeout: 40000
        max-poll-records: 300
        retry: 3
        backoff-period: 50
        reset-config: "earliest"
      filter:
        keys: source-type,event-type
        values: (Source1,Event1),(Source2,Event2)
    ex3:
      classname: com.americanexpress.jakasu.examples.subscriber.OtherSubscriber
      groupid: "Consumer-Group-3"
      bootstrap-servers: yourhost:port
      topic:
        name: "Input-Topic-3"
      filter:
        custom-classname: com.americanexpress.jakasu.examples.subscriber.CustomFilter
```
Required config values:
- classname: name of the class which implements the Subscriber interface in your project. If you are subscribing to multiple topics, use multiple subscriber classes if messages should be processed differently for each topic. Otherwise, you can choose the same subscriber class for your topics, and a unique instance will be created for each.
- topic: name: name of the Kafka topic to consume from
- groupid: a consumer groupid which is unique to your application.
- bootstrap-servers: location for connecting to the Kafka cluster.
Optional config values:
- topic: session-timeout: if the broker doesn't hear from the consumer in this amount of ms, it considers the consumer dead and rebalances the cluster.
- default: 10000
- topic: request-timeout: amount of ms the consumer will wait for a response to a request before retrying.
- default: 30000
- topic: max-poll-records: max number of records returned in each poll
- default: 500
- topic: retry: number of times to retry a request if there is no response
- default: 2147483647
- topic: backoff-period: amount of ms to wait before attempting to retry a failed request
- default: 100
- topic: reset-config: which offset a new consumer should start from. "latest" means the consumer reads records from the queue which arrive after its creation. "earliest" means the consumer will read all records which already existed in the queue before its creation. If you do not have publishing permission for your topic, you can test your project by setting reset-config to "earliest" and renaming your consumer group for each test run to reconsume existing messages. If you are able to publish to your topic, consider using the JaKaPu SDK for easy publishing.
- default: latest
- filter: keys and values for which messages to extract from the input stream. Keys should be listed as comma separated strings and values should be listed as comma separated tuples.
- keys: provide as many keys for filtering as needed, with each key separated by a comma.
- values: provide the value to each of the specified keys above.
- default: none, processes all messages
- Alternatively, specify your own filter strategy by implementing the Filter interface and providing the name of the implementing class via filter: custom-classname (see ex3 above).
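For custom-classname, here is a rough sketch of what a custom filter might look like. The method name and signature below are illustrative assumptions, not the SDK's actual API; check the Filter interface in the SDK source for the real contract:

```java
// Hypothetical sketch -- the SDK's real Filter interface may differ.
public class CustomFilter implements Filter {

    // Assumed contract: return true to keep a message, false to skip it.
    @Override
    public boolean apply(String payload, Map<String, String> headers) {
        // Keep only Source1 messages whose payload mentions an order id (illustrative logic)
        return "Source1".equals(headers.get("source-type"))
                && payload.contains("orderId");
    }
}
```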
Add the annotation @EnableJakasuStreams to the Spring Boot application main class to add the KStreams SDK to your project:

```java
@SpringBootApplication
@EnableJakasuStreams
public class JakasuStreamsApp {

    public static void main(String[] args) {
        new SpringApplicationBuilder()
                .environment(new StandardEncryptableEnvironment())
                .sources(JakasuStreamsApp.class)
                .run(args);
    }
}
```
Create one or more processors to include all your business logic, each extending the StreamProcessor class:

```java
@Component
public class ProcessorImpl extends StreamProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessorImpl.class);

    @Override
    public String process(String payLoad, Map<String, String> headers) {
        LOGGER.info("Streams processor received {}", payLoad);
        return "mod by streams test" + payLoad;
    }

    @Override
    public String handleError(Exception ex, String payLoad, Map<String, String> headers) {
        LOGGER.error("Error processing message {}", payLoad);
        return null;
    }
}
```
- process(String payload, Map<String,String> headers): defines how to process a message
  - Return the transformed payload
- handleError(Exception ex, String payLoad, Map<String,String> headers): triggered when an exception occurs while processing
  - Return null, or the payload produced by an alternate processing path
- Optional: transformHeaders(Headers headers): override if you want to modify the headers
  - No return value; just modify the Headers object
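As a sketch, a transformHeaders override could be added to the ProcessorImpl above to stamp or strip headers before the message reaches the output topic. This assumes the Headers parameter is Kafka's org.apache.kafka.common.header.Headers (import java.nio.charset.StandardCharsets for the byte conversion); adjust if the SDK exposes its own type:

```java
@Override
public void transformHeaders(Headers headers) {
    // Assumes org.apache.kafka.common.header.Headers semantics:
    // remove() drops all values for a key, add() appends a key/value pair.
    headers.remove("internal-trace-id"); // hypothetical header name
    headers.add("processed-by", "ProcessorImpl".getBytes(StandardCharsets.UTF_8));
}
```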
Set the configuration in your application.yml files to consume from one or more topics. You should have one for each environment.
Minimum configuration needed (example):
```yaml
jakasu:
  subs:
    example1:
      topic:
        name: "Input-Topic"
      groupid: "Consumer-Group"
      bootstrap-servers: yourhost:port
      streams:
        enable: true
        processor: com.americanexpress.jakasu.streams.examples.ProcessorImpl
        output-topic-name: "Output-Topic"
```
Multiple topics and custom config (example):
```yaml
jakasu:
  subs:
    example1:
      topic:
        name: "Input-Topic"
      groupid: "Consumer-Group"
      bootstrap-servers: yourhost:port
      streams:
        enable: true
        processor: com.americanexpress.jakasu.streams.examples.ProcessorImpl
        output-topic-name: "Output-Topic"
    ex2:
      topic:
        name: "Input-Topic-2" #can be the same topic or different
        session-timeout: 30000
        request-timeout: 30000
        retry: 3
        backoff-period: 50
        max-poll-records: 200
        reset-config: "earliest"
      groupid: "Consumer-Group-2"
      bootstrap-servers: yourhost:port
      streams:
        enable: true
        processor: com.americanexpress.jakasu.streams.examples.ProcessorImpl #can be the same class or different
        output-topic-name: "Output-Topic-2" #can be the same topic or different
        # OPTIONAL config:
        thread-count: 1
        poll-ms: 150
        process-guarantee: exactly_once
        topology-optimize: all
        buffer-records-per-partition: 500
        max-bytes-buffered: 1000000
        dedup:
          enable: true
          id-header: "source-uniqueid"
          window-size: 7
          replication: 1
        flatmap:
          enable: true
          id-header: "source-uniqueid"
      filter:
        keys: source-type,event-type
        values: (Source1,Event1),(Source2,Event2)
    ex3:
      topic:
        name: "Input-Topic-3"
      groupid: "Consumer-Group-3"
      bootstrap-servers: yourhost:port
      streams:
        enable: true
        processor: com.americanexpress.jakasu.streams.examples.ProcessorImpl
        output-topic-name: "Output-Topic-3"
        dedup:
          enable: true
          id-header: "source-uniqueid"
      filter:
        custom-classname: com.americanexpress.jakasu.examples.subscriber.CustomFilter
```
Required config values:
- topic: name: name of the Kafka topic to consume from
- groupid: a consumer group id unique to your application.
- bootstrap-servers: location for connecting to the Kafka cluster.
- streams: processor: name of the class that extends StreamProcessor in your project. You can chain multiple processors together by listing them comma separated in the order in which they should execute; a sketch follows this list. If you are subscribing to multiple topics, use a different processor class for each if messages should be processed differently per topic. Otherwise, you can choose the same processor class for your topics, and a unique instance will be created for each.
- streams: enable: must be set as "true" to use KStreams
- streams: output-topic-name: the Kafka topic where the processed message should be published to. You must be authorized as a publisher for this topic.
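As noted for streams: processor, multiple processors can be chained by listing them comma separated in execution order, e.g. processor: com.example.NormalizeProcessor,com.example.EnrichProcessor. A minimal sketch of two chained processors (illustrative names and logic; each class goes in its own file):

```java
@Component
public class NormalizeProcessor extends StreamProcessor {

    @Override
    public String process(String payLoad, Map<String, String> headers) {
        // First link in the chain: tidy up the raw payload
        return payLoad.trim();
    }

    @Override
    public String handleError(Exception ex, String payLoad, Map<String, String> headers) {
        return null; // null, or a payload from an alternate processing path
    }
}

@Component
public class EnrichProcessor extends StreamProcessor {

    @Override
    public String process(String payLoad, Map<String, String> headers) {
        // Second link: annotate the payload with a header value before publishing
        return payLoad + " | event-type=" + headers.get("event-type");
    }

    @Override
    public String handleError(Exception ex, String payLoad, Map<String, String> headers) {
        return null;
    }
}
```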
Optional config values:
- thread-count: The number of threads per topic to execute stream processing.
- default: 1
- poll-ms: The amount of time in milliseconds to block waiting for input while polling for records.
- default: 100
- process-guarantee: "at_least_once" or "exactly_once". Set to "exactly_once" to guarantee that each record is processed once and only once, even if some failures are encountered in the middle of processing.
- default: "at_least_once"
- topology-optimize: "all" or "none". "all" reduces the footprint that Kafka Streams creates, especially around repartitioning.
- default: "none"
- buffer-records-per-partition: The maximum number of records to buffer per partition.
- default: 1000
- max-bytes-buffered: Maximum number of memory bytes to be used for record caches across all threads.
- default: 10485760
- topic: session-timeout: if the broker doesn't hear from the consumer in this amount of ms, it considers the consumer dead and rebalances the cluster.
- default: 10000
- topic: request-timeout: amount of ms the consumer will wait for a response to a request before retrying.
- default: 30000
- topic: retry: Number of times to retry a request if there is no response
- default: 2147483647
- topic: backoff-period: Amount of ms to wait before attempting to retry a failed request
- default: 100
- topic: max-poll-records: max number of records returned in each poll
- default: 500
- topic: reset-config: which offset a new consumer should start from. "latest" means the consumer reads records from the queue which arrive after its creation. "earliest" means the consumer will read all records which already existed in the queue before its creation. If you do not have publishing permission for your topic, you can test your project by setting reset-config to "earliest" and renaming your consumer group for each test run to reconsume existing messages. If you are able to publish to your topic, consider using the JaKaPu SDK for easy publishing.
- default: latest
- dedup: enable: Set to true if messages which are duplicates of another message which has already been processed should be skipped. Note: An internal topic will be created as a changelog of seen messages. You must be authorized to create topics in your Kafka cluster or have your administrator create the internal topic beforehand.
- default: false
- dedup: id-header: The name of the message header which can be used as a unique identifier of message for deduplication.
- default: No default value, but if not provided, a deduplication processor will not be added to the streams topology
- dedup: window-size: Optionally specify the number of days to retain a message to check for duplicates.
- default: 7 days. That is the default retention window for messages in a Kafka topic
- dedup: replication: The replication factor of internal topics that Kafka Streams creates if using deduplication. This is the number of broker failures which can be tolerated.
- default: 3
- flatmap: enable: Set to true if messages arrive in a bulk format with multiple events per message and should be split so that one individual event per message is sent to the output topic
- default: false
- flatmap: id-header: The name of the message header which can be used as a unique identifier of message. Each event will be published with this event id, appended with the index of the event in the bulk message
- default: No default value, but if not provided, a flatmap processor will not be added to the streams topology
- filter: Keys and values for which messages to extract from the input stream. Keys should be listed as comma separated strings and values should be listed as comma separated tuples.
- keys: Provide as many keys for filtering as needed, with each key separated by a comma.
- values: Provide the value to each of the specified keys above.
- default: none, processes all messages
All security-related properties can be configured as follows:
```yaml
jakasu:
  security:
    enabled: true
    protocol: SSL
    ssl:
      protocol: TLSv1.2
      keystore:
        type: JKS
        location: jakasu-subscriber-examples/src/main/cert/appkeystore.jks
        password: ENC(encryptedpasswordhere)
      truststore:
        location: jakasu-subscriber-examples/src/main/cert/kafka-client-yourtruststore.jks
        password: ENC(encryptedpasswordhere)
```
You will need to obtain certificates for your Kafka topic and be authorized as a consumer. Provide the path and password for these certificates here.
It is highly recommended to encrypt your secrets, as with the ENC(...) values above.
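The ENC(...) wrapper is the Jasypt encrypted-property convention, and the StandardEncryptableEnvironment used in the main-class examples suggests jasypt-spring-boot resolves these values at startup. A hedged sketch of producing an encrypted value with the Jasypt 1.9.x API; the algorithm and IV generator must match your application's decryption settings (the values below mirror jasypt-spring-boot's documented defaults, so verify yours):

```java
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
import org.jasypt.iv.RandomIvGenerator;

public class SecretEncryptor {

    public static void main(String[] args) {
        StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
        // Master key, e.g. supplied via environment rather than hard-coded
        encryptor.setPassword(System.getenv("JASYPT_ENCRYPTOR_PASSWORD"));
        encryptor.setAlgorithm("PBEWITHHMACSHA512ANDAES_256");
        encryptor.setIvGenerator(new RandomIvGenerator());
        // Wrap the printed output in ENC( ) in application.yml
        System.out.println("ENC(" + encryptor.encrypt(args[0]) + ")");
    }
}
```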
We welcome Your interest in the American Express Open Source Community on Github. Any Contributor to any Open Source Project managed by the American Express Open Source Community must accept and sign an Agreement indicating agreement to the terms below. Except for the rights granted in this Agreement to American Express and to recipients of software distributed by American Express, You reserve all right, title, and interest, if any, in and to Your Contributions. Please fill out the Agreement.
Any contributions made under this project will be governed by the Apache License 2.0.
This project adheres to the American Express Community Guidelines. By participating, you are expected to honor these guidelines.