diff --git a/model/twitter.proto b/model/twitter.proto
index aa5eb32..f51742e 100644
--- a/model/twitter.proto
+++ b/model/twitter.proto
@@ -6,7 +6,7 @@
 import "google/protobuf/timestamp.proto";
 
 message Tweet {
-  string tweet_id = 1;
+  string id = 1;
   string user_id = 2;
   string text = 3;
   google.protobuf.Timestamp tweeted_date = 4;
@@ -27,9 +27,10 @@ message User {
 }
 
 message TweetLike {
-  string tweet_id = 1;
-  string user_id = 2;
-  google.protobuf.Timestamp created_date = 3;
+  string id = 1;
+  string tweet_id = 2;
+  string user_id = 3;
+  google.protobuf.Timestamp liked_date = 4;
 }
 
 message Comment {
@@ -41,9 +42,10 @@ message Comment {
 }
 
 message UserFollow {
+  string id = 1;
   // User who is followed
-  string followed_id = 1;
+  string followed_id = 2;
   // User who is following
-  string follower_id = 2;
-  google.protobuf.Timestamp followed_date = 3;
+  string follower_id = 3;
+  google.protobuf.Timestamp followed_date = 4;
 }
diff --git a/model/twitter_pb2.py b/model/twitter_pb2.py
index ab1653b..d16573f 100644
--- a/model/twitter_pb2.py
+++ b/model/twitter_pb2.py
@@ -14,7 +14,7 @@
 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13model/twitter.proto\x12\x07twitter\x1a\x1fgoogle/protobuf/timestamp.proto\"j\n\x05Tweet\x12\x10\n\x08tweet_id\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x30\n\x0ctweeted_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xc0\x01\n\x04User\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\nfirst_name\x18\x02 \x01(\t\x12\x11\n\tlast_name\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\x12$\n\x06gender\x18\x05 \x01(\x0e\x32\x14.twitter.User.Gender\x12\x30\n\x0c\x63reated_date\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x1e\n\x06Gender\x12\n\n\x06\x46\x45MALE\x10\x00\x12\x08\n\x04MALE\x10\x01\"`\n\tTweetLike\x12\x10\n\x08tweet_id\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x30\n\x0c\x63reated_date\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"z\n\x07\x43omment\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08tweet_id\x18\x02 \x01(\t\x12\x0f\n\x07user_id\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x32\n\x0e\x63ommented_date\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"i\n\nUserFollow\x12\x13\n\x0b\x66ollowed_id\x18\x01 \x01(\t\x12\x13\n\x0b\x66ollower_id\x18\x02 \x01(\t\x12\x31\n\rfollowed_date\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13model/twitter.proto\x12\x07twitter\x1a\x1fgoogle/protobuf/timestamp.proto\"d\n\x05Tweet\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07user_id\x18\x02 \x01(\t\x12\x0c\n\x04text\x18\x03 \x01(\t\x12\x30\n\x0ctweeted_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\xc0\x01\n\x04User\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\nfirst_name\x18\x02 \x01(\t\x12\x11\n\tlast_name\x18\x03 \x01(\t\x12\r\n\x05\x65mail\x18\x04 \x01(\t\x12$\n\x06gender\x18\x05 \x01(\x0e\x32\x14.twitter.User.Gender\x12\x30\n\x0c\x63reated_date\x18\x06 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x1e\n\x06Gender\x12\n\n\x06\x46\x45MALE\x10\x00\x12\x08\n\x04MALE\x10\x01\"j\n\tTweetLike\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08tweet_id\x18\x02 \x01(\t\x12\x0f\n\x07user_id\x18\x03 \x01(\t\x12.\n\nliked_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"z\n\x07\x43omment\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08tweet_id\x18\x02 \x01(\t\x12\x0f\n\x07user_id\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x32\n\x0e\x63ommented_date\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"u\n\nUserFollow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x13\n\x0b\x66ollowed_id\x18\x02 \x01(\t\x12\x13\n\x0b\x66ollower_id\x18\x03 \x01(\t\x12\x31\n\rfollowed_date\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestampb\x06proto3')
 
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -23,15 +23,15 @@
   DESCRIPTOR._options = None
   _globals['_TWEET']._serialized_start=65
-  _globals['_TWEET']._serialized_end=171
-  _globals['_USER']._serialized_start=174
-  _globals['_USER']._serialized_end=366
-  _globals['_USER_GENDER']._serialized_start=336
-  _globals['_USER_GENDER']._serialized_end=366
-  _globals['_TWEETLIKE']._serialized_start=368
-  _globals['_TWEETLIKE']._serialized_end=464
-  _globals['_COMMENT']._serialized_start=466
-  _globals['_COMMENT']._serialized_end=588
-  _globals['_USERFOLLOW']._serialized_start=590
-  _globals['_USERFOLLOW']._serialized_end=695
+  _globals['_TWEET']._serialized_end=165
+  _globals['_USER']._serialized_start=168
+  _globals['_USER']._serialized_end=360
+  _globals['_USER_GENDER']._serialized_start=330
+  _globals['_USER_GENDER']._serialized_end=360
+  _globals['_TWEETLIKE']._serialized_start=362
+  _globals['_TWEETLIKE']._serialized_end=468
+  _globals['_COMMENT']._serialized_start=470
+  _globals['_COMMENT']._serialized_end=592
+  _globals['_USERFOLLOW']._serialized_start=594
+  _globals['_USERFOLLOW']._serialized_end=711
 # @@protoc_insertion_point(module_scope)
diff --git a/producer/config.py b/producer/config.py
new file mode 100644
index 0000000..ced04ff
--- /dev/null
+++ b/producer/config.py
@@ -0,0 +1,69 @@
+from dataclasses import dataclass
+import argparse
+
+from confluent_kafka import Producer
+from confluent_kafka.schema_registry import SchemaRegistryClient
+
+
+@dataclass
+class Topics:
+    TweetsTopic: str = "Model.Tweets.1"
+    UsersTopic: str = "Model.Users.1"
+    CommentsTopic: str = "Model.Comments.1"
+    TweetLikesTopic: str = "Model.TweetLikes.1"
+    UserFollowsTopic: str = "Model.UserFollows.1"
+
+
+# Dictionary mapping each topic to its producing probability.
+TOPICS_TO_PRODUCING_PROBABILITY = {
+    Topics.TweetsTopic: 0.3,
+    Topics.UsersTopic: 0.2,
+    Topics.CommentsTopic: 0.2,
+    Topics.TweetLikesTopic: 0.1,
+    Topics.UserFollowsTopic: 0.1,
+}
+
+
+class ClientGenerator:
+    """Class for generating required objects based on the given CLI configs."""
+
+    def __init__(self, args: argparse.Namespace) -> None:
+        self.schema_registry_client = self._get_schema_registry_client(
+            url=args.schema_registry_url)
+
+        self.producer = self._get_producer_client(
+            bootstrap_servers=args.kafka_bootstrap_servers,)
+
+    def _get_schema_registry_client(self, url: str) -> SchemaRegistryClient:
+        """Create and return a schema registry client."""
+        schema_registry_conf = {'url': url, }
+        client = SchemaRegistryClient(conf=schema_registry_conf)
+
+        return client
+
+    def _get_producer_client(self, bootstrap_servers: str) -> Producer:
+        """Create and return a Kafka producer client."""
+        producer_conf = {'bootstrap.servers': bootstrap_servers,
+                         'receive.message.max.bytes': 1500000000,
+                         }
+        producer = Producer(producer_conf)
+
+        return producer
+
+
+class CliArgsParser:
+    """Class for generating the required argparse arguments."""
+
+    def __init__(self) -> None:
+        self.parser = argparse.ArgumentParser(
+            description="Service for generating fake Twitter data in Kafka topics."
+        )
+
+        self._add_arguments()
+
+    def _add_arguments(self) -> None:
+        """Add the arguments that the parser needs to parse."""
+        self.parser.add_argument('-b', dest="kafka_bootstrap_servers", required=True,
+                                 help="Bootstrap broker(s) (host[:port])")
+        self.parser.add_argument('-s', dest="schema_registry_url", required=True,
+                                 help="Schema Registry (http(s)://host[:port])")
diff --git a/producer/exceptions.py b/producer/exceptions.py
index cfb1354..58b0556 100644
--- a/producer/exceptions.py
+++ b/producer/exceptions.py
@@ -1,4 +1,12 @@
-class UserNotFoundError(ValueError):
+class NotFoundError(ValueError):
+    """Base class for errors raised when no resource is found"""
+
+    def __init__(self, message: str):
+        self.message = message
+        super().__init__(message)
+
+
+class UserNotFoundError(NotFoundError):
     """Raise when no user is found"""
 
     def __init__(self, message: str):
@@ -6,9 +14,25 @@ def __init__(self, message: str):
         self.message = message
         super().__init__(message)
 
 
-class TweetNotFoundError(ValueError):
+class TweetNotFoundError(NotFoundError):
     """Raise when no tweet is found"""
 
     def __init__(self, message: str):
         self.message = message
         super().__init__(message)
+
+
+class ProtobufSerializerNotFoundError(NotFoundError):
+    """Raise when no serializer is found"""
+
+    def __init__(self, message: str):
+        self.message = message
+        super().__init__(message)
+
+
+class ModelGeneratorFunctionNotFoundError(NotFoundError):
+    """Raise when no model generator function is found"""
+
+    def __init__(self, message: str):
+        self.message = message
+        super().__init__(message)
diff --git a/producer/main.py b/producer/main.py
new file mode 100644
index 0000000..63a97e3
--- /dev/null
+++ b/producer/main.py
@@ -0,0 +1,54 @@
+import random
+from time import sleep
+
+from config import CliArgsParser, ClientGenerator, TOPICS_TO_PRODUCING_PROBABILITY
+from twitter_model_producer import FakeDataProducer
+from exceptions import NotFoundError
+from logger import logging
+
+TOPICS = [
+    topic for topic in TOPICS_TO_PRODUCING_PROBABILITY.keys()
+]
+PROBABILITIES = [
+    probability for probability in TOPICS_TO_PRODUCING_PROBABILITY.values()
+]
+
+
+def get_next_topic() -> str:
+    """Return the next topic name to produce data to, based on the configured probabilities."""
+    topic = random.choices(TOPICS, weights=PROBABILITIES)[0]
+    return topic
+
+
+def generate_fake_data(producer: FakeDataProducer) -> None:
+    """Main infinite loop for generating fake data."""
+    while True:
+        topic = get_next_topic()
+        logging.info(f"Producing data to topic: {topic}")
+        try:
+            producer.produce_to_topic(topic=topic)
+        except NotFoundError as e:
+            # Skip not-found errors; the missing resource may be created by a later call.
+            logging.error(e)
+
+        sleep(2)
+
+    # TODO: Gracefully kill the application
+    # producer.producer.flush()
+
+
+def main() -> None:
+    """Starting point of the producer system."""
+    cli_args_parser = CliArgsParser()
+    cli_args = cli_args_parser.parser.parse_args()
+
+    clients = ClientGenerator(cli_args)
+    producer = FakeDataProducer(
+        producer=clients.producer, schema_registry_client=clients.schema_registry_client
+    )
+
+    generate_fake_data(producer=producer)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/producer/model_faker.py b/producer/model_faker.py
index b6881e7..2af3fcc 100644
--- a/producer/model_faker.py
+++ b/producer/model_faker.py
@@ -11,7 +11,7 @@
 
 
 class FakeDataModel:
-    """Generate fake model to further produce them in Kafka topics."""
+    """Generate fake models to further produce them in Kafka topics."""
     ID_MAX_INT = 2147483647
 
     def __init__(self) -> None:
@@ -33,7 +33,7 @@ def generate_tweet_model(self) -> twitter_pb2.Tweet:
             raise UserNotFoundError("There aren't any users created")
 
         tweet = twitter_pb2.Tweet(
-            tweet_id=self._generate_new_tweet_id(),
+            id=self._generate_new_tweet_id(),
             user_id=random.choice(self._generated_user_ids),
             text=self._faker.text(),
         )
@@ -71,6 +71,8 @@ def generate_tweetlike_model(self) -> twitter_pb2.TweetLike:
             raise TweetNotFoundError("There aren't any tweets created")
 
         tweetlike = twitter_pb2.TweetLike(
+            id=str(self._faker.unique.random_int(
+                max=FakeDataModel.ID_MAX_INT)),
             tweet_id=random.choice(self._generated_tweet_ids),
             user_id=random.choice(self._generated_user_ids),
         )
@@ -106,7 +108,7 @@ def generate_comment_model(self) -> twitter_pb2.Comment:
     def generate_userfollow_model(self) -> twitter_pb2.UserFollow:
         """Return a new generated fake UserFollow model.
         This class, models a User following another User."""
-        if len(self._generated_user_ids) > 2:
+        if len(self._generated_user_ids) < 2:
             logging.error(
-                "You need more than 2 users to model a follow. "
-                "First call creating User model 2 times.")
+                "You need at least 2 users to model a follow. "
+                "First create the User model at least twice.")
@@ -120,6 +122,8 @@ def generate_userfollow_model(self) -> twitter_pb2.UserFollow:
         follower_id = random.choice(self._generated_user_ids)
 
         userfollow = twitter_pb2.UserFollow(
+            id=str(self._faker.unique.random_int(
+                max=FakeDataModel.ID_MAX_INT)),
             followed_id=followed_id,
             follower_id=follower_id,
         )
diff --git a/producer/produce.py b/producer/produce.py
deleted file mode 100644
index eb826d4..0000000
--- a/producer/produce.py
+++ /dev/null
@@ -1,77 +0,0 @@
-from model import twitter_pb2
-import datetime
-import argparse
-from confluent_kafka import Producer
-from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
-from confluent_kafka.schema_registry import SchemaRegistryClient
-from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
-from time import sleep
-
-mesg = twitter_pb2.Tweet(text="Happy", user_id="farbod", tweet_id="12")
-mesg.tweeted_date.FromDatetime(datetime.datetime.now())
-
-print(mesg)
-
-
-def delivery_report(err, msg):
-    """
-    Reports the failure or success of a message delivery.
-    Args:
-        err (KafkaError): The error that occurred on None on success.
-        msg (Message): The message that was produced or failed.
-    """
-
-    if err is not None:
-        print("Delivery failed for User record {}: {}".format(msg.key(), err))
-        return
-    print('User record {} successfully produced to {} [{}] at offset {}'.format(
-        msg.key(), msg.topic(), msg.partition(), msg.offset()))
-
-
-def main(args: argparse.Namespace) -> None:
-    """Main Function"""
-    topic = args.topic
-
-    schema_registry_conf = {'url': args.schema_registry}
-    schema_registry_client = SchemaRegistryClient(schema_registry_conf)
-
-    string_serializer = StringSerializer('utf8')
-    protobuf_serializer = ProtobufSerializer(twitter_pb2.Tweet,
-                                             schema_registry_client,
-                                             {'use.deprecated.format': False})
-
-    producer_conf = {'bootstrap.servers': args.bootstrap_servers,
-                     'receive.message.max.bytes': 1500000000,
-                     }
-
-    producer = Producer(producer_conf)
-
-    print("Producing user records to topic {}. ^C to exit.".format(topic))
-
-    # Serve on_delivery callbacks from previous calls to produce()
-    producer.poll(0.0)
-    try:
-        producer.produce(topic=topic, partition=0,
-                         key=string_serializer("test2"),
-                         value=protobuf_serializer(
-                             mesg, SerializationContext(topic, MessageField.VALUE)),
-                         on_delivery=delivery_report)
-        sleep(2)
-    except Exception as e:
-        print("Shit", e)
-        raise e
-
-    print("\nFlushing records...")
-    producer.flush()
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description="ProtobufSerializer example")
-    parser.add_argument('-b', dest="bootstrap_servers", required=True,
-                        help="Bootstrap broker(s) (host[:port])")
-    parser.add_argument('-s', dest="schema_registry", required=True,
-                        help="Schema Registry (http(s)://host[:port]")
-    parser.add_argument('-t', dest="topic", default="example_serde_protobuf",
-                        help="Topic name")
-
-    main(parser.parse_args())
diff --git a/producer/twitter_model_producer.py b/producer/twitter_model_producer.py
new file mode 100644
index 0000000..b9d5a76
--- /dev/null
+++ b/producer/twitter_model_producer.py
@@ -0,0 +1,104 @@
+from typing import Dict, Callable
+
+
+from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
+from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
+from confluent_kafka.schema_registry import SchemaRegistryClient
+from confluent_kafka import Producer, KafkaError, Message as KafkaMessage
+from google.protobuf.message import Message
+
+from model_faker import FakeDataModel
+from model import twitter_pb2
+from config import Topics
+from exceptions import ModelGeneratorFunctionNotFoundError, ProtobufSerializerNotFoundError
+from logger import logging
+
+
+class FakeDataProducer:
+    """Main class for generating the next model and producing it to Kafka."""
+
+    def __init__(self, producer: Producer, schema_registry_client: SchemaRegistryClient) -> None:
+        self.faker = FakeDataModel()
+        self.producer = producer
+        self.schema_registry_client = schema_registry_client
+
+        # Serializers
+        self.string_serializer = StringSerializer('utf8')
+        self.protobuf_serializers = self._get_serializers(
+            schema_registry_client=self.schema_registry_client
+        )
+
+        self.topics_to_model_generators = self._get_topics_to_model_generators()
+
+    def produce(self, topic: str, key: str, msg: Message) -> None:
+        """Produce the given model to Kafka."""
+        protobuf_serializer = self.protobuf_serializers.get(topic, None)
+        if protobuf_serializer is None:
+            raise ProtobufSerializerNotFoundError(
+                f'No serializer found for topic: {topic}')
+
+        self.producer.produce(topic=topic, partition=0,
+                              key=self.string_serializer(key),
+                              value=protobuf_serializer(
+                                  msg, SerializationContext(topic, MessageField.VALUE)),
+                              on_delivery=FakeDataProducer._delivery_report,)
+
+    def produce_to_topic(self, topic: str) -> None:
+        """Produce a freshly generated fake model to the given topic."""
+        model_generator_func = self.topics_to_model_generators.get(topic, None)
+        if model_generator_func is None:
+            raise ModelGeneratorFunctionNotFoundError(
+                f'No model generator found for topic: {topic}')
+
+        generated_model = model_generator_func()
+        self.produce(topic=topic, key=generated_model.id, msg=generated_model)
+
+    def _get_topics_to_model_generators(self) -> Dict[str, Callable]:
+        """Map each topic to its related model generator function."""
+        result: Dict[str, Callable] = {
+            Topics.TweetsTopic: self.faker.generate_tweet_model,
+            Topics.UsersTopic: self.faker.generate_user_model,
+            Topics.CommentsTopic: self.faker.generate_comment_model,
+            Topics.TweetLikesTopic: self.faker.generate_tweetlike_model,
+            Topics.UserFollowsTopic: self.faker.generate_userfollow_model,
+        }
+
+        return result
+
+    def _get_serializers(self, schema_registry_client: SchemaRegistryClient) -> Dict[str, ProtobufSerializer]:
+        """Map each topic to its Protobuf serializer."""
+
+        serializers: Dict[str, ProtobufSerializer] = {
+            Topics.TweetsTopic: ProtobufSerializer(twitter_pb2.Tweet,
+                                                   schema_registry_client,
+                                                   {'use.deprecated.format': False}),
+            Topics.UsersTopic: ProtobufSerializer(twitter_pb2.User,
+                                                  schema_registry_client,
+                                                  {'use.deprecated.format': False}),
+            Topics.CommentsTopic: ProtobufSerializer(twitter_pb2.Comment,
+                                                     schema_registry_client,
+                                                     {'use.deprecated.format': False}),
+            Topics.TweetLikesTopic: ProtobufSerializer(twitter_pb2.TweetLike,
+                                                       schema_registry_client,
+                                                       {'use.deprecated.format': False}),
+            Topics.UserFollowsTopic: ProtobufSerializer(twitter_pb2.UserFollow,
+                                                        schema_registry_client,
+                                                        {'use.deprecated.format': False}),
+        }
+
+        return serializers
+
+    @staticmethod
+    def _delivery_report(err: KafkaError, msg: KafkaMessage) -> None:
+        """
+        Reports the failure or success of a message delivery.
+        Args:
+            err (KafkaError): The error that occurred (None on success).
+            msg (Message): The message that was produced or failed.
+        """
+        if err is not None:
+            logging.error(
+                f'Delivery failed to topic={msg.topic()}, '
+                f'partition={msg.partition()}, '
+                f'offset={msg.offset()} for Message with id={msg.key()}: {err}',
+            )
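
With produce.py removed, producer/main.py becomes the entry point. A hypothetical invocation (the broker and Schema Registry addresses below are placeholders, and main.py's flat imports assume the working directory is producer/):

    cd producer
    python main.py -b localhost:9092 -s http://localhost:8081

The -b and -s flags map to kafka_bootstrap_servers and schema_registry_url in CliArgsParser.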
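
To spot-check what the new producer writes, a minimal consumer sketch is shown below. It is not part of this change: the broker address, group id, and offset policy are assumptions, while the topic name Model.Tweets.1 is copied from Topics in producer/config.py. The deserializer mirrors the producer's {'use.deprecated.format': False} config so the wire framing matches.

    from confluent_kafka import Consumer
    from confluent_kafka.serialization import SerializationContext, MessageField
    from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer

    from model import twitter_pb2

    # Assumed local broker and a hypothetical consumer group id.
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'twitter-model-verifier',
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['Model.Tweets.1'])

    # Same Protobuf message type and format flag as the producer side.
    deserializer = ProtobufDeserializer(twitter_pb2.Tweet,
                                        {'use.deprecated.format': False})

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None or msg.error():
                continue
            tweet = deserializer(msg.value(),
                                 SerializationContext(msg.topic(), MessageField.VALUE))
            print(f'{tweet.id}: {tweet.text}')
    finally:
        consumer.close()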