From a193c3b3f137b7ba5dbe5e976a57cd2438bbea3c Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Wed, 19 Apr 2023 13:40:14 +0330 Subject: [PATCH 01/10] feat: add Config class --- producer/config.py | 92 +++++++++++++++++++++++++++++++++++++++++ producer/model_faker.py | 2 +- 2 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 producer/config.py diff --git a/producer/config.py b/producer/config.py new file mode 100644 index 0000000..081f3ac --- /dev/null +++ b/producer/config.py @@ -0,0 +1,92 @@ +from typing import Dict +from dataclasses import dataclass +import argparse + +from confluent_kafka import Producer +from confluent_kafka.serialization import StringSerializer +from confluent_kafka.schema_registry.protobuf import ProtobufSerializer +from confluent_kafka.schema_registry import SchemaRegistryClient + +from model import twitter_pb2 + + +@dataclass +class Topics: + TweetsTopic: str = "Model.Tweets.1" + UsersTopic: str = "Model.Users.1" + CommentsTopic: str = "Model.Comments.1" + TweetLikesTopic: str = "Model.TweetLikes.1" + UserFollowsTopic: str = "Model.UserFollows.1" + + +class ConfigGenerator: + """Class for generating required objects based on given CLI configs.""" + + def __init__(self, args: argparse.Namespace) -> None: + self.schema_registry_client = self._get_schema_registry_client( + url=args.schema_registry_url) + + self.producer = self._get_producer_client( + bootstrap_servers=args.kafka_bootstrap_servers,) + + self.protobuf_serializers = self._get_serializers( + schema_registry_client=self.schema_registry_client) + + self.string_serializer = StringSerializer('utf8') + + def _get_serializers(self, schema_registry_client: SchemaRegistryClient) -> Dict[str, ProtobufSerializer]: + """Map each topic to its Protobuf Serializer.""" + + serializers: Dict[str, ProtobufSerializer] = { + Topics.TweetsTopic: ProtobufSerializer(twitter_pb2.Tweet, + schema_registry_client, + {'use.deprecated.format': False}), + Topics.UsersTopic: ProtobufSerializer(twitter_pb2.User, + schema_registry_client, + {'use.deprecated.format': False}), + Topics.CommentsTopic: ProtobufSerializer(twitter_pb2.Comment, + schema_registry_client, + {'use.deprecated.format': False}), + Topics.TweetLikesTopic: ProtobufSerializer(twitter_pb2.TweetLike, + schema_registry_client, + {'use.deprecated.format': False}), + Topics.UserFollowsTopic: ProtobufSerializer(twitter_pb2.UserFollow, + schema_registry_client, + {'use.deprecated.format': False}), + } + + return serializers + + def _get_schema_registry_client(self, url: str) -> SchemaRegistryClient: + """Create and return schema registry client.""" + schema_registry_conf = {'url': url, } + client = SchemaRegistryClient(conf=schema_registry_conf) + + return client + + def _get_producer_client(self, bootstrap_servers: str) -> Producer: + """Create and return Kafka producer client.""" + producer_conf = {'bootstrap.servers': bootstrap_servers, + 'receive.message.max.bytes': 1500000000, + } + producer = Producer(producer_conf) + + return producer + + +class CliArgs: + """Class for generating required ArgParse arguments """ + + def __init__(self) -> None: + self.parser = argparse.ArgumentParser( + description="Service for generating fake Twitter data in Kafka topics." + ) + + self._add_arguments() + + def _add_arguments(self) -> None: + """Add arguments that parser needs to parse.""" + self.parser.add_argument('-b', dest="kafka_bootstrap_servers", required=True, + help="Bootstrap broker(s) (host[:port])") + self.parser.add_argument('-s', dest="schema_registry_url", required=True, + help="Schema Registry (http(s)://host[:port]") diff --git a/producer/model_faker.py b/producer/model_faker.py index b6881e7..e9a7c99 100644 --- a/producer/model_faker.py +++ b/producer/model_faker.py @@ -11,7 +11,7 @@ class FakeDataModel: - """Generate fake model to further produce them in Kafka topics.""" + """Generate fake models to further produce them in Kafka topics.""" ID_MAX_INT = 2147483647 def __init__(self) -> None: From a1f980c72c72fc9a2e9c02a7570a807ecefc8cec Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Wed, 19 Apr 2023 16:35:55 +0330 Subject: [PATCH 02/10] feat: add Producer class for producing data to topics --- producer/config.py | 37 +----------------- producer/producer.py | 90 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 35 deletions(-) create mode 100644 producer/producer.py diff --git a/producer/config.py b/producer/config.py index 081f3ac..07a916c 100644 --- a/producer/config.py +++ b/producer/config.py @@ -1,14 +1,9 @@ -from typing import Dict from dataclasses import dataclass import argparse from confluent_kafka import Producer -from confluent_kafka.serialization import StringSerializer -from confluent_kafka.schema_registry.protobuf import ProtobufSerializer from confluent_kafka.schema_registry import SchemaRegistryClient -from model import twitter_pb2 - @dataclass class Topics: @@ -19,7 +14,7 @@ class Topics: UserFollowsTopic: str = "Model.UserFollows.1" -class ConfigGenerator: +class ClientGenerator: """Class for generating required objects based on given CLI configs.""" def __init__(self, args: argparse.Namespace) -> None: @@ -29,34 +24,6 @@ def __init__(self, args: argparse.Namespace) -> None: self.producer = self._get_producer_client( bootstrap_servers=args.kafka_bootstrap_servers,) - self.protobuf_serializers = self._get_serializers( - schema_registry_client=self.schema_registry_client) - - self.string_serializer = StringSerializer('utf8') - - def _get_serializers(self, schema_registry_client: SchemaRegistryClient) -> Dict[str, ProtobufSerializer]: - """Map each topic to its Protobuf Serializer.""" - - serializers: Dict[str, ProtobufSerializer] = { - Topics.TweetsTopic: ProtobufSerializer(twitter_pb2.Tweet, - schema_registry_client, - {'use.deprecated.format': False}), - Topics.UsersTopic: ProtobufSerializer(twitter_pb2.User, - schema_registry_client, - {'use.deprecated.format': False}), - Topics.CommentsTopic: ProtobufSerializer(twitter_pb2.Comment, - schema_registry_client, - {'use.deprecated.format': False}), - Topics.TweetLikesTopic: ProtobufSerializer(twitter_pb2.TweetLike, - schema_registry_client, - {'use.deprecated.format': False}), - Topics.UserFollowsTopic: ProtobufSerializer(twitter_pb2.UserFollow, - schema_registry_client, - {'use.deprecated.format': False}), - } - - return serializers - def _get_schema_registry_client(self, url: str) -> SchemaRegistryClient: """Create and return schema registry client.""" schema_registry_conf = {'url': url, } @@ -74,7 +41,7 @@ def _get_producer_client(self, bootstrap_servers: str) -> Producer: return producer -class CliArgs: +class CliArgsParser: """Class for generating required ArgParse arguments """ def __init__(self) -> None: diff --git a/producer/producer.py b/producer/producer.py new file mode 100644 index 0000000..06c46d3 --- /dev/null +++ b/producer/producer.py @@ -0,0 +1,90 @@ +from typing import Dict, Callable + + +from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField +from confluent_kafka.schema_registry.protobuf import ProtobufSerializer +from confluent_kafka.schema_registry import SchemaRegistryClient +from confluent_kafka import Producer +from google.protobuf.message import Message + +from model_faker import FakeDataModel +from model import twitter_pb2 +from config import Topics + + +class FakeDataProducer: + """Main class for generating next model and producing it in Kafka.""" + + def __init__(self, producer: Producer, schema_registry_client: SchemaRegistryClient) -> None: + self.faker = FakeDataModel() + self.producer = producer + self.schema_registry_client = schema_registry_client + + # Serializers + self.string_serializer = StringSerializer('utf8') + self.protobuf_serializers = self._get_serializers( + schema_registry_client=self.schema_registry_client + ) + + def produce(self, topic: str, key: str, msg: Message) -> None: + """Produce given model to Kafka""" + protobuf_serializer = self.protobuf_serializers[topic] + + self.producer.produce(topic=topic, partition=0, + key=self.string_serializer(key), + value=protobuf_serializer( + msg, SerializationContext(topic, MessageField.VALUE)), + on_delivery=self._delivery_report) + + def produce_to_topic(self, topic: str) -> None: + """Produce given model to Kafka""" + + def get_topics_to_model_genarators(self) -> Dict[str, Callable]: + """Map each topic to its relatated model generator function.""" + result: Dict[str, Callable] = { + Topics.TweetsTopic: self.faker.generate_tweet_model, + Topics.UsersTopic: self.faker.generate_user_model, + Topics.CommentsTopic: self.faker.generate_comment_model, + Topics.TweetLikesTopic: self.faker.generate_tweetlike_model, + Topics.UserFollowsTopic: self.faker.generate_userfollow_model, + } + + return result + + def _get_serializers(self, schema_registry_client: SchemaRegistryClient) -> Dict[str, ProtobufSerializer]: + """Map each topic to its Protobuf Serializer.""" + + serializers: Dict[str, ProtobufSerializer] = { + Topics.TweetsTopic: ProtobufSerializer(twitter_pb2.Tweet, + schema_registry_client, + {'use.deprecated.format': False}), + Topics.UsersTopic: ProtobufSerializer(twitter_pb2.User, + schema_registry_client, + {'use.deprecated.format': False}), + Topics.CommentsTopic: ProtobufSerializer(twitter_pb2.Comment, + schema_registry_client, + {'use.deprecated.format': False}), + Topics.TweetLikesTopic: ProtobufSerializer(twitter_pb2.TweetLike, + schema_registry_client, + {'use.deprecated.format': False}), + Topics.UserFollowsTopic: ProtobufSerializer(twitter_pb2.UserFollow, + schema_registry_client, + {'use.deprecated.format': False}), + } + + return serializers + + @staticmethod + def _delivery_report(err, msg): + """ + Reports the failure or success of a message delivery. + Args: + err (KafkaError): The error that occurred on None on success. + msg (Message): The message that was produced or failed. + """ + + if err is not None: + print("Delivery failed for User record {}: {}".format(msg.key(), err)) + return + print('User record {} successfully produced to {} [{}] at offset {}'.format( + msg.key(), msg.topic(), msg.partition(), msg.offset())) From 1627f9ddb539dfb45314980198787e83a991d01d Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Wed, 19 Apr 2023 16:43:47 +0330 Subject: [PATCH 03/10] fix: Add id for all of the models --- model/twitter.proto | 16 +++++++++------- model/twitter_pb2.py | 22 +++++++++++----------- producer/model_faker.py | 6 +++++- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/model/twitter.proto b/model/twitter.proto index aa5eb32..657217f 100644 --- a/model/twitter.proto +++ b/model/twitter.proto @@ -6,7 +6,7 @@ import "google/protobuf/timestamp.proto"; message Tweet { - string tweet_id = 1; + string id = 1; string user_id = 2; string text = 3; google.protobuf.Timestamp tweeted_date = 4; @@ -27,9 +27,10 @@ message User { } message TweetLike { - string tweet_id = 1; - string user_id = 2; - google.protobuf.Timestamp created_date = 3; + string id = 1; + string tweet_id = 2; + string user_id = 3; + google.protobuf.Timestamp created_date = 4; } message Comment { @@ -41,9 +42,10 @@ message Comment { } message UserFollow { + string id = 1; // User who is followed - string followed_id = 1; + string followed_id = 2; // User who is following - string follower_id = 2; - google.protobuf.Timestamp followed_date = 3; + string follower_id = 3; + google.protobuf.Timestamp followed_date = 4; } diff --git a/model/twitter_pb2.py b/model/twitter_pb2.py index ab1653b..1c7c35c 100644 --- a/model/twitter_pb2.py +++ b/model/twitter_pb2.py @@ -14,7 +14,7 @@ from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13model/twitter.proto\x12\x07twitter\x1a\x1fgoogle/protobuf/timestamp.proto\"j\n\x05Tweet\x12\x10\n\x08tweet_id\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x30\n\x0ctweeted_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xc0\x01\n\x04User\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\nfirst_name\x18\x02 \x01(\t\x12\x11\n\tlast_name\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\x12$\n\x06gender\x18\x05 \x01(\x0e\x32\x14.twitter.User.Gender\x12\x30\n\x0c\x63reated_date\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x1e\n\x06Gender\x12\n\n\x06\x46\x45MALE\x10\x00\x12\x08\n\x04MALE\x10\x01\"`\n\tTweetLike\x12\x10\n\x08tweet_id\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x30\n\x0c\x63reated_date\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"z\n\x07\x43omment\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08tweet_id\x18\x02 \x01(\t\x12\x0f\n\x07user_id\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x32\n\x0e\x63ommented_date\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"i\n\nUserFollow\x12\x13\n\x0b\x66ollowed_id\x18\x01 \x01(\t\x12\x13\n\x0b\x66ollower_id\x18\x02 \x01(\t\x12\x31\n\rfollowed_date\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13model/twitter.proto\x12\x07twitter\x1a\x1fgoogle/protobuf/timestamp.proto\"d\n\x05Tweet\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x30\n\x0ctweeted_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xc0\x01\n\x04User\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\nfirst_name\x18\x02 \x01(\t\x12\x11\n\tlast_name\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\x12$\n\x06gender\x18\x05 \x01(\x0e\x32\x14.twitter.User.Gender\x12\x30\n\x0c\x63reated_date\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x1e\n\x06Gender\x12\n\n\x06\x46\x45MALE\x10\x00\x12\x08\n\x04MALE\x10\x01\"Z\n\tTweetLike\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x30\n\x0c\x63reated_date\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"z\n\x07\x43omment\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08tweet_id\x18\x02 \x01(\t\x12\x0f\n\x07user_id\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x32\n\x0e\x63ommented_date\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"u\n\nUserFollow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x13\n\x0b\x66ollowed_id\x18\x02 \x01(\t\x12\x13\n\x0b\x66ollower_id\x18\x03 \x01(\t\x12\x31\n\rfollowed_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -23,15 +23,15 @@ DESCRIPTOR._options = None _globals['_TWEET']._serialized_start=65 - _globals['_TWEET']._serialized_end=171 - _globals['_USER']._serialized_start=174 - _globals['_USER']._serialized_end=366 - _globals['_USER_GENDER']._serialized_start=336 - _globals['_USER_GENDER']._serialized_end=366 - _globals['_TWEETLIKE']._serialized_start=368 - _globals['_TWEETLIKE']._serialized_end=464 - _globals['_COMMENT']._serialized_start=466 - _globals['_COMMENT']._serialized_end=588 - _globals['_USERFOLLOW']._serialized_start=590 + _globals['_TWEET']._serialized_end=165 + _globals['_USER']._serialized_start=168 + _globals['_USER']._serialized_end=360 + _globals['_USER_GENDER']._serialized_start=330 + _globals['_USER_GENDER']._serialized_end=360 + _globals['_TWEETLIKE']._serialized_start=362 + _globals['_TWEETLIKE']._serialized_end=452 + _globals['_COMMENT']._serialized_start=454 + _globals['_COMMENT']._serialized_end=576 + _globals['_USERFOLLOW']._serialized_start=578 _globals['_USERFOLLOW']._serialized_end=695 # @@protoc_insertion_point(module_scope) diff --git a/producer/model_faker.py b/producer/model_faker.py index e9a7c99..6f39f4f 100644 --- a/producer/model_faker.py +++ b/producer/model_faker.py @@ -33,7 +33,7 @@ def generate_tweet_model(self) -> twitter_pb2.Tweet: raise UserNotFoundError("There aren't any users created") tweet = twitter_pb2.Tweet( - tweet_id=self._generate_new_tweet_id(), + id=self._generate_new_tweet_id(), user_id=random.choice(self._generated_user_ids), text=self._faker.text(), ) @@ -71,6 +71,8 @@ def generate_tweetlike_model(self) -> twitter_pb2.TweetLike: raise TweetNotFoundError("There aren't any tweets created") tweetlike = twitter_pb2.TweetLike( + id=str(self._faker.unique.random_int( + max=FakeDataModel.ID_MAX_INT)), tweet_id=random.choice(self._generated_tweet_ids), user_id=random.choice(self._generated_user_ids), ) @@ -120,6 +122,8 @@ def generate_userfollow_model(self) -> twitter_pb2.UserFollow: follower_id = random.choice(self._generated_user_ids) userfollow = twitter_pb2.UserFollow( + id=str(self._faker.unique.random_int( + max=FakeDataModel.ID_MAX_INT)), followed_id=followed_id, follower_id=follower_id, ) From 0d1783bc9886a4307ef99bd6c3ed2f07a224374d Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Sat, 22 Apr 2023 11:19:24 +0330 Subject: [PATCH 04/10] feat: add functionality for producing fake model to given topic --- producer/producer.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/producer/producer.py b/producer/producer.py index 06c46d3..3a66df5 100644 --- a/producer/producer.py +++ b/producer/producer.py @@ -26,6 +26,8 @@ def __init__(self, producer: Producer, schema_registry_client: SchemaRegistryCli schema_registry_client=self.schema_registry_client ) + self.topics_to_model_generators = self.get_topics_to_model_genarators() + def produce(self, topic: str, key: str, msg: Message) -> None: """Produce given model to Kafka""" protobuf_serializer = self.protobuf_serializers[topic] @@ -34,10 +36,12 @@ def produce(self, topic: str, key: str, msg: Message) -> None: key=self.string_serializer(key), value=protobuf_serializer( msg, SerializationContext(topic, MessageField.VALUE)), - on_delivery=self._delivery_report) + on_delivery=FakeDataProducer._delivery_report,) def produce_to_topic(self, topic: str) -> None: - """Produce given model to Kafka""" + """Produce a fake generated model to the given topic""" + generated_model = self.topics_to_model_generators[topic]() + self.produce(topic=topic, key=generated_model.id, msg=generated_model) def get_topics_to_model_genarators(self) -> Dict[str, Callable]: """Map each topic to its relatated model generator function.""" @@ -79,7 +83,7 @@ def _delivery_report(err, msg): """ Reports the failure or success of a message delivery. Args: - err (KafkaError): The error that occurred on None on success. + err (KafkaError): The error that occurred (None on success). msg (Message): The message that was produced or failed. """ From d94f803e57ceec2e10902830ad7f71e263ee2352 Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Sat, 22 Apr 2023 11:23:44 +0330 Subject: [PATCH 05/10] refactor: Raise exceptions when serializer and model generator functions are not found --- producer/exceptions.py | 16 ++++++++++++++++ producer/producer.py | 17 +++++++++++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/producer/exceptions.py b/producer/exceptions.py index cfb1354..da22721 100644 --- a/producer/exceptions.py +++ b/producer/exceptions.py @@ -12,3 +12,19 @@ class TweetNotFoundError(ValueError): def __init__(self, message: str): self.message = message super().__init__(message) + + +class ProtobufSerializerNotFoundError(ValueError): + """Raise when no serializer is found""" + + def __init__(self, message: str): + self.message = message + super().__init__(message) + + +class ModelGeneratorFunctionNotFoundError(ValueError): + """Raise when no model generator function is found""" + + def __init__(self, message: str): + self.message = message + super().__init__(message) diff --git a/producer/producer.py b/producer/producer.py index 3a66df5..6e44b97 100644 --- a/producer/producer.py +++ b/producer/producer.py @@ -10,6 +10,7 @@ from model_faker import FakeDataModel from model import twitter_pb2 from config import Topics +from exceptions import ModelGeneratorFunctionNotFoundError, ProtobufSerializerNotFoundError class FakeDataProducer: @@ -26,11 +27,14 @@ def __init__(self, producer: Producer, schema_registry_client: SchemaRegistryCli schema_registry_client=self.schema_registry_client ) - self.topics_to_model_generators = self.get_topics_to_model_genarators() + self.topics_to_model_generators = self._get_topics_to_model_genarators() def produce(self, topic: str, key: str, msg: Message) -> None: """Produce given model to Kafka""" - protobuf_serializer = self.protobuf_serializers[topic] + protobuf_serializer = self.protobuf_serializers.get(topic, None) + if protobuf_serializer is None: + raise ProtobufSerializerNotFoundError( + f'No serializer found for topic: {topic}') self.producer.produce(topic=topic, partition=0, key=self.string_serializer(key), @@ -40,10 +44,15 @@ def produce(self, topic: str, key: str, msg: Message) -> None: def produce_to_topic(self, topic: str) -> None: """Produce a fake generated model to the given topic""" - generated_model = self.topics_to_model_generators[topic]() + model_generator_func = self.topics_to_model_generators.get(topic, None) + if model_generator_func is None: + raise ModelGeneratorFunctionNotFoundError( + f'No model generator found for topic: {topic}') + + generated_model = model_generator_func() self.produce(topic=topic, key=generated_model.id, msg=generated_model) - def get_topics_to_model_genarators(self) -> Dict[str, Callable]: + def _get_topics_to_model_genarators(self) -> Dict[str, Callable]: """Map each topic to its relatated model generator function.""" result: Dict[str, Callable] = { Topics.TweetsTopic: self.faker.generate_tweet_model, From 6bc57c9161f2f97e34a292ca2d4d512fac04e9af Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Sun, 23 Apr 2023 00:22:40 +0330 Subject: [PATCH 06/10] fix: create a base NotFound exception class and use that for all of the not found esxceptions --- producer/exceptions.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/producer/exceptions.py b/producer/exceptions.py index da22721..58b0556 100644 --- a/producer/exceptions.py +++ b/producer/exceptions.py @@ -1,4 +1,12 @@ -class UserNotFoundError(ValueError): +class NotFoundError(ValueError): + """Base class fot rasing when there aren't any resource found""" + + def __init__(self, message: str): + self.message = message + super().__init__(message) + + +class UserNotFoundError(NotFoundError): """Raise when no user is found""" def __init__(self, message: str): @@ -6,7 +14,7 @@ def __init__(self, message: str): super().__init__(message) -class TweetNotFoundError(ValueError): +class TweetNotFoundError(NotFoundError): """Raise when no tweet is found""" def __init__(self, message: str): @@ -14,7 +22,7 @@ def __init__(self, message: str): super().__init__(message) -class ProtobufSerializerNotFoundError(ValueError): +class ProtobufSerializerNotFoundError(NotFoundError): """Raise when no serializer is found""" def __init__(self, message: str): @@ -22,7 +30,7 @@ def __init__(self, message: str): super().__init__(message) -class ModelGeneratorFunctionNotFoundError(ValueError): +class ModelGeneratorFunctionNotFoundError(NotFoundError): """Raise when no model generator function is found""" def __init__(self, message: str): From 27c14c3fd9e330df5b288bcb03d4b16d9ef856a7 Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Sun, 23 Apr 2023 11:04:57 +0330 Subject: [PATCH 07/10] fix: Model schemas --- model/twitter.proto | 2 +- model/twitter_pb2.py | 12 ++++++------ producer/model_faker.py | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/model/twitter.proto b/model/twitter.proto index 657217f..f51742e 100644 --- a/model/twitter.proto +++ b/model/twitter.proto @@ -30,7 +30,7 @@ message TweetLike { string id = 1; string tweet_id = 2; string user_id = 3; - google.protobuf.Timestamp created_date = 4; + google.protobuf.Timestamp liked_date = 4; } message Comment { diff --git a/model/twitter_pb2.py b/model/twitter_pb2.py index 1c7c35c..d16573f 100644 --- a/model/twitter_pb2.py +++ b/model/twitter_pb2.py @@ -14,7 +14,7 @@ from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13model/twitter.proto\x12\x07twitter\x1a\x1fgoogle/protobuf/timestamp.proto\"d\n\x05Tweet\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x30\n\x0ctweeted_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xc0\x01\n\x04User\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\nfirst_name\x18\x02 \x01(\t\x12\x11\n\tlast_name\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\x12$\n\x06gender\x18\x05 \x01(\x0e\x32\x14.twitter.User.Gender\x12\x30\n\x0c\x63reated_date\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x1e\n\x06Gender\x12\n\n\x06\x46\x45MALE\x10\x00\x12\x08\n\x04MALE\x10\x01\"Z\n\tTweetLike\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x30\n\x0c\x63reated_date\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"z\n\x07\x43omment\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08tweet_id\x18\x02 \x01(\t\x12\x0f\n\x07user_id\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x32\n\x0e\x63ommented_date\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"u\n\nUserFollow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x13\n\x0b\x66ollowed_id\x18\x02 \x01(\t\x12\x13\n\x0b\x66ollower_id\x18\x03 \x01(\t\x12\x31\n\rfollowed_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13model/twitter.proto\x12\x07twitter\x1a\x1fgoogle/protobuf/timestamp.proto\"d\n\x05Tweet\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x30\n\x0ctweeted_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xc0\x01\n\x04User\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\nfirst_name\x18\x02 \x01(\t\x12\x11\n\tlast_name\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\x12$\n\x06gender\x18\x05 \x01(\x0e\x32\x14.twitter.User.Gender\x12\x30\n\x0c\x63reated_date\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x1e\n\x06Gender\x12\n\n\x06\x46\x45MALE\x10\x00\x12\x08\n\x04MALE\x10\x01\"j\n\tTweetLike\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08tweet_id\x18\x02 \x01(\t\x12\x0f\n\x07user_id\x18\x03 \x01(\t\x12.\n\nliked_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"z\n\x07\x43omment\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08tweet_id\x18\x02 \x01(\t\x12\x0f\n\x07user_id\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x32\n\x0e\x63ommented_date\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"u\n\nUserFollow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x13\n\x0b\x66ollowed_id\x18\x02 \x01(\t\x12\x13\n\x0b\x66ollower_id\x18\x03 \x01(\t\x12\x31\n\rfollowed_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -29,9 +29,9 @@ _globals['_USER_GENDER']._serialized_start=330 _globals['_USER_GENDER']._serialized_end=360 _globals['_TWEETLIKE']._serialized_start=362 - _globals['_TWEETLIKE']._serialized_end=452 - _globals['_COMMENT']._serialized_start=454 - _globals['_COMMENT']._serialized_end=576 - _globals['_USERFOLLOW']._serialized_start=578 - _globals['_USERFOLLOW']._serialized_end=695 + _globals['_TWEETLIKE']._serialized_end=468 + _globals['_COMMENT']._serialized_start=470 + _globals['_COMMENT']._serialized_end=592 + _globals['_USERFOLLOW']._serialized_start=594 + _globals['_USERFOLLOW']._serialized_end=711 # @@protoc_insertion_point(module_scope) diff --git a/producer/model_faker.py b/producer/model_faker.py index 6f39f4f..2af3fcc 100644 --- a/producer/model_faker.py +++ b/producer/model_faker.py @@ -108,7 +108,7 @@ def generate_comment_model(self) -> twitter_pb2.Comment: def generate_userfollow_model(self) -> twitter_pb2.UserFollow: """Return a new generated fake UserFollow model. This class, models a User following another User.""" - if len(self._generated_user_ids) > 2: + if len(self._generated_user_ids) < 2: logging.error( "You need more than 2 users to model a follow. " "First call creating User model 2 times.") From fae6153fe823eac10ba7cae0559b6fef34428cc8 Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Sun, 23 Apr 2023 11:06:42 +0330 Subject: [PATCH 08/10] feat: add starting point of the producer system --- producer/config.py | 10 ++++ producer/main.py | 49 +++++++++++++++++++ ...{producer.py => twitter_model_producer.py} | 0 3 files changed, 59 insertions(+) create mode 100644 producer/main.py rename producer/{producer.py => twitter_model_producer.py} (100%) diff --git a/producer/config.py b/producer/config.py index 07a916c..ced04ff 100644 --- a/producer/config.py +++ b/producer/config.py @@ -14,6 +14,16 @@ class Topics: UserFollowsTopic: str = "Model.UserFollows.1" +# Dictionary of topics and their producing probability. +TOPICS_TO_PRODUCING_PROBABILITY = { + Topics.TweetsTopic: 0.3, + Topics.UsersTopic: 0.2, + Topics.CommentsTopic: 0.2, + Topics.TweetLikesTopic: 0.1, + Topics.UserFollowsTopic: 0.1, +} + + class ClientGenerator: """Class for generating required objects based on given CLI configs.""" diff --git a/producer/main.py b/producer/main.py new file mode 100644 index 0000000..32fd33e --- /dev/null +++ b/producer/main.py @@ -0,0 +1,49 @@ +import random +from time import sleep + +from config import CliArgsParser, ClientGenerator, TOPICS_TO_PRODUCING_PROBABILITY +from twitter_model_producer import FakeDataProducer +from exceptions import NotFoundError +from logger import logging + +TOPICS = [ + topic for topic in TOPICS_TO_PRODUCING_PROBABILITY.keys() +] +PROBABILITIES = [ + probability for probability in TOPICS_TO_PRODUCING_PROBABILITY.values() +] + + +def get_next_topic() -> str: + """Returns next topic name to produce data based on given """ + topic = random.choices(TOPICS, weights=PROBABILITIES)[0] + return topic + + +def generate_fake_data(producer: FakeDataProducer) -> None: + """Main unlimited loop for generating fake data""" + while True: + topic = get_next_topic() + logging.info(f"Producing data to topic: {topic}") + try: + producer.produce_to_topic(topic=topic) + except NotFoundError as e: + print(e) + sleep(2) + + +def main() -> None: + """Starting point of the producer system""" + cli_args_parser = CliArgsParser() + cli_args = cli_args_parser.parser.parse_args() + + clients = ClientGenerator(cli_args) + producer = FakeDataProducer( + producer=clients.producer, schema_registry_client=clients.schema_registry_client + ) + + generate_fake_data(producer=producer) + + +if __name__ == "__main__": + main() diff --git a/producer/producer.py b/producer/twitter_model_producer.py similarity index 100% rename from producer/producer.py rename to producer/twitter_model_producer.py From 70db37c4286d7284152e687627da23ea0f12c8f9 Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Sun, 23 Apr 2023 11:24:14 +0330 Subject: [PATCH 09/10] delete old sample producer --- producer/produce.py | 77 --------------------------------------------- 1 file changed, 77 deletions(-) delete mode 100644 producer/produce.py diff --git a/producer/produce.py b/producer/produce.py deleted file mode 100644 index eb826d4..0000000 --- a/producer/produce.py +++ /dev/null @@ -1,77 +0,0 @@ -from model import twitter_pb2 -import datetime -import argparse -from confluent_kafka import Producer -from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField -from confluent_kafka.schema_registry import SchemaRegistryClient -from confluent_kafka.schema_registry.protobuf import ProtobufSerializer -from time import sleep - -mesg = twitter_pb2.Tweet(text="Happy", user_id="farbod", tweet_id="12") -mesg.tweeted_date.FromDatetime(datetime.datetime.now()) - -print(mesg) - - -def delivery_report(err, msg): - """ - Reports the failure or success of a message delivery. - Args: - err (KafkaError): The error that occurred on None on success. - msg (Message): The message that was produced or failed. - """ - - if err is not None: - print("Delivery failed for User record {}: {}".format(msg.key(), err)) - return - print('User record {} successfully produced to {} [{}] at offset {}'.format( - msg.key(), msg.topic(), msg.partition(), msg.offset())) - - -def main(args: argparse.Namespace) -> None: - """Main Function""" - topic = args.topic - - schema_registry_conf = {'url': args.schema_registry} - schema_registry_client = SchemaRegistryClient(schema_registry_conf) - - string_serializer = StringSerializer('utf8') - protobuf_serializer = ProtobufSerializer(twitter_pb2.Tweet, - schema_registry_client, - {'use.deprecated.format': False}) - - producer_conf = {'bootstrap.servers': args.bootstrap_servers, - 'receive.message.max.bytes': 1500000000, - } - - producer = Producer(producer_conf) - - print("Producing user records to topic {}. ^C to exit.".format(topic)) - - # Serve on_delivery callbacks from previous calls to produce() - producer.poll(0.0) - try: - producer.produce(topic=topic, partition=0, - key=string_serializer("test2"), - value=protobuf_serializer( - mesg, SerializationContext(topic, MessageField.VALUE)), - on_delivery=delivery_report) - sleep(2) - except Exception as e: - print("Shit", e) - raise e - - print("\nFlushing records...") - producer.flush() - - -if __name__ == '__main__': - parser = argparse.ArgumentParser(description="ProtobufSerializer example") - parser.add_argument('-b', dest="bootstrap_servers", required=True, - help="Bootstrap broker(s) (host[:port])") - parser.add_argument('-s', dest="schema_registry", required=True, - help="Schema Registry (http(s)://host[:port]") - parser.add_argument('-t', dest="topic", default="example_serde_protobuf", - help="Topic name") - - main(parser.parse_args()) From 413cda396ce3fcba52ef8c4a42a3ff9efda551a4 Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Sun, 23 Apr 2023 13:41:16 +0330 Subject: [PATCH 10/10] refactore: make Kafka delivery method more readabale --- producer/main.py | 7 ++++++- producer/twitter_model_producer.py | 15 ++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/producer/main.py b/producer/main.py index 32fd33e..63a97e3 100644 --- a/producer/main.py +++ b/producer/main.py @@ -28,9 +28,14 @@ def generate_fake_data(producer: FakeDataProducer) -> None: try: producer.produce_to_topic(topic=topic) except NotFoundError as e: - print(e) + # Pass the not found exceptions as in the next call, resource may be created + logging.error(e) + sleep(2) + # TODO: Gracefully kill the application + # producer.producer.flush() + def main() -> None: """Starting point of the producer system""" diff --git a/producer/twitter_model_producer.py b/producer/twitter_model_producer.py index 6e44b97..b9d5a76 100644 --- a/producer/twitter_model_producer.py +++ b/producer/twitter_model_producer.py @@ -4,13 +4,14 @@ from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField from confluent_kafka.schema_registry.protobuf import ProtobufSerializer from confluent_kafka.schema_registry import SchemaRegistryClient -from confluent_kafka import Producer +from confluent_kafka import Producer, KafkaError, Message as KafkaMessage from google.protobuf.message import Message from model_faker import FakeDataModel from model import twitter_pb2 from config import Topics from exceptions import ModelGeneratorFunctionNotFoundError, ProtobufSerializerNotFoundError +from logger import logging class FakeDataProducer: @@ -88,16 +89,16 @@ def _get_serializers(self, schema_registry_client: SchemaRegistryClient) -> Dict return serializers @staticmethod - def _delivery_report(err, msg): + def _delivery_report(err: KafkaError, msg: KafkaMessage) -> None: """ Reports the failure or success of a message delivery. Args: err (KafkaError): The error that occurred (None on success). msg (Message): The message that was produced or failed. """ - if err is not None: - print("Delivery failed for User record {}: {}".format(msg.key(), err)) - return - print('User record {} successfully produced to {} [{}] at offset {}'.format( - msg.key(), msg.topic(), msg.partition(), msg.offset())) + logging.error( + f'Delivery failed to topic={msg.topic()}, ' + f'partition={msg.partition()}, ' + f'offset={msg.offset()} for Message with id={msg.key()}: {err}', + )