Skip to content

Commit 56a2c4b

Browse files
committed
Done publishing stripped event
1 parent f749fb5 commit 56a2c4b

File tree

4 files changed

+98
-54
lines changed

4 files changed

+98
-54
lines changed

README.md

+59-54
Original file line numberDiff line numberDiff line change
@@ -5,93 +5,74 @@ This example has two components to demonstrate the utilities.
55

66
Debezium
77

8-
Requirements
9-
Just docker
8+
Requirements:<br>
9+
- Just docker
1010

11-
12-
Create your Django app:
13-
14-
docker-compose run --rm --no-deps web django-admin startproject django_cdc .
15-
16-
Modify src/django_cdc/settings.py
17-
18-
DATABASES = {
19-
'default': {
20-
'ENGINE': 'django.db.backends.mysql',
21-
'NAME': 'djangodb',
22-
'USER': 'django',
23-
'PASSWORD': 'django',
24-
'HOST': 'mysql',
25-
'PORT': 3306,
26-
}
27-
}
28-
29-
Run:
11+
To get the docker containers up and running:
3012

3113
docker-compose up -d
3214

33-
Run for default django tables:
15+
To create the django tables in MySQL:
3416

3517
docker-compose run --rm --no-deps python_app python manage.py migrate
36-
docker-compose run --rm --no-deps python_app python manage.py makemigrations polls
37-
docker-compose run --rm --no-deps python_app python manage.py migrate polls
3818

39-
Add some polls from admin page
19+
To add some polls from admin page, create a superuser
4020

4121
docker-compose run --rm --no-deps python_app python manage.py createsuperuser
4222

43-
The default login and password for the admin site is admin:admin.
23+
Using the username/password you just generated, you can later visit http://localhost:8000/admin/polls/question/ and create some rows after setting up CDC.
24+
25+
Grant the required MySQL rights to django so that Debezium can do it's job.
26+
To do this, go to Adminer UI at http://localhost:8080/. Login using:
4427

45-
Grant the required MySQL rights to django so that CDC can do it's job:
28+
Server: mysql
29+
Username: root
30+
Password: pass
31+
Database: djangodb
32+
33+
After logging in, click "SQL command" and execute this:
4634

4735
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO django@'%';
4836

49-
Verify Debezium MySQL connector configuration. Read more about it at https://debezium.io/documentation/reference/connectors/mysql.html#mysql-required-connector-configuration-properties
37+
The next thing to do is set up Debezium by sending a cURL command to kafka connect.<br>
38+
You can read about the Debezium MySQL connector configuration at https://debezium.io/documentation/reference/connectors/mysql.html#mysql-required-connector-configuration-properties
39+
40+
To send our binary Protobuffer data, we will use the same method as Avro configuration explained here: https://debezium.io/documentation/reference/transformations/outbox-event-router.html#avro-as-payload-format
5041

51-
{
52-
"name": "cdc-python-netcore-connector",
42+
Open a new terminal, and use the curl command to register the Debezium MySQL connector. (You may need to escape your double-quotes on windows if you get a parsing error). This will add a connector in our kafka-connect container to listen to database changes in our outbox table.
43+
44+
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ --data-raw '{
45+
"name": "cdc-python-netcore-connector-outbox-2",
5346
"config": {
5447
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
55-
"tasks.max": "1",
5648
"database.hostname": "mysql",
5749
"database.port": "3306",
5850
"database.user": "django",
5951
"database.password": "django",
60-
"database.server.id": "123321",
6152
"database.server.name": "cdc-mysql",
62-
"database.include.list": "djangodb",
53+
"database.history.kafka.topic": "cdc-test",
6354
"database.history.kafka.bootstrap.servers": "kafka:9092",
64-
"database.history.kafka.topic": "schema-changes.djangodb",
65-
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
66-
"value.converter": "io.debezium.converters.ByteBufferConverter"
55+
"table.include.list": "djangodb.polls_outbox",
56+
"transforms": "outbox",
57+
"transforms.outbox.type" : "io.debezium.transforms.outbox.EventRouter",
58+
"value.converter": "io.debezium.converters.ByteBufferConverter",
59+
"value.converter.schemas.enable": "false",
60+
"value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter"
6761
}
68-
}
69-
70-
Open a new terminal, and use the curl command to register the Debezium MySQL connector. (You may need to escape your double-quotes on windows if you get a parsing error)
71-
72-
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"cdc-python-netcore-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"django","database.password":"django","database.server.id":"123321","database.server.name":"cdc-mysql","database.include.list":"djangodb","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"schema-changes.djangodb"}}'
62+
}'
7363

7464
Create a new poll question using the admin page at http://localhost:8000/admin/polls/question/add/
7565

76-
Start a shell prompt in the kafka container:
77-
78-
docker exec -it <kafka-container-name> bash
79-
80-
List all topics:
66+
To see if everything is running as expected go to our kafdrop container page at http://localhost:9000/
67+
You should see the topics.
8168

82-
bin/kafka-topics.sh --list --zookeeper zookeeper:2181
83-
84-
List messages in polls_question topic:
85-
86-
bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic cdc-mysql.djangodb.polls_question --from-beginning
87-
88-
I created a protobuf file to have a well-defined type between python and .net core: `/proto/question.proto`. To compile the proto file, you can install the protobuf compiler using:
69+
To prepare a protobuf file between python and .net core, I wrote a proto file: `/proto/question.proto`. To compile the proto file, you can install the protobuf compiler using:
8970

9071
brew install protobuf
9172

9273
And run the following command inside the proto folder (I've already included the compiled output of the proto file in the repo).
9374

94-
protoc question.proto --python_out=./output/python/ --csharp_out=./output/cs/
75+
protoc question.proto --python_out=./output/python/ --csharp_out=./output/csharp/ --descriptor_set_out=question.desc
9576

9677
We now need an outbox table to implement the cdc outbox pattern using Debezium. I created the Outbox model for this:
9778

@@ -103,3 +84,27 @@ We now need an outbox table to implement the cdc outbox pattern using Debezium.
10384
payload = models.BinaryField()
10485

10586
You can read the [Debezium documentation](https://debezium.io/documentation/reference/configuration/outbox-event-router.html) for details. The `payload` column is special here since it will hold the serialized protobuf value and it will be passed transparently by Debezium to Kafka.
87+
88+
To populate the outbox table, I used the `save_model` method of admin view:
89+
90+
@transaction.atomic
91+
def save_model(self, request, obj, form, change):
92+
super().save_model(request, obj, form, change)
93+
self.create_outbox_record(obj)
94+
95+
def create_outbox_record(self, obj):
96+
ts = Timestamp()
97+
ts.FromDatetime(obj.pub_date)
98+
proto = QuestionProto(
99+
id=obj.id,
100+
question_text=obj.question_text,
101+
pub_date=ts,
102+
)
103+
outbox = Outbox(
104+
aggregatetype='question',
105+
aggregateid=obj.id,
106+
event_type='question_created',
107+
payload=proto.SerializeToString(),
108+
)
109+
outbox.save()
110+
#outbox.delete()

docker-compose.yml

+13
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,16 @@ services:
6060
- kafka
6161
- zookeeper
6262
- mysql
63+
kafdrop:
64+
container_name: cdc-kafdrop
65+
image: obsidiandynamics/kafdrop:3.27.0
66+
volumes:
67+
- ./proto:/var/protobuf_desc
68+
command: /kafdrop.sh --message.format=PROTOBUF --protobufdesc.directory=/var/protobuf_desc
69+
environment:
70+
KAFKA_BROKERCONNECT: kafka:9092
71+
ports:
72+
- 9000:9000
73+
depends_on:
74+
- kafka
75+
- zookeeper

proto/output/csharp/Question.cs

+19
Original file line numberDiff line numberDiff line change
@@ -46,26 +46,31 @@ public sealed partial class Question : pb::IMessage<Question>
4646
private static readonly pb::MessageParser<Question> _parser = new pb::MessageParser<Question>(() => new Question());
4747
private pb::UnknownFieldSet _unknownFields;
4848
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
49+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
4950
public static pb::MessageParser<Question> Parser { get { return _parser; } }
5051

5152
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
53+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
5254
public static pbr::MessageDescriptor Descriptor {
5355
get { return global::Com.Hus.Cdc.QuestionReflection.Descriptor.MessageTypes[0]; }
5456
}
5557

5658
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
59+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
5760
pbr::MessageDescriptor pb::IMessage.Descriptor {
5861
get { return Descriptor; }
5962
}
6063

6164
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
65+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
6266
public Question() {
6367
OnConstruction();
6468
}
6569

6670
partial void OnConstruction();
6771

6872
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
73+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
6974
public Question(Question other) : this() {
7075
id_ = other.id_;
7176
questionText_ = other.questionText_;
@@ -74,6 +79,7 @@ public Question(Question other) : this() {
7479
}
7580

7681
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
82+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
7783
public Question Clone() {
7884
return new Question(this);
7985
}
@@ -82,6 +88,7 @@ public Question Clone() {
8288
public const int IdFieldNumber = 1;
8389
private int id_;
8490
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
91+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
8592
public int Id {
8693
get { return id_; }
8794
set {
@@ -93,6 +100,7 @@ public int Id {
93100
public const int QuestionTextFieldNumber = 2;
94101
private string questionText_ = "";
95102
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
103+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
96104
public string QuestionText {
97105
get { return questionText_; }
98106
set {
@@ -104,6 +112,7 @@ public string QuestionText {
104112
public const int PubDateFieldNumber = 3;
105113
private global::Google.Protobuf.WellKnownTypes.Timestamp pubDate_;
106114
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
115+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
107116
public global::Google.Protobuf.WellKnownTypes.Timestamp PubDate {
108117
get { return pubDate_; }
109118
set {
@@ -112,11 +121,13 @@ public string QuestionText {
112121
}
113122

114123
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
124+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
115125
public override bool Equals(object other) {
116126
return Equals(other as Question);
117127
}
118128

119129
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
130+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
120131
public bool Equals(Question other) {
121132
if (ReferenceEquals(other, null)) {
122133
return false;
@@ -131,6 +142,7 @@ public bool Equals(Question other) {
131142
}
132143

133144
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
145+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
134146
public override int GetHashCode() {
135147
int hash = 1;
136148
if (Id != 0) hash ^= Id.GetHashCode();
@@ -143,11 +155,13 @@ public override int GetHashCode() {
143155
}
144156

145157
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
158+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
146159
public override string ToString() {
147160
return pb::JsonFormatter.ToDiagnosticString(this);
148161
}
149162

150163
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
164+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
151165
public void WriteTo(pb::CodedOutputStream output) {
152166
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
153167
output.WriteRawMessage(this);
@@ -172,6 +186,7 @@ public void WriteTo(pb::CodedOutputStream output) {
172186

173187
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
174188
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
189+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
175190
void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) {
176191
if (Id != 0) {
177192
output.WriteRawTag(8);
@@ -192,6 +207,7 @@ public void WriteTo(pb::CodedOutputStream output) {
192207
#endif
193208

194209
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
210+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
195211
public int CalculateSize() {
196212
int size = 0;
197213
if (Id != 0) {
@@ -210,6 +226,7 @@ public int CalculateSize() {
210226
}
211227

212228
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
229+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
213230
public void MergeFrom(Question other) {
214231
if (other == null) {
215232
return;
@@ -230,6 +247,7 @@ public void MergeFrom(Question other) {
230247
}
231248

232249
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
250+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
233251
public void MergeFrom(pb::CodedInputStream input) {
234252
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
235253
input.ReadRawMessage(this);
@@ -262,6 +280,7 @@ public void MergeFrom(pb::CodedInputStream input) {
262280

263281
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
264282
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
283+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
265284
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
266285
uint tag;
267286
while ((tag = input.ReadTag()) != 0) {

proto/question.desc

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
2+
�
3+
question.proto com.hus.cdcgoogle/protobuf/timestamp.proto"v
4+
Question
5+
id (Rid#
6+
question_text ( R questionText5
7+
pub_date ( 2.google.protobuf.TimestampRpubDatebproto3

0 commit comments

Comments
 (0)