DISCLAIMER: This project is for demonstration purposes only. Using the concept in production is highly discouraged. Use at your own risk.
Change to the `cluster` folder.
Start the containers by running:

```bash
docker-compose up -d
```
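To check that all services came up, you can inspect the container status (a standard Docker Compose command):

```bash
docker-compose ps
```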
Stopping the containers:

```bash
docker-compose down
```
Cleaning up (CAREFUL: THIS WILL DELETE ALL UNUSED VOLUMES):

```bash
docker volume prune
```
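If you want to see what would be removed before pruning, you can list the dangling volumes first (again a standard Docker command):

```bash
docker volume ls -f dangling=true
```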
Change to the `client` sub folder and initialize it by running (this might not be necessary, as `gradlew` is part of the repository and might be up-to-date):

```bash
gradle wrapper
```
Build a shadow jar (which contains all required jars) by running:

```bash
./gradlew shadowJar
```
In the `client` sub folder you can use Gradle to run the producer, either using the shadow jar:

```bash
./gradlew -p producer-7.2.1 runShadow
```

or directly (relying on Gradle's runtime classpath):

```bash
./gradlew -p producer-7.2.1 run --args='../configuration/dev.properties ../input.txt'
```
You can also run the shadow jar directly:

```bash
java -jar producer-7.2.1/build/libs/producer-7.2.1.jar configuration/dev.properties input.txt
```
You can also use the same producer code but packaged with the (almost) latest version of the Kafka library by using the commands above and replacing the folder name `producer-7.2.1` with `producer-latest`.
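For example, running the producer packaged with the latest library via the shadow jar then becomes:

```bash
./gradlew -p producer-latest runShadow
```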
Just for testing the setup, you can consume the messages by running (in the `client` folder):

```bash
kafka-avro-console-consumer --bootstrap-server broker:9092 --consumer.config configuration/dev.properties --topic message --from-beginning
```
| Tool | URL | Note |
|---|---|---|
| Kafka Broker | | |
| Prometheus | | |
| Grafana | | Login: "kafka", Password: "kafka" |
| JMX Exporter for the Kafka Broker | | |
| Prometheus Node Exporter | | |
Note that the Docker setup already sets the log level of the request logger to `DEBUG`. The following explains how to set that log level at runtime.
Show the dynamic configs for the broker (the broker id is `1`, thus we use entity name `1`):

```bash
docker compose exec broker kafka-configs --bootstrap-server broker:9092 --describe \
  --entity-type broker-loggers --entity-name 1
```
Set the log level of the request logger to `DEBUG`:

```bash
docker compose exec broker kafka-configs --bootstrap-server broker:9092 --alter \
  --add-config "kafka.request.logger=DEBUG" \
  --entity-type broker-loggers --entity-name 1
```
Check which JMX metrics are available on the broker (you might need to temporarily add the hostname `broker` with IP `127.0.0.1` to your `/etc/hosts`):

```bash
jconsole broker:10091
```
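One way to add that temporary entry (plain shell; remember to remove the line again afterwards):

```bash
echo '127.0.0.1 broker' | sudo tee -a /etc/hosts
```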
The MBean `kafka.server:clientSoftwareName=(client-software-name),clientSoftwareVersion=(client-software-version),listener=(listener),networkProcessor=(processor-index),type=(type)` provides information about the client library and version in use.
In Prometheus, have a look at the metric called `kafka_server_socketservermetrics_connections` (as configured in the JMX exporter for the broker metrics). Please note that you won't see any client ID, principal, or similar; you just see the name of the library and the version. Thus, this MBean is not useful for identifying the users of outdated versions of Kafka libraries.
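If you prefer the command line over the Prometheus UI, a query along these lines should work. This is a sketch that assumes Prometheus is reachable on `localhost:9090` (its default port) and that `jq` is installed:

```bash
# Query the Prometheus HTTP API and print the label sets of all
# time series for the connections metric.
curl -s 'http://localhost:9090/api/v1/query' \
  --data-urlencode 'query=kafka_server_socketservermetrics_connections' \
  | jq '.data.result[].metric'
```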
Switch off debug logging:

```bash
docker compose exec broker kafka-configs --bootstrap-server broker:9092 --alter \
  --delete-config kafka.request.logger \
  --entity-type broker-loggers --entity-name 1
```
You can follow the logs of the Docker container running the Kafka broker by running this command in the `cluster` subfolder (exit with `Ctrl-C`):

```bash
docker-compose logs -f kafka
```
Whenever a client connects, lines similar to the following will appear in the Docker log of the Kafka broker:
```
broker | [2023-05-31 17:24:11,538] INFO Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":18,"requestApiVersion":3,"correlationId":0,"clientId":"producer-7.2.1","requestApiKeyName":"API_VERSIONS"},"request":{"clientSoftwareName":"apache-kafka-java","clientSoftwareVersion":"7.2.1-ccs"},"response":{"errorCode":0,"apiKeys":[{"apiKey":0,"minVersion":0,"maxVersion":9},{"apiKey":1,"minVersion":0,"maxVersion":13},{"apiKey":2,"minVersion":0,"maxVersion":7},{"apiKey":3,"minVersion":0,"maxVersion":12},{"apiKey":4,"minVersion":0,"maxVersion":6},{"apiKey":5,"minVersion":0,"maxVersion":3},{"apiKey":6,"minVersion":0,"maxVersion":7},{"apiKey":7,"minVersion":0,"maxVersion":3},{"apiKey":8,"minVersion":0,"maxVersion":8},{"apiKey":9,"minVersion":0,"maxVersion":8},{"apiKey":10,"minVersion":0,"maxVersion":4},{"apiKey":11,"minVersion":0,"maxVersion":9},{"apiKey":12,"minVersion":0,"maxVersion":4},{"apiKey":13,"minVersion":0,"maxVersion":5},{"apiKey":14,"minVersion":0,"maxVersion":5},{"apiKey":15,"minVersion":0,"maxVersion":5},{"apiKey":16,"minVersion":0,"maxVersion":4},{"apiKey":17,"minVersion":0,"maxVersion":1},{"apiKey":18,"minVersion":0,"maxVersion":3},{"apiKey":19,"minVersion":0,"maxVersion":7},{"apiKey":20,"minVersion":0,"maxVersion":6},{"apiKey":21,"minVersion":0,"maxVersion":2},{"apiKey":22,"minVersion":0,"maxVersion":4},{"apiKey":23,"minVersion":0,"maxVersion":4},{"apiKey":24,"minVersion":0,"maxVersion":3},{"apiKey":25,"minVersion":0,"maxVersion":3},{"apiKey":26,"minVersion":0,"maxVersion":3},{"apiKey":27,"minVersion":0,"maxVersion":1},{"apiKey":28,"minVersion":0,"maxVersion":3},{"apiKey":29,"minVersion":0,"maxVersion":2},{"apiKey":30,"minVersion":0,"maxVersion":2},{"apiKey":31,"minVersion":0,"maxVersion":2},{"apiKey":32,"minVersion":0,"maxVersion":4},{"apiKey":33,"minVersion":0,"maxVersion":2},{"apiKey":34,"minVersion":0,"maxVersion":2},{"apiKey":35,"minVersion":0,"maxVersion":3},{"apiKey":36,"minVersion":0,"maxVersion":2},{"apiKey":37,"minVersion":0,"maxVersion":3},{"apiKey":38,"minVersion":0,"maxVersion":2},{"apiKey":39,"minVersion":0,"maxVersion":2},{"apiKey":40,"minVersion":0,"maxVersion":2},{"apiKey":41,"minVersion":0,"maxVersion":2},{"apiKey":42,"minVersion":0,"maxVersion":2},{"apiKey":43,"minVersion":0,"maxVersion":2},{"apiKey":44,"minVersion":0,"maxVersion":1},{"apiKey":45,"minVersion":0,"maxVersion":0},{"apiKey":46,"minVersion":0,"maxVersion":0},{"apiKey":47,"minVersion":0,"maxVersion":0},{"apiKey":48,"minVersion":0,"maxVersion":1},{"apiKey":49,"minVersion":0,"maxVersion":1},{"apiKey":50,"minVersion":0,"maxVersion":0},{"apiKey":51,"minVersion":0,"maxVersion":0},{"apiKey":56,"minVersion":0,"maxVersion":1},{"apiKey":57,"minVersion":0,"maxVersion":1},{"apiKey":60,"minVersion":0,"maxVersion":0},{"apiKey":61,"minVersion":0,"maxVersion":0},{"apiKey":65,"minVersion":0,"maxVersion":0},{"apiKey":66,"minVersion":0,"maxVersion":0},{"apiKey":67,"minVersion":0,"maxVersion":0},{"apiKey":10000,"minVersion":0,"maxVersion":3},{"apiKey":10001,"minVersion":0,"maxVersion":1},{"apiKey":10002,"minVersion":0,"maxVersion":3},{"apiKey":10003,"minVersion":0,"maxVersion":3},{"apiKey":10004,"minVersion":0,"maxVersion":1},{"apiKey":10005,"minVersion":0,"maxVersion":0},{"apiKey":10006,"minVersion":0,"maxVersion":3},{"apiKey":10007,"minVersion":0,"maxVersion":2},{"apiKey":10008,"minVersion":0,"maxVersion":1},{"apiKey":10009,"minVersion":0,"maxVersion":2},{"apiKey":10010,"minVersion":0,"maxVersion":0},{"apiKey":10011,"minVersion":0,"maxVersion":1},{"apiKey":10012,"minVersion":0,"maxVersion":0},{"apiKey":10013,"minVersion":0,"maxVersion":1},{"apiKey":10014,"minVersion":0,"maxVersion":1},{"apiKey":10015,"minVersion":0,"maxVersion":0},{"apiKey":10016,"minVersion":0,"maxVersion":0},{"apiKey":10017,"minVersion":0,"maxVersion":0},{"apiKey":10018,"minVersion":0,"maxVersion":0},{"apiKey":10019,"minVersion":0,"maxVersion":0},{"apiKey":10020,"minVersion":0,"maxVersion":0},{"apiKey":10021,"minVersion":0,"maxVersion":0},{"apiKey":10022,"minVersion":0,"maxVersion":1}],"throttleTimeMs":0,"finalizedFeaturesEpoch":0},"connection":"172.18.0.9:10092-172.18.0.1:55030-3","clientAddress":"172.18.0.1","totalTimeMs":0.28,"requestQueueTimeMs":0.096,"localTimeMs":0.008,"remoteTimeMs":0.092,"throttleTimeMs":0,"responseQueueTimeMs":0.022,"sendTimeMs":0.06,"securityProtocol":"PLAINTEXT","principal":{"class":"KafkaPrincipal","type":"User","name":"ANONYMOUS","tokenAuthenticated":false},"listener":"OUT_DOCKER","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"},"isDisconnectedClient":false} (kafka.request.logger)
```
The client ID and the client software version can easily be extracted from those lines.
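As a sketch of how that extraction could be automated on the shell (assuming `jq` is installed; the `sed` expressions strip the non-JSON prefix and suffix of the log lines shown above):

```bash
# Print unique (clientId, softwareName, softwareVersion) triples taken from
# the API_VERSIONS request log lines of the broker container.
docker-compose logs kafka \
  | grep '"requestApiKeyName":"API_VERSIONS"' \
  | sed -e 's/^.*Completed request://' -e 's/ (kafka\.request\.logger).*$//' \
  | jq -r '[.requestHeader.clientId, .request.clientSoftwareName, .request.clientSoftwareVersion] | @tsv' \
  | sort -u
```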
Note that most of the other debug messages are filtered by the Log4j configuration file, which is generated from the template found in `cluster/docker/kafka/log4j.properties.template` when the Docker container running the Kafka broker starts. The result of rendering that template is the following:
```properties
log4j.rootLogger=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
# Allow everything with log level FATAL, ERROR and WARN
log4j.appender.stdout.filter.01=org.apache.log4j.varia.LevelMatchFilter
log4j.appender.stdout.filter.01.level=FATAL
log4j.appender.stdout.filter.01.AcceptOnMatch=true
log4j.appender.stdout.filter.02=org.apache.log4j.varia.LevelMatchFilter
log4j.appender.stdout.filter.02.level=ERROR
log4j.appender.stdout.filter.02.AcceptOnMatch=true
log4j.appender.stdout.filter.03=org.apache.log4j.varia.LevelMatchFilter
log4j.appender.stdout.filter.03.level=WARN
log4j.appender.stdout.filter.03.AcceptOnMatch=true
# Allow certain request log debug information: We just want to see initial version information
log4j.appender.stdout.filter.04=org.apache.log4j.varia.StringMatchFilter
log4j.appender.stdout.filter.04.StringToMatch="requestApiKeyName":"API_VERSIONS"
log4j.appender.stdout.filter.04.AcceptOnMatch=true
# Filter all other unwanted request log debug information
log4j.appender.stdout.filter.05=org.apache.log4j.varia.StringMatchFilter
log4j.appender.stdout.filter.05.StringToMatch=Completed request
log4j.appender.stdout.filter.05.AcceptOnMatch=false
# Filter internal traffic
log4j.appender.stdout.filter.06=org.apache.log4j.varia.StringMatchFilter
log4j.appender.stdout.filter.06.StringToMatch="listener":"IN_DOCKER"
log4j.appender.stdout.filter.06.AcceptOnMatch=false
log4j.appender.stdout.filter.07=org.apache.log4j.varia.StringMatchFilter
log4j.appender.stdout.filter.07.StringToMatch="listener":"INTERNAL"
log4j.appender.stdout.filter.07.AcceptOnMatch=false
log4j.logger.kafka=INFO
log4j.logger.kafka.network.RequestChannel$=WARN
log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG
log4j.logger.kafka.request.logger=DEBUG
log4j.logger.kafka.controller=TRACE
log4j.logger.kafka.log.LogCleaner=INFO
log4j.logger.state.change.logger=TRACE
log4j.logger.kafka.authorizer.logger=WARN
```
The magic is to use the `filter` entries to accept all critical events first (`FATAL`, `ERROR`, `WARN`). Then all lines where the API versions are requested (`"requestApiKeyName":"API_VERSIONS"`) are accepted. Finally, all other request log debug messages are dropped. Log4j evaluates the filters in order and stops at the first accept or deny; events that match no filter at all are logged.
The Docker-based Kafka cluster setup has to be updated manually. Usually, updating the version of CP (Confluent Platform) in the `cluster/.env` file is sufficient.
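Purely as an illustration (the variable name is hypothetical; check the actual `cluster/.env` file), such an update could look like this:

```bash
# cluster/.env -- hypothetical variable name and version
CP_VERSION=7.5.0
```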
The Java libraries can be updated manually, with support from the automatic dependency check provided by the Gradle Versions Plugin. Just run the following to identify outdated libraries:

```bash
./gradlew dependencyUpdates
```
Gradle plugin versions are managed centrally in `gradle.properties`, so they need to be updated there.
Almost all Java library versions are managed in the Gradle config files in `buildSrc/src/main/kotlin`. Only the Kafka client version is managed directly in each subproject's `build.gradle.kts` for better flexibility. For the `producer-7.2.1` example, all Kafka-specific libraries should be kept as-is, since pinning that outdated version is the whole point of the example.
Until KIP-714 is supported in Confluent Platform, the only way to map client software names and versions to the corresponding client IDs or principals (the latter has not been demonstrated in this demo) is to enable the `DEBUG` log level for the Kafka request logger and monitor the requests.