Apache Kafka often plays a key role in the overall data architecture, with other systems producing data to it and consuming data from it. This could be click-stream events, logs, sensor data, database change events, and so on. As you might imagine, a lot of data ends up in Kafka topics, but it becomes useful only once it is consumed or ingested into other systems.
Kafka Connect is a platform to stream data between Apache Kafka and external systems in a scalable and reliable manner. Thanks to connectors (sink/source), you do not need to write custom integration code to glue systems together.
In case an existing connector is not available, you can leverage the powerful Kafka Connect framework to build your own connectors.
This example demonstrates a data pipeline that synchronizes data between MySQL and Redis. Although the approach applies to many use cases, the one covered here is making product/inventory information available to downstream systems in near real time to serve search requirements.
What's covered?
- The high-level solution architecture
- An end-to-end guide on how to deploy and test the entire solution on Azure
Imagine data being uploaded (in batches, for example) to a relational database (MySQL on Azure in this case). This data can be continuously synchronized to a search engine with low latency; in this solution, Redis on Azure serves as the search engine (more on this soon). That provides a foundation for other services (such as APIs) to drive important parts of the business, such as a customer-facing website that offers fresh, up-to-date information on products, availability and more.
This section provides a high level overview of the solution and the individual components.
As you can see, it has several moving parts:
- The infrastructure components that form the bedrock of the architecture
- Apache Kafka: Confluent Cloud Kafka cluster on Azure infrastructure (available via Azure Marketplace)
- Redis: To be specific, we will be using the RediSearch module which is supported in the Enterprise Tier of Azure Cache for Redis
- MySQL database: Fully managed database as a service
- Kafka Connect cluster with MySQL source connector (fully managed!)
- Application components: These are services running on Azure Spring Cloud
- Change Event Processor: A Java Spring Boot application that uses Spring Kafka integration.
- Search application: Another Java Spring Boot app that exposes a REST API.
Thanks to the JDBC source connector, data in the MySQL products table is sent to a Kafka topic.
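The exact shape of the message depends on the converter and schema settings you choose for the connector. With plain JSON output, the payload for a single row would look roughly like this (illustrative values only, matching the products table defined later in this post):
{
  "product_id": 42,
  "product_name": "Outdoor chairs",
  "created_at": 1614235666000,
  "product_details": "{\"brand\": \"Mainstays\", \"description\": \"Mainstays Solid Turquoise 72 x 21 in. Outdoor Chaise Lounge Cushion\", \"tags\": [\"Self ties cushion\", \"outdoor chairs\"], \"categories\": [\"Garden\"]}"
}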
A consumer application processes these events and makes sure that they are available for search. It does so by creating the required index definition and adding new product info as RediSearch documents (each currently represented as a Redis HASH). The search service then makes the RediSearch data available as a REST API. To keep things simple, the REST API lets you enter queries using the RediSearch query syntax.
Both services use the JRediSearch library to interface with RediSearch: to create the index, add documents, and run queries.
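To make the indexing flow concrete, here is a minimal, hypothetical sketch of the consumer side: a Spring Kafka listener that receives the connector's JSON records and uses JRediSearch to create the index and add documents. The class, field names and payload handling are illustrative assumptions, not the exact code from the sample repository:
import java.util.HashMap;
import java.util.Map;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.redisearch.Schema;
import io.redisearch.client.Client;

// Hypothetical listener: consumes product change events and indexes them in RediSearch.
@Component
public class ProductEventListener {

    private final ObjectMapper mapper = new ObjectMapper();

    // Connection details are placeholders; for Azure Cache for Redis you would
    // also supply the access key (e.g. via a password-enabled constructor or a JedisPool).
    private final Client searchClient = new Client("products-index", "localhost", 6379);

    public ProductEventListener() {
        // Create the index once: TEXT fields for full-text search, TAG fields for exact matches.
        Schema schema = new Schema()
                .addTextField("name", 1.0)
                .addTextField("description", 1.0)
                .addTagField("brand")
                .addTagField("categories");
        try {
            searchClient.createIndex(schema, Client.IndexOptions.defaultOptions());
        } catch (Exception e) {
            // The index probably exists already; ignore for the purpose of this sketch.
        }
    }

    // The topic name matches the one produced by the source connector (e.g. myserver.products).
    @KafkaListener(topics = "${topic.name}")
    public void onProductEvent(String payload) throws Exception {
        JsonNode record = mapper.readTree(payload);

        Map<String, Object> fields = new HashMap<>();
        fields.put("name", record.get("product_name").asText());
        fields.put("created", record.get("created_at").asText());
        // The product_details JSON column could be parsed here and flattened into
        // description/brand/tags/categories fields before indexing.

        // Each document is stored as a Redis HASH that RediSearch indexes.
        searchClient.addDocument("product:" + record.get("product_id").asText(), fields);
    }
}
In the actual services, the Redis and Kafka connection details come from the application.yaml files you will configure later, rather than being hard-coded.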
Before we dive into the nitty gritty, please ensure that you have:
- An Azure account. You can get one for free if you don't already have one.
- Azure CLI, to deploy and manage the infrastructure components.
- JDK 11 or above
- Recent Maven release
- Git
- MySQL instance: Create an Azure Database for MySQL server using the Azure CLI (or the Azure portal if that's what you prefer)
- Apache Kafka cluster: Use the Azure portal to create an instance of Apache Kafka for Confluent Cloud.
- Azure Cache for Redis: Use the Azure portal to create a Redis Enterprise cache instance.
- Azure Spring Cloud: Provision an instance of Azure Spring Cloud using the Azure portal.
MySQL instance on Azure
- Configure firewall rules to allow public inbound traffic access (0.0.0.0/0).
- Ensure that a specific database timezone is set.
- Create a table (products) in MySQL - you can use a client like MySQL Workbench:
CREATE TABLE `products` (
`product_id` int(11) NOT NULL,
`product_name` varchar(255) NOT NULL,
`created_at` timestamp NOT NULL,
`product_details` JSON DEFAULT NULL,
PRIMARY KEY (`product_id`)
);
For details, please refer to the Prerequisites section in the connector documentation.
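If you prefer the CLI over the portal for these MySQL prerequisites, the firewall rule and the timezone server parameter can be set along the following lines (resource names are placeholders, and you should double-check the timezone value against the connector's requirements):
# allow public inbound access (0.0.0.0/0)
az mysql server firewall-rule create -g <resource group> -s <mysql server name> -n allow-all --start-ip-address 0.0.0.0 --end-ip-address 255.255.255.255

# set an explicit database timezone (UTC in this example)
az mysql server configuration set -g <resource group> -s <mysql server name> --name time_zone --value "+00:00"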
Kafka cluster in Confluent Cloud
- (optional) Create topic: The connector automatically creates a topic (based on a default naming convention). Create the topic manually if you want to override its default settings (please make sure to use the same topic name when configuring the connector)
- Configure and launch the MySQL source connector using the portal.
Before you move ahead, make sure that the basics are working: insert a record in MySQL and confirm that the Kafka topic is receiving the messages.
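A quick way to verify this is the message browser for the topic in the Confluent Cloud portal. If you have the Confluent CLI installed and pointed at your cluster, consuming the topic should also show the records, roughly like this:
# topic name is an example - use the one created by/for your connector
confluent kafka topic consume myserver.products --from-beginning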
For the Redis instance on Azure, please ensure that you have the host name and access key handy.
Start by cloning the GitHub repository and changing into the right directory:
git clone https://github.com/Azure-Samples/mysql-kafka-redis-integration
cd mysql-kafka-redis-integration
For both services, update the application.yaml file in the src/main/resources folder with the connection details for Azure Cache for Redis and the Confluent Cloud Kafka cluster.
Here is a trimmed down version for the Change Events Processor service:
redis:
  host: <enter redis host>
  port: <enter redis port>
  password: <enter redis access key>
topic:
  name: <topic name e.g. myserver.products>
  partitions-num: 6
  replication-factor: 3
spring:
  kafka:
    bootstrap-servers:
      - <kafka bootstrap server>
    properties:
      ssl.endpoint.identification.algorithm: https
      sasl.mechanism: PLAIN
      request.timeout.ms: 20000
      retry.backoff.ms: 500
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="<enter kafka API key>" password="<enter kafka API secret>";
      security.protocol: SASL_SSL
...
The config for the Search API service is quite compact:
redis:
  host: <enter redis host>
  port: <enter redis port>
  password: <enter redis access key>
Build JAR files for the Spring applications:
export JAVA_HOME=<enter path to JDK e.g. /Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home>
# Change Events Processor service
mvn clean package -f change-events-processor/pom.xml
# Search API service
mvn clean package -f search-api/pom.xml
Install the Azure Spring Cloud extension for Azure CLI:
az extension add --name spring-cloud
Create the Azure Spring Cloud applications corresponding to both the services:
# Change Events Processor service
az spring-cloud app create -n change-events-processor -s <enter the name of Azure Spring Cloud service instance> -g <enter azure resource group name> --runtime-version Java_11
# Search API service
az spring-cloud app create -n search-api -s <enter the name of Azure Spring Cloud service instance> -g <enter azure resource group name> --runtime-version Java_11 --is-public true
Deploy the JAR files for the respective applications you just created:
# for the Change Events Processor service
az spring-cloud app deploy -n change-events-processor -s <enter the name of Azure Spring Cloud service instance> -g <enter azure resource group name> --jar-path change-events-processor/target/change-events-processor-0.0.1-SNAPSHOT.jar
# for the Search API service
az spring-cloud app deploy -n search-api -s <enter the name of Azure Spring Cloud service instance> -g <enter azure resource group name> --jar-path search-api/target/search-api-0.0.1-SNAPSHOT.jar
Now that we have all the components in place, we can test the end-to-end functionality. It's quite simple:
- Add new product data to the MySQL database
- Use the Search app to make sure the data has propagated all the way to Redis
Insert random product data:
INSERT INTO `products` VALUES (42, 'Outdoor chairs', NOW(), '{"brand": "Mainstays", "description": "Mainstays Solid Turquoise 72 x 21 in. Outdoor Chaise Lounge Cushion", "tags": ["Self ties cushion", "outdoor chairs"], "categories": ["Garden"]}');
INSERT INTO `products` VALUES (43, 'aPhone', NOW(), '{"brand": "Orange", "description": "An inexpensive phone", "tags": ["electronics", "mobile phone"], "categories": ["Electronics"]}');
Get the URL for the Search API service - use the portal or the CLI as follows:
az spring-cloud app show -n search-api -s <enter the name of Azure Spring Cloud service instance> -g <enter azure resource group name>
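If you just want the URL, you can add a JMESPath query to the same command; the public endpoint typically shows up under properties.url:
az spring-cloud app show -n search-api -s <enter the name of Azure Spring Cloud service instance> -g <enter azure resource group name> --query properties.url -o tsv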
Use curl or another HTTP client to invoke the Search API. Each query returns results in the form of a JSON payload, for example:
[
  {
    "created": "1614235666000",
    "name": "Outdoor chairs",
    "description": "Mainstays Solid Turquoise 72 x 21 in. Outdoor Chaise Lounge Cushion",
    "id": "42",
    "categories": "Garden",
    "brand": "Mainstays",
    "tags": "Self ties cushion, outdoor chairs"
  },
  {
    "created": "1614234718000",
    "name": "aPhone",
    "description": "An inexpensive phone",
    "id": "43",
    "categories": "Electronics",
    "brand": "Orange",
    "tags": "electronics, mobile phone"
  }
]
Here are a few examples to get you started. Note that the query parameter q is used to specify the RediSearch query; depending on your HTTP client, you may need to quote or URL-encode queries that contain spaces or special characters.
# search for all records
curl "<search api URL>/search?q=*"
# search for products by name
curl "<search api URL>/search?q=@name:Outdoor chairs"
# search for products by category
curl "<search api URL>/search?q=@categories:{Garden | Electronics}"
# search for products by brand
curl "<search api URL>/search?q=@brand:Mainstays"
# apply multiple search criteria
curl "<search api URL>/search?q=@categories:{Electronics} @brand:Orange"
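For reference, the handler behind this endpoint can be as simple as the following hypothetical sketch, which passes the q parameter straight through to RediSearch via JRediSearch (the actual controller in the repository may differ in naming and response mapping):
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import io.redisearch.Query;
import io.redisearch.SearchResult;
import io.redisearch.client.Client;

// Hypothetical controller: forwards the raw RediSearch query and returns the matching documents.
@RestController
public class SearchController {

    // Index name and connection details are placeholders; for Azure Cache for Redis
    // supply the host, TLS port and access key (e.g. from application.yaml).
    private final Client searchClient = new Client("products-index", "localhost", 6379);

    @GetMapping("/search")
    public List<Map<String, Object>> search(@RequestParam("q") String q) {
        // Run the query as-is and cap the result size for this sketch.
        SearchResult result = searchClient.search(new Query(q).limit(0, 20));

        return result.docs.stream()
                .map(doc -> {
                    Map<String, Object> item = new HashMap<>();
                    item.put("id", doc.getId());
                    // Copy the indexed fields (name, description, brand, ...) into the response.
                    doc.getProperties().forEach(e -> item.put(e.getKey(), e.getValue()));
                    return item;
                })
                .collect(Collectors.toList());
    }
}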
You can continue to add more product information and check the pipeline.
Other things you might want to try:
- Confirm that information is flowing to the Kafka topic in Confluent Cloud
- Check the logs of the consumer application deployed to Azure Spring Cloud - this will give you a sneak peek into the records that are being processed (use az spring-cloud app logs -n <app name> -s <service name> -g <resource group>).
- Take a look at the RediSearch query syntax and try other queries as well.
- Connect to the Azure Cache for Redis instance and run the RediSearch queries directly - just to double check.
Connect using redis-cli as follows:
redis-cli -h <enter host name> -p <enter port i.e. 10000> -a <enter redis password/access key> --tls
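Once connected, you can run RediSearch commands directly. For example (the index name is a placeholder - use the one created by the Change Events Processor):
FT.SEARCH <enter index name> "@brand:Mainstays"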
If you want to store this data in Azure Data Lake Storage for longer-term (cold) retention, the ADLS Gen2 connector for Kafka has you covered. For our scenario, product data is already flowing into the Kafka topic in Confluent Cloud on Azure - all we need to do is configure the connector to get the job done.
And guess what, that's available as a fully-managed offering as well!
Here is what you need to do:
- Create a storage account
- Configure the connector and start it. Please make sure to use the same topic name as you did before (e.g. myserver.products).
- Confirm that the data was exported to the Azure storage container in the ADLS account.
For a step-by-step guide, please follow the documentation.
Once you're done, make sure to delete the services so that you do not incur unwanted costs. If they are in the same resource group, simply deleting the resource group will suffice. You can also delete the resources (MySQL, Confluent Cloud organization, Redis and Azure Spring Cloud instance) individually.
You learned about the high-level architecture and walked through the complete guide on how to run the solution on Azure. The entire solution is based on managed PaaS services, which offers a significant benefit: you don't have to set up and maintain complex distributed systems such as a database, a streaming platform and the runtime infrastructure for your Spring Boot Java apps.
Please note that this is just a part of a potentially larger use case. Thanks to Kafka, you can extend this to integrate with other systems as well - for example, Azure Data Lake using the (yet another fully managed!) ADLS Gen2 connector.
The following resources might be helpful:
- [Get started with Apache Kafka for Confluent Cloud](https://docs.microsoft.com/en-us/azure/partner-solutions/apache-kafka-confluent-cloud/create)
- How to configure clustering in Azure Cache for Redis
- How to configure active geo-replication in Enterprise Azure Cache for Redis instances
- Azure Spring Cloud reference architecture
- Setup Service Discovery for your Spring Cloud microservices
- Choose the right MySQL Server option in Azure
- Migrate your MySQL database by using import and export