Debezium CDC example from a Python Django MySQL source to a .NET Core Elasticsearch destination using Kafka.


hyagli/cdc-python-netcore


Debezium is an open source project for change data capture (CDC). This example demonstrates it with two components:

  • A Django application with a MySQL database that saves data.
  • A .NET Core application with a PostgreSQL database that consumes this data.

Debezium

Requirements:

  • Just Docker

To get the docker containers up and running:

docker-compose up -d
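For orientation, the compose file presumably defines services along these lines. This is only a sketch: the service names and ports are taken from the commands and URLs used in this README, while the images and remaining options are my assumptions and may differ from the actual file in the repo.

```yaml
# Sketch only: names/ports come from this README; images are assumptions.
services:
  mysql:          # source database; Adminer connects with server "mysql"
    image: mysql
  adminer:        # web UI for MySQL at http://localhost:8080/
    image: adminer
    ports: ["8080:8080"]
  kafka:          # broker, reachable inside the network as kafka:9092
    image: debezium/kafka
  connect:        # Kafka Connect with Debezium plugins, REST API on 8083
    image: debezium/connect
    ports: ["8083:8083"]
  kafdrop:        # topic browser at http://localhost:9000/
    image: obsidiandynamics/kafdrop
    ports: ["9000:9000"]
  python_app:     # the Django app at http://localhost:8000/
    build: .
    ports: ["8000:8000"]
```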

To create the Django tables in MySQL:

docker-compose run --rm --no-deps python_app python manage.py migrate

To add some polls from the admin page, create a superuser:

docker-compose run --rm --no-deps python_app python manage.py createsuperuser

Using the username/password you just generated, you can later visit http://localhost:8000/admin/polls/question/ and create some rows after setting up CDC.

Grant the required MySQL privileges to the django user so that Debezium can do its job. To do this, open the Adminer UI at http://localhost:8080/ and log in using:

Server: mysql
Username: root
Password: pass
Database: djangodb

After logging in, click "SQL command" and execute this:

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO django@'%';

The next thing to do is set up Debezium by sending a cURL request to Kafka Connect.
You can read about the Debezium MySQL connector configuration at https://debezium.io/documentation/reference/connectors/mysql.html#mysql-required-connector-configuration-properties

To send our binary Protobuf data, we will use the same method as the Avro configuration explained here: https://debezium.io/documentation/reference/transformations/outbox-event-router.html#avro-as-payload-format

Open a new terminal and use the following curl command to register the Debezium MySQL connector (on Windows you may need to escape the double quotes if you get a parsing error). This adds a connector to our Kafka Connect container that listens for changes to our outbox table.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ --data-raw '{
    "name": "cdc-python-netcore-connector-outbox",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "django",
        "database.password": "django",
        "database.server.name": "cdc-mysql",
        "database.history.kafka.topic": "cdc-test",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "table.include.list": "djangodb.polls_outbox",
        "transforms": "outbox",
        "transforms.outbox.type" : "io.debezium.transforms.outbox.EventRouter",
        "value.converter": "io.debezium.converters.ByteBufferConverter",
        "value.converter.schemas.enable": "false",
        "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter"
    }
}'
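If you prefer to script the setup instead of using cURL, the same registration can be done from Python with only the standard library. This is a sketch: the connector definition mirrors the cURL call above, and the endpoint assumes the default Kafka Connect port.

```python
import json
import urllib.request

# Same connector definition as the cURL example above.
connector = {
    "name": "cdc-python-netcore-connector-outbox",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "django",
        "database.password": "django",
        "database.server.name": "cdc-mysql",
        "database.history.kafka.topic": "cdc-test",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "table.include.list": "djangodb.polls_outbox",
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "value.converter": "io.debezium.converters.ByteBufferConverter",
        "value.converter.schemas.enable": "false",
        "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
    },
}

def register_connector(definition, url="http://localhost:8083/connectors/"):
    """POST a connector definition to Kafka Connect's REST API."""
    request = urllib.request.Request(
        url,
        data=json.dumps(definition).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

Kafka Connect answers 201 Created on success and 409 Conflict if a connector with the same name is already registered.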

Create a new poll question using the admin page at http://localhost:8000/admin/polls/question/add/

To check that everything is running as expected, open the Kafdrop UI at http://localhost:9000/. You should see the new topics.

To exchange Protobuf messages between Python and .NET Core, I wrote a proto file: /proto/question.proto. To compile it, install the protobuf compiler (via Homebrew on macOS):

brew install protobuf

Then run the following command inside the proto folder (the compiled output of the proto file is already included in the repo).

protoc question.proto --python_out=./output/python/ --csharp_out=./output/csharp/ --descriptor_set_out=question.desc
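The schema itself is small. Based on the fields passed to QuestionProto later in this README, question.proto presumably looks roughly like this; the message name, field types, and field numbers are illustrative assumptions, and the actual file in the repo may differ.

```proto
// Sketch of question.proto; field numbers and types are assumptions.
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message Question {
  int64 id = 1;
  string question_text = 2;
  google.protobuf.Timestamp pub_date = 3;
}
```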

We now need an outbox table to implement the CDC outbox pattern with Debezium. I created the Outbox model for this:

import uuid

from django.db import models


class Outbox(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    aggregatetype = models.CharField(max_length=255)
    aggregateid = models.CharField(max_length=255)
    event_type = models.CharField(max_length=255, db_column='type')
    payload = models.BinaryField()

You can read the Debezium documentation for details. The payload column is special: it holds the serialized Protobuf value, and Debezium passes it through to Kafka untouched.

To populate the outbox table, I override the save_model method of the admin view:

# In the polls admin module; these are methods of the Question ModelAdmin.
from django.db import transaction
from google.protobuf.timestamp_pb2 import Timestamp

# QuestionProto is the class generated by protoc from question.proto;
# the exact module and class name in the repo may differ.

@transaction.atomic
def save_model(self, request, obj, form, change):
    # Save the Question and write the outbox row in one transaction,
    # so one never exists without the other.
    super().save_model(request, obj, form, change)
    self.create_outbox_record(obj)

def create_outbox_record(self, obj):
    # Convert the Django datetime to a Protobuf Timestamp.
    ts = Timestamp()
    ts.FromDatetime(obj.pub_date)
    proto = QuestionProto(
        id=obj.id,
        question_text=obj.question_text,
        pub_date=ts,
    )
    outbox = Outbox(
        aggregatetype='question',
        aggregateid=obj.id,
        event_type='question_created',
        payload=proto.SerializeToString(),
    )
    outbox.save()
    # outbox.delete()  # the row could be deleted right away: Debezium reads
    # the insert from the binlog, so the event is emitted either way
