Source code repository for the 2nd Edition of Kafka Streams in Action
The source code for Kafka Streams in Action, 2nd Edition has a few prerequisites you'll need to have installed to get everything working smoothly.
- Java: the project uses Temurin 17.
- Gradle version 7.2. You don't need to install Gradle yourself, since the included Gradle "wrapper" script will install it if needed. Use `./gradlew` on macOS/*nix and `gradlew.bat` on Windows.
- Docker Desktop version 4.3.2
- Git version 2.31.1
I've tried to make everything platform neutral, but for context, here's the environment everything was developed and tested on:
- OS: macOS Monterey 12.1
- Shell: `zsh`
- IDE: IntelliJ IDEA
All other dependencies should be installed via the `build.gradle` file.
Before you get started, you'll need to generate the Java code from the Avro, Protobuf, and JSON Schema schemas so the project will compile. Run `./gradlew build`, or `gradlew.bat build` if you are on Windows. Note that if you don't have Gradle installed, running the `gradlew` script installs Gradle the first time you run it.
For the most part, using Docker with the example code is a seamless operation. Should you encounter any issues, the first line of debugging should be the command `docker logs <image name>`, where the image name is either `zookeeper`, `broker`, or `schema-registry`. Also, all the examples in this repo use Log4j and write to the `event_streaming_dev.log` file in the `logs` directory at the root of the project.
In the root of the project there is a file, `project-commands.sh`, containing functions that wrap some commands you'll find yourself executing during the course of reading this book. To expose the commands, `cd` into the base directory of the project and run `source project-commands.sh`; you'll then have shortcuts for some of the command-line commands you'll encounter.
You'll notice the project contains four modules:
- `streams`
- `sr-backward`
- `sr-forward`
- `sr-full`

The `streams` module is the main module and contains all the source code for the book. You'll spend the majority of your time working with the code here.
Note that when running any commands other than `clean` or `build`, it's important to always prefix the command with the name of the module that is the target of the command.
In appendix B of the book, there is a tutorial walking you through migrating schemas and the permitted changes for each compatibility mode. Each module contains an updated schema and updated producer and consumer clients to support the schema changes. I've named the modules to match the compatibility mode each one supports.
The appendix walks you through a series of steps, including the schema changes, and explains what happens and why at each step, so I won't go into that level of detail here; please consult appendix B for the full explanation.
At a high level, you'll work through a series of changes and compatibility modes, proceeding in this order: `backward`, `forward`, and finally `full`.
For each step of the tutorial you'll take the following steps:
- Set the compatibility mode: run `<module name>:configSubjectsTask` to set the compatibility mode
- Test the updated schema: execute `<module name>:testSchemasTask` to verify the schema changes are compatible
- Register the updated schema: run `<module name>:registerSchemasTask` to register the new schema
Then after these steps of updating, testing and registering the schema you'll run a couple of commands for the producer and consumer clients in a specific order and observe the output.
It's very important to follow the specific order of the commands as well as execute them exactly as shown.
Note that there is a high level of overlap between the submodule code and `build.gradle` files. This is intentional! I wanted to isolate each module in the spirit of independent changes. The purpose of the tutorial is to highlight how you handle schema changes within different compatibility modes, not how you set up sub-modules within a main Gradle project.
Several tests (both unit and integration) use Testcontainers. To reduce the overhead of the tests, they use singleton containers. You get a singleton container by extending one of three abstract classes in the `testcontainers` package. The abstract classes all create and start a container in a static block when first loaded, and the remaining tests reuse the same container. As a result, running the tests takes some time, with some taking over a minute depending on the point the test is emphasizing.
To mitigate the time it takes to run the tests, the longer-running tests are tagged with `Tag("long")` and only run with the Gradle command `./gradlew longTests`. Otherwise, `./gradlew build` and `./gradlew test` only run the "shorter" tests.
- `BaseKafkaContainerTest` - Base Kafka container
- `BaseProxyInterceptingKafkaContainerTest` - Kafka container which includes a Toxiproxy container for simulating network issues
- `BaseTransactionalKafkaContainerTest` - Kafka container configured for transactional API tests; it sets the correct Kafka configs for using transactions with a single broker
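To give a feel for the singleton-container pattern described above, here's a minimal sketch of what such a base class might look like. This is not one of the actual base classes from the repo; the class name and Kafka image tag are assumptions for illustration only.

```java
// Hedged sketch of a singleton Testcontainers base class -- NOT the repo's code.
// Assumes the Testcontainers Kafka module is on the test classpath; the image tag is made up.
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public abstract class BaseKafkaContainerSketch {

    // Created and started once, the first time any subclass is loaded;
    // every test extending this class reuses the same running broker.
    protected static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));

    static {
        KAFKA.start();
    }

    protected static String bootstrapServers() {
        return KAFKA.getBootstrapServers();
    }
}
```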
For this chapter there are only a few commands you can run from the console. See `project-commands.sh`, as some commands have functions wrapping them.
The code in chapter 3 is used to demonstrate how to interact with Schema Registry, so the emphasis is not on the producer and consumer code. I've broken up the examples across two main packages: `bbejeck.chapter_3.producer` and `bbejeck.chapter_3.consumer`. Within those two, there are additional packages, `avro`, `proto`, and `json`, containing example code that drives the serializing and deserializing examples using the serializers for the format indicated by the package name.
Here's a description of the contents of the directories and the function of each class:
- `bbejeck.chapter_3.producer`
  - `avro`
    - `AvroProducer` - Initial producer example for working with Avro schemas
    - `AvroReferenceCollegeProducer` - Example producer for working with schema references in Avro
    - `AvroReferenceCompanyProducer` - Example producer for working with schema references in Avro
  - `json`
    - `JsonSchemaProducer` - Initial producer example for working with JSON Schema schemas
    - `JsonSchemaReferenceCollegeProducer` - Example producer for working with schema references in JSON Schema
    - `JsonScheamReferenceCompanyProducer` - Example producer for working with schema references in JSON Schema
  - `proto`
    - `ProtoProducer` - Initial producer example for working with Protobuf schemas
    - `ProtoReferenceCollegeProducer` - Example producer for working with schema references in Protobuf
    - `ProtoReferenceCompanyProducer` - Example producer for working with schema references in Protobuf
- `bbejeck.chapter_3.consumer`
  - `avro`
    - `AvroConsumer` - Initial consumer example for working with Avro schemas
    - `AvroReferenceCollegeConsumer` - Example consumer for working with schema references in Avro
    - `AvroReferenceCompanyConsumer` - Example consumer for working with schema references in Avro
  - `json`
    - `JsonSchemaConsumer` - Initial consumer example for working with JSON Schema schemas
    - `JsonSchemaReferenceCollegeConsumer` - Example consumer for working with schema references in JSON Schema
    - `JsonScheamReferenceCompanyConsumer` - Example consumer for working with schema references in JSON Schema
  - `proto`
    - `ProtoConsumer` - Initial consumer example for working with Protobuf schemas
    - `ProtoReferenceCollegeConsumer` - Example consumer for working with schema references in Protobuf
    - `ProtoReferenceCompanyConsumer` - Example consumer for working with schema references in Protobuf
- `bbejeck.chapter_3`
  - `AvroReflectionProduceConsumeExample` - A simple example using the Avro reflection serializer and deserializer. I will update chapter 3 in another MEAP release to cover using this part of the Avro API.
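As a rough sketch of the general shape of the producer examples listed above (not the code of any specific class), a producer that serializes Avro through Schema Registry usually looks something like this. The topic name, schema, and record contents here are made up for illustration.

```java
// Hedged sketch of an Avro producer talking to Schema Registry -- topic, schema,
// and values are invented for illustration, not taken from the repo's classes.
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class AvroProducerSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // The Avro serializer registers/looks up schemas in Schema Registry.
        props.put("schema.registry.url", "http://localhost:8081");

        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Greeting\",\"fields\":[{\"name\":\"text\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("text", "hello schema registry");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", record));
        }
    }
}
```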
NOTE: Before attempting to work with any of the examples, make sure you have a Kafka broker and Schema Registry running. To do this, simply execute `docker-compose up -d`. The `-d` flag runs Docker in "detached" mode, which is essentially the same as running any Linux command with an `&` appended to it.
For the examples nested under the `producer` or `consumer` packages, you need to run them in steps:
- Run the producer - the producer examples send a few records and then shut down
- Run the consumer - the consumer starts up, displays some information on the console, and then shuts down after two consecutive `poll` calls that don't retrieve any records (see the sketch below)
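The "shut down after two empty polls" behavior described above boils down to a small poll loop. Here's a hedged sketch of that idea, not the repo's consumer code; the topic and group names are placeholders.

```java
// Hedged sketch of a consumer that exits after two consecutive empty polls.
// Topic and group id are placeholders, not the ones used in the book's examples.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ConsumerShutdownSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        int emptyPolls = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));
            while (emptyPolls < 2) {              // stop after two empty polls in a row
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                if (records.isEmpty()) {
                    emptyPolls++;
                } else {
                    emptyPolls = 0;
                    records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
                }
            }
        }
    }
}
```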
In this MEAP release I've added tests that you can run instead of the producer-consumer steps. In the `src/test/java/bbejeck/chapter_3` package there are three tests for the Avro, Protobuf, and JSON Schema producer-consumer interaction with Schema Registry. As time goes on I'll add tests for all the examples in chapter 3. You can still run the examples as stand-alone programs if you wish, but should you choose to experiment, you'll be able to run the tests to ensure everything still works as expected.
To interact with Schema Registry, we're using the Gradle Schema Registry plugin. This plugin makes working with Schema Registry very simple. The configuration for the different commands is located within the `schemaRegistry` block in the `build.gradle` file. The book text describes the different commands, but here's a cheat-sheet:
- Set subject compatibility: `./gradlew <module name>:configSubjectsTask`
- Download schemas: `./gradlew <module name>:downloadSchemasTask`
- Register schemas: `./gradlew <module name>:registerSchemasTask`
- Test compatibility for a proposed update: `./gradlew <module name>:testSchemasTask`
Here, `<module name>` is one of `streams`, `sr-backward`, `sr-forward`, or `sr-full`. It's important to specify the module in any of these commands; otherwise Gradle will execute the command across all modules, and the different Schema Registry modules will clash, resulting in a failure. Also, if you are running on Windows, use `gradlew.bat` instead.
Chapter 4 is all about working with the `KafkaProducer` and `KafkaConsumer`, so all the example code is small applications demonstrating this. I've also tried something new for this chapter: using integration tests to demonstrate the applications running. I did this for two reasons: 1) for you, the reader, to be able to experiment and see that the applications still work as intended, and 2) IMHO, having a test makes the example more meaningful than an application that simply prints results to standard out. However, it's up to you, the reader, to let me know whether having an integration test is a better approach.
In chapter 4, I've structured the code to match up with the main themes from the chapter. So there are packages that contain a `KafkaProducer` and `KafkaConsumer` supporting what the text is trying to teach. Here's a description of what's found in each package:
- `bbejeck.chapter_4.multi_event`
  Although we covered producing multiple events to a single topic in chapter 3, that coverage only dealt with Schema Registry. In chapter 4 we cover what needs to be done in your producer and consumer clients for multi-event topics.
  - `avro` - Producer and consumer client applications for working with Avro multi-event topics. These classes run in the `MultiEventAvroProduceConsumeTest` integration test.
  - `proto` - Producer and consumer client applications for working with Protobuf multi-event topics. These classes run in the `MultiEventProtoProduceConsumeTest` integration test.
- `bbejeck.chapter_4.pipelining`
  The code in this package corresponds to the async committing portion of the text. The `PipeliningProducerClient` does not do anything special beyond producing the records. The `PipeliningConsumerClient` consumes the records and hands them off to the `ConcurrentRecordProcessor`, which processes the records. The `ConcurrentRecordProcessor` also stores the offsets of successfully processed records in a `ConcurrentLinkedDeque`, and the `PipeliningConsumerClient` retrieves them for committing (a rough sketch of this hand-off appears after this package list). The `ProducePipeliningConsumeApplication` uses a `DataGenerator` and runs indefinitely until you shut it down with a `CTRL+C` command. Please note that all of this example code is meant purely to demonstrate one way of handling long processing times and offset-committing strategies.
- `bbejeck.chapter_4.sales`
  This is the example for the introduction to working with a `KafkaProducer` and `KafkaConsumer`. It follows the narrative of an application handling sales data. The `SalesProduceConsumeApplication` runs the producer and consumer in separate threads for the convenience of having both clients run in a single application, but this is not meant to reflect what would be done in a production setting. The `SalesProduceConsumeApplication` uses a data generator, `DataGenerator`, and runs indefinitely until you shut it down with a `CTRL+C` command.
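For the pipelining package above, here's a condensed sketch of the offset hand-off idea: the processing side records the offsets of finished work in a `ConcurrentLinkedDeque`, and the polling side drains that deque and commits. This is a hedged illustration of the pattern, not the repo's `PipeliningConsumerClient` or `ConcurrentRecordProcessor` code; the class, method, and topic names are invented.

```java
// Hedged sketch of the offset hand-off between a polling thread and a processing
// thread -- not the book's classes; names are invented for illustration.
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;

public class PipeliningSketch {

    // Filled in by the processing side, drained by the polling side for commits.
    private final ConcurrentLinkedDeque<Map<TopicPartition, OffsetAndMetadata>> offsets =
            new ConcurrentLinkedDeque<>();

    // Called once a record has been fully processed.
    void recordProcessed(ConsumerRecord<String, String> record) {
        offsets.offer(Map.of(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));
    }

    // Called on the polling thread: poll, hand records off for processing,
    // then commit whatever offsets the processor has reported back so far.
    void pollAndCommit(Consumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(this::handOffForProcessing);
        Map<TopicPartition, OffsetAndMetadata> next;
        while ((next = offsets.poll()) != null) {
            consumer.commitAsync(next, null);
        }
    }

    private void handOffForProcessing(ConsumerRecord<String, String> record) {
        // In the real example this goes to a separate processing thread/queue;
        // here we "process" inline just to complete the sketch.
        recordProcessed(record);
    }
}
```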
In chapter 4 I started a new approach by using integration tests for some of the sample applications. In some examples I felt the producer and consumer code were enough to teach the main point. In other examples I think that, instead of having you read log output, it's better to structure the example as an integration test and assert the main points of the example.
Some tests in the `chapter_4` package are long-running and carry the `Tag("long")` tag, so they won't run with a `./gradlew test` command but only individually from the IDE or with a `./gradlew longTests` command. However, the long-running tests are meant as a teaching tool.
- `AdminClientTest`
  The `AdminClientTest` is a basic demo of working with topics. As the book progresses I'll add more test cases.
- `IdempotentProducerTest`
  The `IdempotentProducerTest` is a parameterized test demonstrating how the idempotent producer guarantees exactly-once delivery (per producer session) by simulating a network partition using the Toxiproxy module. The network partition forces the `KafkaProducer` to retry sending records. The test asserts that in regular mode there are duplicate records, while in idempotent mode, even with the retries, the `KafkaProducer` only delivers records once. This is a long-running test, so it's meant to be run individually to observe the idempotent behavior.
- `TransactionalConsumeTransformProduceTest`
  The `TransactionalConsumeTransformProduceTest` is a demonstration of the consume-transform-produce process using the Kafka transactional API (a rough sketch of this loop appears after this list of tests). This is also a test tagged `Tag("long")` and is a teaching tool meant to be run individually from the IDE.
- `TransactionalProducerConsumerTest`
  The `TransactionalProducerConsumerTest` demonstrates two use cases. The first is the simple path of producing records in a transaction and the consumer reading them in `read-committed` mode. The second use case demonstrates how the consumer, when in `read-committed` mode, only consumes records that have been successfully committed, while in regular mode it consumes all records. This is accomplished in `testConsumeOnlyOnceAfterAbortReadCommitted`, a parameterized test with a simulated error resulting in an aborted transaction.
- `serializers`
  The tests in the `serializers` package are simple round-trip serialize-deserialize tests for the stand-alone serializers and deserializers for JSON and Protobuf.
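As promised above, here's a hedged sketch of a consume-transform-produce pass with the transactional API. It is not the code from `TransactionalConsumeTransformProduceTest`; the topic names and the transform are invented, and it assumes the producer is configured with a `transactional.id` and the consumer with `isolation.level=read_committed` and `enable.auto.commit=false`.

```java
// Hedged sketch of one consume-transform-produce pass -- not the test's code.
// Assumes a transactional producer and a read_committed, non-auto-committing consumer.
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class ConsumeTransformProduceSketch {

    static void runOnePass(Consumer<String, String> consumer, Producer<String, String> producer) {
        producer.initTransactions();   // normally called once per producer, before the first transaction
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        if (records.isEmpty()) {
            return;
        }
        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            records.forEach(record -> {
                String transformed = record.value().toUpperCase();   // the "transform" step
                producer.send(new ProducerRecord<>("output-topic", record.key(), transformed));
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            });
            // Consumer offsets are committed as part of the same transaction.
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (KafkaException e) {
            // Simplified: fatal exceptions (e.g. producer fenced) need separate handling in real code.
            producer.abortTransaction();
        }
    }
}
```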
Chapter 6 starts the coverage of Kafka Streams. There are several sample applications demonstrating the core principles covered in chapter 6. Currently, there aren't any tests, but I'll get those done soon.
FOR ALL THE EXAMPLES YOU'LL NEED TO MAKE SURE TO RUN `docker-compose up -d` BEFOREHAND SO YOU'LL HAVE A LIVE BROKER FOR KAFKA STREAMS TO CONNECT TO.
Now let's take a brief tour of the examples in chapter 6.
- `bbejeck.chapter_6.client_supplier` package
  This package contains a concrete implementation of a `KafkaClientSupplier` that you'll use to provide a Kafka Streams application with your own `KafkaProducer` and `KafkaConsumer` instances. The `CustomKafkaStreamsClientSupplier` is a basic example and simply logs a statement for each method executed. I provided the `KafkaStreamsCustomClientsApp` to demonstrate how you'd use the client supplier when you want to provide the Kafka Streams application custom clients.
- `KafkaStreamsYellingApp`
  The first application you'll build is the `KafkaStreamsYellingApp`, which transforms incoming random text into all upper-case characters, effectively yelling at everyone (a rough sketch of this topology appears after this chapter's example descriptions). There are two additional variants of the yelling app demonstrating additional Kafka Streams capabilities. The `KafkaStreamsYellingAppWithPeek` shows how to use the `peek` operation to print results at arbitrary points in the application; note that printing is but one use of `peek`. The `KafkaStreamsYellingNamedProcessorsApp` demonstrates how you can provide meaningful names for your processors, which results in a topology that is easier to understand when printing the physical plan of the topology.
- `ZMartKafkaStreamsApp`
  The `ZMartKafkaStreamsApp` uses a fictional retail example to demonstrate some additional features, namely the `mapValues`, `flatMap`, and `KStream#print` operators. This application also shows how to use Schema Registry with a Kafka Streams application by setting the `ZmartKafkaStreamsApp.useSchemaRegistry` field (line 142) to true before starting the application.
  Other variants of the application include the `ZMartKafkaStreamsFilteringBranchingApp`, which shows adding a `filter` and splitting a stream with `split` to perform different operations on each branch.
  Then the `ZMartKafkaStreamsDynamicRoutingApp` shows how to route records to different topics at runtime using the `TopicNameExtractor` interface. There are two concrete implementations: the `PurchaseTopicNameExtractor`, which gets the topic name from the `department` field of a record, and the `HeadersTopicNameExtractor`, which demonstrates the same functionality but extracts the topic name from a `Header` provided with the record.
- `SensorBranchingMergingApp`
  The `SensorBranchingMergingApp` uses branching again, but in this example you'll split the stream into branches, add a mapping operation to one of the branches, and then use `KStream#merge` to merge the branches with other `KStream` instances in the application.
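To make the yelling idea concrete, here's a minimal sketch of that style of topology: uppercase every incoming value and peek at the results along the way. This is an illustration of the pattern, not the `KafkaStreamsYellingApp` source itself; the topic names are placeholders.

```java
// Hedged sketch of a "yelling" topology: uppercase each value, peek, and forward.
// Topic names are placeholders, not necessarily those used by the book's app.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class YellingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "yelling-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("src-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(String::toUpperCase)                      // the "yelling"
               .peek((key, value) -> System.out.println(value))     // print at an arbitrary point
               .to("out-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```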
Coming soon!
Chapter 7 covers stateful operations in Kafka Streams. There are several sample applications demonstrating reduce, aggregation, and joins. Chapter 7 also covers using state stores. Several of the example applications have a version using the default persistent stores and another version showing how to use an in-memory store.
FOR ALL THE EXAMPLES YOU'LL NEED TO MAKE SURE TO RUN `docker-compose up -d` BEFOREHAND SO YOU'LL HAVE A LIVE BROKER FOR KAFKA STREAMS TO CONNECT TO.
Here's a brief description of the example code in chapter 7. Instead of detailing each class, I've broken the description down by category covered in the book.
- `bbejeck.chapter_7.aggregator` package
  Contains an implementation of the `Aggregator` interface. The `StreamsStockTransactionAggregations` example uses this `Aggregator`.
- `bbejeck.chapter_7.joiner` package
  Contains an implementation of a `Joiner` interface used in the `KafkaStreamsJoinsApp`.
- Counting
  For demonstrating the use of `count()` there is the `StreamsCountingApplication`. The `StreamsCountingInMemoryApplication` provides the same functionality but uses an in-memory state store (a rough sketch of counting with an in-memory store appears after these category descriptions).
- Reduce
  For demonstrating the use of `reduce` there is the `StreamsPokerGameReducer` application and the `StreamsPokerGameInMemoryStoreReducer`, which uses an in-memory store.
- Aggregation
  To show how to perform a more complex aggregation there is the `StreamsStockTransactionAggregations` example.
- Joins
  To demonstrate the use of joins there is the `KafkaStreamsJoinsApp`. Over time, I'll add additional examples showing the different join types.
- Repartitioning
  To demonstrate the various ways to repartition the data there are several examples:
  - `SteamsChangeKeyThenReuseRepartition` - shows how reusing a `KStream` after changing the key can lead to multiple, redundant repartition nodes
  - `ProactiveStreamsRepartition` - demonstrates how to manually perform a repartition using the `repartition` operation
  - `OptimizingStreamsRepartition` - shows how Kafka Streams will remove redundant repartition nodes when configured to do so
  - `RepartitionForThroughput` - an example of using the `repartition` operation to repartition to a higher number of partitions, yielding more tasks and higher throughput for the Kafka Streams application
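As referenced in the Counting item above, here's a hedged sketch of counting by key while materializing the result in an in-memory store. It is not the code from the counting applications themselves; the topic and store names are made up, and swapping the store supplier is what distinguishes the in-memory variant from the default persistent one.

```java
// Hedged sketch of counting by key with an in-memory state store -- topic and
// store names are invented, not taken from the book's counting applications.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

public class CountingSketch {
    static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> counts = builder
                .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                // Use Stores.persistentKeyValueStore here instead to get the
                // default RocksDB-backed behavior rather than an in-memory store.
                .count(Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("counts-store"))
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));
        counts.toStream().to("counts", Produced.with(Serdes.String(), Serdes.Long()));
        return builder;
    }
}
```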
There are some unit tests for the chapter 7 example applications. There's not a test for every example at the moment, but I plan to have a test for every class soon.