The goal of this project is to play with Kafka, Debezium and KSQL. For this, we have: `research-service`, which inserts/updates/deletes records in MySQL; Source Connectors, which monitor changes to records in MySQL and push messages related to those changes to Kafka; Sink Connectors and `kafka-research-consumer`, which listen to messages from Kafka and insert/update documents in Elasticsearch; and finally, `KSQL-Server`, which listens to some topics in Kafka, performs some joins and pushes new messages to new topics in Kafka.
- `research-service`

  Monolithic Spring Boot application that exposes a REST API to manage `Institutes`, `Articles`, `Researchers` and `Reviews`. The data is saved in MySQL.

- `kafka-research-consumer`

  Spring Boot application that listens to messages from the topic `REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES` (one of the KSQL outputs) and saves the payload of those messages (i.e., reviews with detailed information) in Elasticsearch.
- Java 11+
- Docker
- Docker-Compose
- Open a terminal and, inside the `springboot-kafka-debezium-ksql` root folder, run the following command

  ```
  docker-compose up -d
  ```

  Note: during the first run, images for `mysql` and `kafka-connect` will be built, named `springboot-kafka-debezium-ksql_mysql` and `springboot-kafka-debezium-ksql_kafka-connect`, respectively. To rebuild those images, run `docker-compose build`

- Wait a little bit until all containers are `Up (healthy)`. To check the status of the containers, run

  ```
  docker-compose ps
  ```
In order to have topics in Kafka with more than `1` partition, we must create them manually instead of waiting for the connectors to create them for us. So:

- Open a new terminal and make sure you are in the `springboot-kafka-debezium-ksql` root folder

- Run the script below

  ```
  ./create-kafka-topics.sh
  ```

  Note: you can ignore the warnings.

  It will create the topics `mysql.researchdb.institutes`, `mysql.researchdb.researchers`, `mysql.researchdb.articles` and `mysql.researchdb.reviews`, each with `5` partitions.
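The script wraps standard Kafka topic-creation commands. As a sketch only (the broker container name, bootstrap address and replication factor below are assumptions; check the script itself for the exact commands), it presumably does something like:

```shell
# Hypothetical sketch of create-kafka-topics.sh: build the topic-creation
# command for each table topic. The commands are echoed (not executed) so the
# sketch runs anywhere; drop "echo" to run them against a live broker.
for topic in mysql.researchdb.institutes mysql.researchdb.researchers \
             mysql.researchdb.articles mysql.researchdb.reviews; do
  echo docker exec -t kafka kafka-topics --create \
    --bootstrap-server localhost:9092 \
    --topic "$topic" --partitions 5 --replication-factor 1
done
```

Pre-creating the topics matters because connectors that auto-create topics get the broker default partition count, which is usually `1`.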
- In a terminal, make sure you are in the `springboot-kafka-debezium-ksql` root folder

- Run the following `curl` commands to create one Debezium and two Elasticsearch-Sink connectors in `kafka-connect`

  ```
  curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/debezium-mysql-source-researchdb.json

  curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-institutes.json

  curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-articles.json
  ```
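The actual connector definitions live in the repo's `connectors/` folder. For orientation only, a Debezium MySQL source connector config generally looks like the sketch below; the property values (hostnames, credentials, server id) are assumptions, not the contents of the repo's file:

```json
{
  "name": "debezium-mysql-source-researchdb",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "secret",
    "database.server.id": "184054",
    "database.server.name": "mysql",
    "database.whitelist": "researchdb",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.researchdb"
  }
}
```

Debezium's MySQL connector writes change events to topics named `<database.server.name>.<database>.<table>`, which is why the topics above are called `mysql.researchdb.*`.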
- You can check the state of the connectors and their tasks on Kafka Connect UI (http://localhost:8086) or by calling the `kafka-connect` endpoints

  ```
  curl localhost:8083/connectors/debezium-mysql-source-researchdb/status
  curl localhost:8083/connectors/elasticsearch-sink-institutes/status
  curl localhost:8083/connectors/elasticsearch-sink-articles/status
  ```

- The state of the connectors and their tasks must be `RUNNING`. If there is any problem, check the `kafka-connect` container logs

  ```
  docker logs kafka-connect
  ```
- In a new terminal, make sure you are inside the `springboot-kafka-debezium-ksql` root folder

- Run the command below to start the application

  ```
  ./mvnw clean spring-boot:run --projects research-service -Dspring-boot.run.jvmArguments="-Dserver.port=9080"
  ```

  Note: it will create some articles, institutes and researchers. If you don't want that, set the properties `load-samples.articles.enabled`, `load-samples.institutes.enabled` and `load-samples.researchers.enabled` to `false` in `application.yml`.

- The Swagger link is http://localhost:9080/swagger-ui.html

- Important: create at least one review so that `mysql.researchdb.reviews-key` and `mysql.researchdb.reviews-value` are created in Schema Registry. Below is a sample request to create a review

  ```
  curl -i -X POST localhost:9080/api/reviews \
    -H "Content-Type: application/json" \
    -d "{ \"researcherId\": 1, \"articleId\": 1, \"comment\": \"Ln 56: replace the 'a' by 'an'\"}"
  ```
- In a new terminal, inside the `springboot-kafka-debezium-ksql` root folder, run the `docker` command below to start `ksql-cli`

  ```
  docker run -it --rm --name ksql-cli \
    --network springboot-kafka-debezium-ksql_default \
    -v $PWD/docker/ksql/researchers-institutes.ksql:/tmp/researchers-institutes.ksql \
    -v $PWD/docker/ksql/reviews-researchers-institutes-articles.ksql:/tmp/reviews-researchers-institutes-articles.ksql \
    confluentinc/cp-ksql-cli:5.4.1 http://ksql-server:8088
  ```

  The following log should be shown, and the terminal will wait for user input

  ```
                    ===========================================
                    =        _  __ _____  ____  _             =
                    =       | |/ // ____|/ __ \| |            =
                    =       | ' /| (___ | |  | | |            =
                    =       |  <  \___ \| |  | | |            =
                    =       | . \ ____) | |__| | |____        =
                    =       |_|\_\_____/ \___\_\______|       =
                    =                                         =
                    =  Streaming SQL Engine for Apache Kafka® =
                    ===========================================

  Copyright 2017-2019 Confluent Inc.

  CLI v5.4.1, Server v5.4.1 located at http://ksql-server:8088

  Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

  ksql>
  ```

- On the `ksql-cli` command line, run the following commands

  - Set the `auto.offset.reset` value

    ```
    SET 'auto.offset.reset' = 'earliest';
    ```

  - Run the following script. It will create the `RESEARCHERS_INSTITUTES` topic

    ```
    RUN SCRIPT '/tmp/researchers-institutes.ksql';
    ```

  - Check whether the topic was created

    ```
    DESCRIBE RESEARCHERS_INSTITUTES;
    SELECT * FROM RESEARCHERS_INSTITUTES EMIT CHANGES LIMIT 5;
    ```

  - Run the script below. It will create the `REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES` topic

    ```
    RUN SCRIPT '/tmp/reviews-researchers-institutes-articles.ksql';
    ```

  - Check whether the topic was created

    ```
    DESCRIBE REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES;
    SELECT * FROM REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES EMIT CHANGES LIMIT 1;
    ```
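The real statements ship with the repo in `docker/ksql/`. Purely for orientation, a KSQL join script of this kind typically looks like the sketch below; the column names and exact statements are assumptions, not the repo's actual script:

```sql
-- Hypothetical sketch of a researchers/institutes join
-- (schemas are inferred from the Avro subjects in Schema Registry)
CREATE TABLE INSTITUTES
  WITH (KAFKA_TOPIC='mysql.researchdb.institutes', VALUE_FORMAT='AVRO', KEY='ID');

CREATE STREAM RESEARCHERS
  WITH (KAFKA_TOPIC='mysql.researchdb.researchers', VALUE_FORMAT='AVRO');

-- The persistent query's output stream is backed by a new Kafka topic
CREATE STREAM RESEARCHERS_INSTITUTES
  WITH (KAFKA_TOPIC='RESEARCHERS_INSTITUTES') AS
  SELECT r.ID, r.FIRST_NAME, r.LAST_NAME, i.NAME AS INSTITUTE_NAME
  FROM RESEARCHERS r
  JOIN INSTITUTES i ON r.INSTITUTE_ID = i.ID;
```

The second script joins this output with the reviews and articles topics in the same way, producing the `REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES` topic that `kafka-research-consumer` reads.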
- In a terminal, make sure you are in the `springboot-kafka-debezium-ksql` root folder

- Run the `curl` command below to create the `elasticsearch-sink-researchers` connector in `kafka-connect`

  ```
  curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-researchers.json
  ```

- You can check the state of the connector and its task on Kafka Connect UI (http://localhost:8086) or by calling the `kafka-connect` endpoint

  ```
  curl localhost:8083/connectors/elasticsearch-sink-researchers/status
  ```
- Open a new terminal and navigate to the `springboot-kafka-debezium-ksql` root folder

- Run the command below to start the application

  ```
  ./mvnw clean spring-boot:run --projects kafka-research-consumer -Dspring-boot.run.jvmArguments="-Dserver.port=9081"
  ```

- This service runs on port `9081`. The `health` endpoint is: http://localhost:9081/actuator/health

- [Optional] We can start another `kafka-research-consumer` instance by opening another terminal and running

  ```
  ./mvnw clean spring-boot:run --projects kafka-research-consumer -Dspring-boot.run.jvmArguments="-Dserver.port=9082"
  ```
- Go to the terminal where `ksql-cli` is running. On the `ksql-cli` command line, run the following query

  ```
  SELECT * FROM REVIEWS_RESEARCHERS_INSTITUTES_ARTICLES EMIT CHANGES;
  ```

- In another terminal, call the `research-service` simulation endpoint

  ```
  curl -X POST localhost:9080/api/simulation/reviews \
    -H "Content-Type: application/json" \
    -d "{\"total\": 100, \"sleep\": 100}"
  ```

- The GIF below shows it

- You can also query Elasticsearch

  ```
  curl "localhost:9200/reviews/_search?pretty"
  ```
- **Kafka Topics UI**

  Kafka Topics UI can be accessed at http://localhost:8085

- **Kafka Connect UI**

  Kafka Connect UI can be accessed at http://localhost:8086

- **Schema Registry UI**

  Schema Registry UI can be accessed at http://localhost:8001

- **Schema Registry**

  You can use `curl` to check the subjects in Schema Registry

  - Get the list of subjects

    ```
    curl localhost:8081/subjects
    ```

  - Get the latest version of the subject `mysql.researchdb.researchers-value`

    ```
    curl localhost:8081/subjects/mysql.researchdb.researchers-value/versions/latest
    ```
- **Kafka Manager**

  Kafka Manager can be accessed at http://localhost:9000

  Configuration:

  - First, you must create a new cluster. Click on `Cluster` (dropdown on the header) and then on `Add Cluster`
  - Type the name of your cluster in the `Cluster Name` field, for example: `MyZooCluster`
  - Type `zookeeper:2181` in the `Cluster Zookeeper Hosts` field
  - Enable the checkbox `Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)`
  - Click on the `Save` button at the bottom of the page
- **Elasticsearch**

  Elasticsearch can be accessed at http://localhost:9200

  - Get all indices

    ```
    curl "localhost:9200/_cat/indices?v"
    ```

  - Search for documents

    ```
    curl "localhost:9200/articles/_search?pretty"
    curl "localhost:9200/institutes/_search?pretty"
    curl "localhost:9200/researchers/_search?pretty"
    curl "localhost:9200/reviews/_search?pretty"
    ```

- **MySQL**

  ```
  docker exec -it mysql mysql -uroot -psecret --database researchdb
  ```

  ```
  SELECT a.id AS review_id, c.id AS article_id, c.title AS article_title,
         b.id AS reviewer_id, b.first_name, b.last_name, b.institute_id, a.comment
    FROM reviews a, researchers b, articles c
   WHERE a.researcher_id = b.id and a.article_id = c.id;
  ```

  Type `exit` to leave the MySQL terminal.
- Go to the terminals where `research-service` and `kafka-research-consumer` are running and press `Ctrl+C` to stop them
- Go to the terminal where `ksql-cli` is running, press `Ctrl+C` to stop the `SELECT`, and then type `exit`
- In a terminal, inside the `springboot-kafka-debezium-ksql` root folder, run the command below to stop and remove the Docker containers, networks and volumes

  ```
  docker-compose down -v
  ```
- Create ES indices dynamically and add an `alias` for them
- Replace the deprecated `topic.index.map` configured in the `elasticsearch-sink-*` connectors. Waiting for these `kafka-connect-elasticsearch` issues to be fixed:
  - Create indices before writing records (confluentinc/kafka-connect-elasticsearch#261)
  - ConnectException: Cannot create mapping when using RegexRouter/TimestampRouter SMT (confluentinc/kafka-connect-elasticsearch#99)