All services for this demo scenario are pre-configured and intended to be run using the provided Docker Compose environment. The docker-compose.yaml
file references container images for arm64 CPUs. Make sure to change them accordingly if you plan to run this demo on a different CPU architecture.
NOTE: All of the following commands are meant to be run from the root directory of this repository.
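If your host is not arm64, you can either swap the image references for builds matching your architecture or pin an explicit platform per service in docker-compose.yaml. A minimal sketch using Compose's standard platform key (the service name here is illustrative; this only helps for multi-arch images, single-arch arm64 tags still need to be replaced):

```yaml
services:
  kafka:
    # request the linux/amd64 variant of a multi-arch image on an x86_64 host;
    # repeat this key for every service whose image is architecture-specific
    platform: linux/amd64
```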
Running docker compose up starts the following services:
- connect
- kafka
- ksqldb-cli
- ksqldb-server
- minio
- mongodb
- schema-registry
- zookeeper
In a separate terminal window run
docker run -it --rm \
--network sample-scenario \
mongo:6.0.1 \
mongosh mongodb:27017/demodb --eval "db.getCollection('fake_citizens').find().limit(10)"
This displays the first 10 documents contained in the MongoDB collection demodb.fake_citizens. One such document looks as follows:
{
"_id": "6326f8ae1906fc47bc20248f",
"personal": {
"firstname": "Darla",
"lastname": "Moore",
"age": 32,
"eyecolor": "hazel",
"gender": "female",
"height": 155,
"weight": 126
},
"isactive": false,
"registered": "2021-06-10T07:39:17 -02:00",
"contact": { "email": "darlamoore@genmom.com", "phone": "(960) 567-2864" },
"knownresidences": [
"888 Celeste Court, Colton, Tennessee, 80627",
"383 Hicks Street, Barronett, Virgin Islands, 50963",
"876 Erasmus Street, Vicksburg, Alaska, 60810"
]
}
The following command creates a Debezium MongoDB source connector instance. Debezium's MongoDB source connector is configured together with the CipherField
SMT to perform log-based change data capture against the MongoDB collection demodb.fake_citizens.
Several fields in the CDC payloads get encrypted according to the SMT configuration settings.
🧸 kcctl - a CLI for Apache Kafka Connect - is used to perform any Kafka Connect related operations. First the connect cluster address is set and used as the CLI tool's context. Then the MongoDB source connector is created.
docker run -it --rm \
--network sample-scenario \
-v ${PWD}/data/connect/:/home \
debezium/tooling:1.2 \
bash -c "kcctl config set-context default --cluster=http://connect:8083 && kcctl apply -f /home/register_mongodb_source_k4k_enc.json"
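For orientation, the field-level encryption in register_mongodb_source_k4k_enc.json is handled by the CipherField SMT from Kryptonite for Kafka. The fragment below is only an illustrative sketch of what such a configuration can look like — the field names are taken from this scenario's sample record, the key identifier is a placeholder, and the actual file in data/connect/ is authoritative:

```json
{
  "transforms": "cipher",
  "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "ENCRYPT",
  "transforms.cipher.field_config": "[{\"name\":\"personal.lastname\"},{\"name\":\"personal.age\"},{\"name\":\"contact\"},{\"name\":\"knownresidences\"}]",
  "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123"
}
```

The cipher_data_keys setting holding the actual key material is deliberately omitted here.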
Running the following command shows the resulting Kafka records in the corresponding topic:
docker run --tty --rm \
--network sample-scenario \
debezium/tooling:1.2 \
kafkacat -b kafka:9092 -C -t mongodb.demodb.fake_citizens -o beginning -q | jq .
One such sample Kafka record looks as follows:
{
"_id": "6326f8ae2abf405005800a64",
"personal": {
"firstname": "Cohen",
"lastname": "KQE7msoB4V1YUqNJ9RIcgaghzdoF5v0x4srV8JVD3uUB5ZN9laJf58kMMLJrZXnBa7E=",
"age": "JAE7msoBduIDjgs77ngR8MHMEaiUj2uN+xKiIQ+E3z6Xg3MWDDCya2V5wWux",
"eyecolor": "KAE7msoBihhRsFt/cJ43D/6gg5emkVIxU25Oh3dHtbHNGYCP7oDLNQwwsmtlecFrsQ==",
"gender": "male",
"height": "JQE7msoBmRoWNjVqbLZKR1nXv8WplADRWrhbQiHOZ42fhlbLxAwwsmtlecFrsQ==",
"weight": "JQE7msoBToMfgPFToD45bFqNQZKJaEm1HIIIq1/k2/nS4ODnrQwwsmtlecFrsQ=="
},
"isactive": true,
"registered": "2021-04-03T08:53:12 -02:00",
"contact": "mwEBO5rKATXVG0dEE8lZZTUVWQLwtaNgk1Dd1icGiB41jilIE0hXBJO/3moF8NoPCq2Ip+z24qbFKr+V3TABPS/AfRQSEm1RZlaM89O+cS6wcMpCGKkdhch6V3asX3vw6NGqFbIyR1mvIFEpG9lTbrkKAqU7PvEFEP5aCIUzaUO5CKGaMPHEV7E4qCDszqIP84GPwI5prD0sJX40DDCya2V5wWux",
"knownresidences": [
"TQE7msoBz9WjqzqCFPHujJrnDiArQRw/6NWyCWf8Iq0Mgu0Ltgs9UWmsJOI3xFeTjxAitSgT3iIZuupR9kHxazXUDsTsUWGEGQUWPZQMMLJrZXnBa7E="
]
}
Hit CTRL+C to stop the consumer process and exit the container.
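The encrypted field values are opaque strings: in Kryptonite for Kafka they are base64-encoded ciphertext envelopes (ciphertext plus metadata such as the key identifier), so they decode to binary data rather than readable plaintext. A quick sanity check in Python, assuming plain base64 encoding:

```python
import base64

# one of the encrypted values from the sample record above
ciphertext = "JAE7msoBduIDjgs77ngR8MHMEaiUj2uN+xKiIQ+E3z6Xg3MWDDCya2V5wWux"

# re-pad to a multiple of 4 before decoding, in case the value omits padding
padded = ciphertext + "=" * (-len(ciphertext) % 4)
raw = base64.b64decode(padded)

# the decoded bytes are an opaque binary envelope, not readable plaintext
print(len(raw))
```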
To enter an interactive ksqlDB CLI session run the following command:
docker compose exec ksqldb-cli ksql http://ksqldb-server:8088
Within this CLI session run the ksql statements below:
SET 'auto.offset.reset'='earliest';
-- create base stream from raw data i.e. partially encrypted
-- Debezium CDC records originating from a MongoDB collection
CREATE STREAM s_fake_citizens(
_ID VARCHAR,
PERSONAL STRUCT<FIRSTNAME VARCHAR, LASTNAME VARCHAR, AGE VARCHAR, EYECOLOR VARCHAR, GENDER VARCHAR, HEIGHT VARCHAR, WEIGHT VARCHAR>,
ISACTIVE BOOLEAN,
REGISTERED VARCHAR,
CONTACT VARCHAR,
KNOWNRESIDENCES ARRAY<VARCHAR>
) WITH (kafka_topic='mongodb.demodb.fake_citizens',value_format='JSON',partitions=1,replicas=1);
-- Create a derived stream which itself contains
-- sensitive data that must be encrypted before
-- having it written into backing kafka topic.
CREATE STREAM s_fake_citizens_modified_enc AS
SELECT
K4KENCRYPT(
CONCAT(
PERSONAL->FIRSTNAME,' ',K4KDECRYPT(PERSONAL->LASTNAME,'')
)
) AS FULLNAME,
CASE
WHEN CAST(K4KDECRYPT(PERSONAL->WEIGHT,0) AS double) /
(K4KDECRYPT(PERSONAL->HEIGHT,0) * K4KDECRYPT(PERSONAL->HEIGHT,0) / 10000.0)
< 18.5 THEN 'underweight'
WHEN CAST(K4KDECRYPT(PERSONAL->WEIGHT,0) AS double) /
(K4KDECRYPT(PERSONAL->HEIGHT,0) * K4KDECRYPT(PERSONAL->HEIGHT,0) / 10000.0)
>= 25.0 THEN 'overweight'
ELSE 'normal weight'
END AS BMI_CATEGORY,
K4KDECRYPT(PERSONAL->AGE,0) AS AGE,
K4KENCRYPT(
K4KDECRYPT(CONTACT,STRUCT(`email`:='',`phone`:=''))->`phone`
) AS PHONE,
K4KENCRYPT(
TRANSFORM(KNOWNRESIDENCES,e => REPLACE(K4KDECRYPT(e,''),',',' |'))
,array['']
) AS ADDRESSES
FROM s_fake_citizens
EMIT CHANGES;
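The CASE expression above derives a BMI category from the decrypted fields: weight divided by height in metres squared, with height converted from centimetres via the division by 10000.0. The same classification logic as a standalone sketch (the function name is ours, not part of the demo):

```python
def bmi_category(height_cm: float, weight: float) -> str:
    """Mirror the ksql CASE expression: BMI = weight / (height in m)^2."""
    bmi = weight / (height_cm * height_cm / 10000.0)
    if bmi < 18.5:
        return "underweight"
    if bmi >= 25.0:
        return "overweight"
    return "normal weight"

print(bmi_category(155, 126))  # matches the sample record's personal fields
```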
The main outcome of this is a new stream called s_fake_citizens_modified_enc
which is backed by a corresponding Kafka topic. This topic contains the derived, again partially encrypted data produced by the streaming query in the ksql snippet above.
Type exit to quit the ksqlDB CLI and exit the container.
Run the following command to show the partially encrypted records in the Kafka topic:
docker run --tty --rm \
--network sample-scenario \
debezium/tooling:1.2 \
kafkacat -b kafka:9092 -C -t S_FAKE_CITIZENS_MODIFIED_ENC -o beginning -q | jq .
One such sample record looks like the following:
{
"FULLNAME": "LwE7msoBP5mb3oYl1CSwwEgELLbeFHzcH+185BrYr5LlM1QZPD6Lnw5cclJf8CAMMLJrZXnBa7E=",
"BMI_CATEGORY": "underweight",
"AGE": 62,
"PHONE": "MQE7msoBIo/DCMiE43MU8JLuUOQsAi4GvvLkdc543aJgwzMQccfWeO1PD+KXYs1zAQwwsmtlecFrsQ==",
"ADDRESSES": [
"UAE7msoBbKsA18BTvHlgPBGsOTkswNQEJApj8FufKJXw133EIQXYVKaeqrlyMzPUtqkFUWqFH/uPas0YtnTflrHzTxOtUP5IJiHdcG7OvboMMLJrZXnBa7E="
]
}
Hit CTRL+C to stop the consumer process and exit the container.
The following command creates a Camel MinIO sink connector instance. The Camel MinIO sink connector is configured together with the CipherField
SMT to decrypt the configured fields in the Kafka records that have been produced by the stream processing job based on the ksqlDB query (see step 4 above).
🧸 kcctl - a CLI for Apache Kafka Connect - is used to perform any Kafka Connect related operations. First the connect cluster address is set and used as the CLI tool's context. Then the MinIO sink connector is created.
docker run -it --rm \
--network sample-scenario \
-v ${PWD}/data/connect/:/home \
debezium/tooling:1.2 \
bash -c "kcctl config set-context default --cluster=http://connect:8083 && kcctl apply -f /home/register_minio_sink_k4k_dec.json"
Either run open http://localhost:9001 or directly open this URL in your browser of choice to access MinIO's web UI. Log in with admin (user) and minio12345 (password). Browse into the k4k-decrypt
bucket to inspect the JSON files. One such file, containing a successfully decrypted JSON object, looks as follows:
{
"PHONE": "(897) 455-2185",
"BMI_CATEGORY": "normal weight",
"ADDRESSES": [
"422 Channel Avenue | Biddle | Alabama | 41587",
"456 Centre Street | Alleghenyville | Georgia | 49431"
],
"FULLNAME": "Vickie Garrett",
"AGE": 26
}
Running docker compose down stops all services.