Skip to content

Commit

Permalink
Merge pull request #3 from farbodahm/feature/add-protobuf-consumer-sa…
Browse files Browse the repository at this point in the history
…mple

feat: add sample Protobuf consumer
  • Loading branch information
farbodahm authored Apr 15, 2023
2 parents f1ad111 + 9600cde commit f4ef675
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
54 changes: 54 additions & 0 deletions cosumer/consume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import argparse

# Protobuf generated class; resides at ./protobuf/user_pb2.py
from model import twitter_pb2
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer


def main(args: argparse.Namespace) -> None:
topic = args.topic

protobuf_deserializer = ProtobufDeserializer(twitter_pb2.Tweet,
{'use.deprecated.format': False})

consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
'group.id': args.group,
'auto.offset.reset': "earliest",
'enable.auto.commit': False, }

consumer = Consumer(consumer_conf)
consumer.subscribe([topic])

while True:
try:
# SIGINT can't be handled when polling, limit timeout to 1 second.
msg = consumer.poll(1.0)
if msg is None:
continue

tweet = protobuf_deserializer(
msg.value(), SerializationContext(topic, MessageField.VALUE))

if tweet is not None:
print(tweet)
except KeyboardInterrupt:
break

consumer.close()


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="ProtobufDeserializer example")
parser.add_argument('-b', dest="bootstrap_servers", required=True,
help="Bootstrap broker(s) (host[:port])")
parser.add_argument('-s', dest="schema_registry", required=True,
help="Schema Registry (http(s)://host[:port]")
parser.add_argument('-t', dest="topic", default="example_serde_protobuf",
help="Topic name")
parser.add_argument('-g', dest="group", default="example_serde_protobuf",
help="Consumer group")

main(parser.parse_args())
1 change: 0 additions & 1 deletion producer/requirements.txt

This file was deleted.

0 comments on commit f4ef675

Please sign in to comment.