This repository was archived by the owner on Jul 31, 2025. It is now read-only.

NovatecConsulting/tc-paia-kafka-examples

Kafka Examples

This example implements a pseudonymization use case with Kafka Streams and state stores. The state store holds the mapping between original and pseudonymized values, so no additional database is required.

The basic idea is that the application consumes events from the topic action_v1_source and writes the pseudonymized events to the topic action_v1_pseudonymized.

Source events from action_v1_source
{"eventTime":1627381572560115,"accountId":"accountA","accountName":"Anja Abele","action":"accelerate"}
{"eventTime":1627381566350235,"accountId":"accountB","accountName":"Berthold Brecht","action":"start"}
{"eventTime":1627381582593780,"accountId":"accountA","accountName":"Anja Abele","action":"forward"}

These events contain personal data, which this example Kafka Streams application pseudonymizes. To ensure that the same account is always replaced with the same pseudonymized account, a state store holds the generated mapping.

Pseudonymized events from action_v1_pseudonymized
{"eventTime":1627381572560115,"accountId":"8cce533d-b14a-48c7-ae55-b4ab2b07cf10","accountName":"lwUhAZEar","action":"accelerate"}
{"eventTime":1627381566350235,"accountId":"1cf03214-2825-4f4c-93ac-7d3e742921eb","accountName":"FSnoZ","action":"start"}
{"eventTime":1627381582593780,"accountId":"8cce533d-b14a-48c7-ae55-b4ab2b07cf10","accountName":"lwUhAZEar","action":"forward"}

Quickstart

Start Kafka Broker and Schema Registry
docker-compose up -d

Kafka: localhost:9092
Schema Registry: http://localhost:8081

Build examples
./gradlew shadowJar
Start the pseudonymizer Kafka Streams application
java -cp build/libs/tc-paia-kafka-examples-0.1-SNAPSHOT-all.jar de.novatec.tc.pseudonymize.specific.PseudonymizeApp
Start the driver which creates random ingest data
java -cp build/libs/tc-paia-kafka-examples-0.1-SNAPSHOT-all.jar de.novatec.tc.pseudonymize.specific.PseudonymizeDriver
Now open three terminals and, in each one, exec into the Schema Registry Docker container
docker exec -it kafka-examples_schema-registry_1 bash
Listen to the input topic
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true \
  --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
  --from-beginning \
  --topic action_v1_source
Example output
accountB	{"eventId":"db50506e-f179-49de-917c-12400847d02f","eventTime":1627381566350235,"accountId":"accountB","accountName":"Berthold Brecht","action":"start"}
accountB	{"eventId":"a7723568-3a4d-4a1e-a7f9-058f6a9011af","eventTime":1627381568543809,"accountId":"accountB","accountName":"Berthold Brecht","action":"left"}
accountC	{"eventId":"855abc0d-0248-44e2-b93c-437cc93ad4bd","eventTime":1627381570553982,"accountId":"accountC","accountName":"Constantin Claußen","action":"accelerate"}
accountA	{"eventId":"48b61d32-64b4-411f-94c8-4ce078ace8af","eventTime":1627381572560115,"accountId":"accountA","accountName":"Anja Abele","action":"accelerate"}
accountA	{"eventId":"e4de96c5-6bec-47ab-902a-0097eb95d8ea","eventTime":1627381574567298,"accountId":"accountA","accountName":"Anja Abele","action":"retard"}
accountA	{"eventId":"39465183-819f-4749-9ad9-9a76e7ba33af","eventTime":1627381576573062,"accountId":"accountA","accountName":"Anja Abele","action":"start"}
accountB	{"eventId":"78501d9c-4efd-4261-80ca-bf2f09af6228","eventTime":1627381578579144,"accountId":"accountB","accountName":"Berthold Brecht","action":"accelerate"}
accountB	{"eventId":"a3e38f96-8d8d-4052-9e96-af2aeb92a6da","eventTime":1627381580585428,"accountId":"accountB","accountName":"Berthold Brecht","action":"start"}
accountA	{"eventId":"0bf0122f-4c35-4fb4-bbe9-8ca6cf87d780","eventTime":1627381582593780,"accountId":"accountA","accountName":"Anja Abele","action":"forward"}
Listen to the pseudonymized topic
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true \
  --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
  --from-beginning \
  --topic action_v1_pseudonymized
Example output
1cf03214-2825-4f4c-93ac-7d3e742921eb	{"eventId":"e94181eb-72d0-4be5-b235-1c73aff750e0","eventTime":1627381566350235,"accountId":"1cf03214-2825-4f4c-93ac-7d3e742921eb","accountName":"FSnoZ","action":"start"}
1cf03214-2825-4f4c-93ac-7d3e742921eb	{"eventId":"0db85a57-7abb-4612-9be6-0250b1bd03b3","eventTime":1627381568543809,"accountId":"1cf03214-2825-4f4c-93ac-7d3e742921eb","accountName":"FSnoZ","action":"left"}
5f3ce651-4f07-4670-9bf7-26c18a038a96	{"eventId":"7af5c530-4e65-4014-bde1-f529395b3937","eventTime":1627381570553982,"accountId":"5f3ce651-4f07-4670-9bf7-26c18a038a96","accountName":"dNqCiDRex","action":"accelerate"}
8cce533d-b14a-48c7-ae55-b4ab2b07cf10	{"eventId":"592cb074-d74d-404d-8060-564fc397073b","eventTime":1627381572560115,"accountId":"8cce533d-b14a-48c7-ae55-b4ab2b07cf10","accountName":"lwUhAZEar","action":"accelerate"}
8cce533d-b14a-48c7-ae55-b4ab2b07cf10	{"eventId":"cbbe7492-621b-4e7c-b2d8-8f6792cd5cc4","eventTime":1627381574567298,"accountId":"8cce533d-b14a-48c7-ae55-b4ab2b07cf10","accountName":"lwUhAZEar","action":"retard"}
8cce533d-b14a-48c7-ae55-b4ab2b07cf10	{"eventId":"2586fd35-9aca-4f65-b366-8b34c10c1c28","eventTime":1627381576573062,"accountId":"8cce533d-b14a-48c7-ae55-b4ab2b07cf10","accountName":"lwUhAZEar","action":"start"}
1cf03214-2825-4f4c-93ac-7d3e742921eb	{"eventId":"b8e2d873-1ea2-47ee-ae4d-05084f82a8bb","eventTime":1627381578579144,"accountId":"1cf03214-2825-4f4c-93ac-7d3e742921eb","accountName":"FSnoZ","action":"accelerate"}
1cf03214-2825-4f4c-93ac-7d3e742921eb	{"eventId":"43f4356e-6b59-4345-95cb-2255ee460e38","eventTime":1627381580585428,"accountId":"1cf03214-2825-4f4c-93ac-7d3e742921eb","accountName":"FSnoZ","action":"start"}
8cce533d-b14a-48c7-ae55-b4ab2b07cf10	{"eventId":"f08be46d-6534-4692-b023-aad0c263487c","eventTime":1627381582593780,"accountId":"8cce533d-b14a-48c7-ae55-b4ab2b07cf10","accountName":"lwUhAZEar","action":"forward"}
Listen to the changelog topic which contains the mapping
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true \
  --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
  --from-beginning \
  --topic pseudonymize-pseudonym-changelog
Example output
accountC	{"accountId":"5f3ce651-4f07-4670-9bf7-26c18a038a96","accountName":"dNqCiDRex"}
accountB	{"accountId":"1cf03214-2825-4f4c-93ac-7d3e742921eb","accountName":"FSnoZ"}
accountA	{"accountId":"8cce533d-b14a-48c7-ae55-b4ab2b07cf10","accountName":"lwUhAZEar"}

Open Topics
