Debezium is an open source project for change data capture (CDC). This example demonstrates it with two components:
- A Django application with a MySQL database that saves data.
- A .NET Core application with a PostgreSQL database that consumes this data.
Requirements:
- Just Docker (the commands below use docker-compose)
To get the Docker containers up and running:
docker-compose up -d
To create the Django tables in MySQL:
docker-compose run --rm --no-deps python_app python manage.py migrate
To add polls from the admin page, first create a superuser:
docker-compose run --rm --no-deps python_app python manage.py createsuperuser
With the username and password you just created, you can later visit http://localhost:8000/admin/polls/question/ and add some rows once CDC is set up.
Grant the required MySQL privileges to the django user so that Debezium can do its job. To do this, open the Adminer UI at http://localhost:8080/ and log in using:
Server: mysql
Username: root
Password: pass
Database: djangodb
After logging in, click "SQL command" and execute this:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO django@'%';
Next, set up Debezium by sending a request to Kafka Connect with curl.
You can read about the Debezium MySQL connector configuration at https://debezium.io/documentation/reference/connectors/mysql.html#mysql-required-connector-configuration-properties
To send our binary Protobuf data, we will use the same approach as the Avro configuration explained here: https://debezium.io/documentation/reference/transformations/outbox-event-router.html#avro-as-payload-format
Open a new terminal and use the curl command below to register the Debezium MySQL connector. (You may need to escape the double quotes on Windows if you get a parsing error.) This adds a connector to our kafka-connect container that listens for database changes in our outbox table.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ --data-raw '{
"name": "cdc-python-netcore-connector-outbox",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "django",
"database.password": "django",
"database.server.name": "cdc-mysql",
"database.history.kafka.topic": "cdc-test",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"table.include.list": "djangodb.polls_outbox",
"transforms": "outbox",
"transforms.outbox.type" : "io.debezium.transforms.outbox.EventRouter",
"value.converter": "io.debezium.converters.ByteBufferConverter",
"value.converter.schemas.enable": "false",
"value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter"
}
}'
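If shell quoting gets in the way (for example on Windows), the same registration can be done from a short script. The following is a minimal sketch using only the Python standard library. It assumes Kafka Connect is reachable at localhost:8083 as in the curl command; the helper names `build_request` and `register` are illustrative and not part of the repo.

```python
import json
import urllib.request

# Same endpoint as the curl command above.
CONNECT_URL = "http://localhost:8083/connectors/"

# Same connector definition as the curl payload above.
CONNECTOR = {
    "name": "cdc-python-netcore-connector-outbox",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "django",
        "database.password": "django",
        "database.server.name": "cdc-mysql",
        "database.history.kafka.topic": "cdc-test",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "table.include.list": "djangodb.polls_outbox",
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "value.converter": "io.debezium.converters.ByteBufferConverter",
        "value.converter.schemas.enable": "false",
        "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
    },
}


def build_request(url=CONNECT_URL, connector=CONNECTOR):
    """Build (but do not send) the POST request that registers the connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


def register():
    """Send the request; urlopen raises urllib.error.HTTPError on an error response."""
    with urllib.request.urlopen(build_request()) as resp:
        return resp.status, json.loads(resp.read().decode("utf-8"))
```

Calling `register()` once the containers are up should return the HTTP status together with the connector definition echoed back by Kafka Connect.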
Create a new poll question using the admin page at http://localhost:8000/admin/polls/question/add/
To check that everything is running as expected, open the Kafdrop page at http://localhost:9000/. You should see the topics.
To share a message format between Python and .NET Core, I wrote a proto file: /proto/question.proto. To compile it, you can install the protobuf compiler (on macOS with Homebrew) using:
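If the topics do not show up, the Kafka Connect REST API also reports the state of each connector and its tasks. Below is a minimal sketch, assuming the connector name registered above and the `GET /connectors/<name>/status` endpoint. `summarize_status` and `is_healthy` are hypothetical helpers, and `SAMPLE` is a hand-written response for illustration, not output captured from a real run.

```python
import json
import urllib.request

STATUS_URL = (
    "http://localhost:8083/connectors/"
    "cdc-python-netcore-connector-outbox/status"
)


def summarize_status(status):
    """Return (connector_state, [task_states]) from a status response dict."""
    connector_state = status.get("connector", {}).get("state", "UNKNOWN")
    task_states = [task.get("state", "UNKNOWN") for task in status.get("tasks", [])]
    return connector_state, task_states


def is_healthy(status):
    """True only if the connector and at least one task exist and all are RUNNING."""
    connector_state, task_states = summarize_status(status)
    return (
        connector_state == "RUNNING"
        and len(task_states) > 0
        and all(state == "RUNNING" for state in task_states)
    )


def fetch_status(url=STATUS_URL):
    """Fetch the live status from Kafka Connect (requires the containers to be up)."""
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Hand-written example of the response shape, for illustration only.
SAMPLE = {
    "name": "cdc-python-netcore-connector-outbox",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083"}],
}
```

With the containers running, `is_healthy(fetch_status())` gives a quick yes/no answer; a failed task usually carries a `trace` field in the response that is worth printing.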
brew install protobuf
Then run the following command inside the proto folder (the compiled output of the proto file is already included in the repo):
protoc question.proto --python_out=./output/python/ --csharp_out=./output/csharp/ --descriptor_set_out=question.desc
We now need an outbox table to implement the CDC outbox pattern using Debezium. I created the Outbox model for this:
import uuid

from django.db import models


class Outbox(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    aggregatetype = models.CharField(max_length=255)
    aggregateid = models.CharField(max_length=255)
    # Stored in a column named "type", as the outbox event router expects.
    event_type = models.CharField(max_length=255, db_column='type')
    payload = models.BinaryField()
You can read the Debezium documentation for details. The payload column is special here: it holds the serialized protobuf value, which Debezium passes through to Kafka unchanged.
To populate the outbox table, I used the save_model method of the admin view:
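To make the role of each column concrete, here is a rough sketch in plain Python of what the outbox event router does with a row, assuming its documented default settings: the topic is derived from `aggregatetype` with an `outbox.event.` prefix, the message key comes from `aggregateid`, and the value is the `payload` bytes unchanged. The function `route_outbox_row` is hypothetical and only imitates that behavior; it is not Debezium code.

```python
from typing import NamedTuple


class RoutedMessage(NamedTuple):
    topic: str
    key: str
    value: bytes


def route_outbox_row(row: dict, topic_prefix: str = "outbox.event.") -> RoutedMessage:
    """Imitate the default routing of an outbox row to a Kafka message.

    Assumes the default column names: aggregatetype, aggregateid, payload.
    """
    return RoutedMessage(
        topic=topic_prefix + row["aggregatetype"],  # one topic per aggregate type
        key=str(row["aggregateid"]),                # keeps one aggregate's events ordered
        value=bytes(row["payload"]),                # serialized protobuf, passed through as-is
    )


# Example row, shaped like a record in the polls_outbox table.
example = route_outbox_row(
    {
        "aggregatetype": "question",
        "aggregateid": "42",
        "type": "question_created",
        "payload": b"\x08\x2a",
    }
)
```

Under these assumptions the consumer would subscribe to the topic in `example.topic` and deserialize `example.value` with the generated protobuf class.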
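from django.db import transaction
from google.protobuf.timestamp_pb2 import Timestamp

# QuestionProto (generated from question.proto) and Outbox are imported from the project.
# The two functions below are methods of the question admin class (class header omitted).

@transaction.atomic
def save_model(self, request, obj, form, change):
    # Save the question and its outbox record in the same transaction.
    super().save_model(request, obj, form, change)
    self.create_outbox_record(obj)

def create_outbox_record(self, obj):
    # Convert the publication date to a protobuf Timestamp.
    ts = Timestamp()
    ts.FromDatetime(obj.pub_date)
    proto = QuestionProto(
        id=obj.id,
        question_text=obj.question_text,
        pub_date=ts,
    )
    outbox = Outbox(
        aggregatetype='question',
        aggregateid=obj.id,
        event_type='question_created',
        payload=proto.SerializeToString(),
    )
    outbox.save()
    #outbox.delete()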
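`Timestamp.FromDatetime` stores the date as whole seconds plus nanoseconds since the Unix epoch. The sketch below reproduces that conversion with the standard library only, which makes it easier to reason about the value the .NET consumer receives. `to_seconds_nanos` is a hypothetical helper, and treating naive datetimes as UTC is an assumption of this sketch.

```python
from datetime import datetime, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_seconds_nanos(dt: datetime) -> Tuple[int, int]:
    """Split a datetime into (seconds, nanos) since the Unix epoch.

    Naive datetimes are treated as UTC (an assumption of this sketch).
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    delta = dt - EPOCH
    # Integer arithmetic avoids the rounding errors of float timestamps.
    seconds = delta.days * 86400 + delta.seconds
    nanos = delta.microseconds * 1000
    return seconds, nanos


# Example: a publication date with a fractional second.
pub_date = datetime(2021, 1, 1, 0, 0, 0, 250000, tzinfo=timezone.utc)
seconds, nanos = to_seconds_nanos(pub_date)
```

Python datetimes carry microsecond precision, so the nanosecond part produced this way is always a multiple of 1000.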