Skip to content

Commit

Permalink
Merge pull request #5 from farbodahm/feature/complete-producer
Browse files Browse the repository at this point in the history
Completing Producer
  • Loading branch information
farbodahm authored Apr 23, 2023
2 parents a14794b + 413cda3 commit 8cdba40
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 101 deletions.
16 changes: 9 additions & 7 deletions model/twitter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "google/protobuf/timestamp.proto";


message Tweet {
string tweet_id = 1;
string id = 1;
string user_id = 2;
string text = 3;
google.protobuf.Timestamp tweeted_date = 4;
Expand All @@ -27,9 +27,10 @@ message User {
}

// A like linking one user to one tweet.
// NOTE(review): this text is a rendered diff without +/- markers, so the
// pre-change fields (tweet_id = 1, user_id = 2, created_date = 3) appear to be
// listed next to the four fields that replace them; that would explain why
// field numbers 1-3 occur twice below. Confirm against the real .proto file.
// NOTE(review): moving tweet_id and user_id to new field numbers changes their
// wire tags, so payloads serialized with the previous numbering will not decode
// into the same fields. Confirm that no such data has to stay readable.
message TweetLike {
  string tweet_id = 1;
  string user_id = 2;
  google.protobuf.Timestamp created_date = 3;
  string id = 1;
  string tweet_id = 2;
  string user_id = 3;
  google.protobuf.Timestamp liked_date = 4;
}

message Comment {
Expand All @@ -41,9 +42,10 @@ message Comment {
}

// One user following another user.
// NOTE(review): this text is a rendered diff without +/- markers, so the
// pre-change fields (followed_id = 1, follower_id = 2, followed_date = 3)
// appear to be listed next to their renumbered replacements; that would explain
// the repeated field names and numbers below. Confirm against the real file.
// NOTE(review): inserting `id` as field 1 shifts every existing field number,
// which changes the wire tags of followed_id, follower_id and followed_date.
// Payloads written with the previous numbering will not decode into the same
// fields. Confirm that no such data has to stay readable.
message UserFollow {
  string id = 1;
  // User who is followed
  string followed_id = 1;
  string followed_id = 2;
  // User who is following
  string follower_id = 2;
  google.protobuf.Timestamp followed_date = 3;
  string follower_id = 3;
  google.protobuf.Timestamp followed_date = 4;
}
24 changes: 12 additions & 12 deletions model/twitter_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 69 additions & 0 deletions producer/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from dataclasses import dataclass
import argparse

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient


@dataclass
class Topics:
    """Names of the Kafka topics used by the producer."""

    # Every field has a default, so each name is also readable directly from
    # the class (e.g. ``Topics.TweetsTopic``) without creating an instance.
    TweetsTopic: str = "Model.Tweets.1"
    UsersTopic: str = "Model.Users.1"
    CommentsTopic: str = "Model.Comments.1"
    TweetLikesTopic: str = "Model.TweetLikes.1"
    UserFollowsTopic: str = "Model.UserFollows.1"


# Dictionary of topics and their producing probability.
# main.py passes these values to random.choices(weights=...), which accepts
# relative weights, so they do not have to sum to 1 (they currently total 0.9).
# The insertion order here is also the order of the topic list built from it.
TOPICS_TO_PRODUCING_PROBABILITY = {
    Topics.TweetsTopic: 0.3,
    Topics.UsersTopic: 0.2,
    Topics.CommentsTopic: 0.2,
    Topics.TweetLikesTopic: 0.1,
    Topics.UserFollowsTopic: 0.1,
}


class ClientGenerator:
    """Build the Schema Registry and Kafka clients from parsed CLI options.

    Attributes:
        schema_registry_client: Client created from ``args.schema_registry_url``.
        producer: Kafka producer created from ``args.kafka_bootstrap_servers``.
    """

    def __init__(self, args: argparse.Namespace) -> None:
        registry_url = args.schema_registry_url
        self.schema_registry_client = self._get_schema_registry_client(
            url=registry_url)

        brokers = args.kafka_bootstrap_servers
        self.producer = self._get_producer_client(bootstrap_servers=brokers)

    def _get_schema_registry_client(self, url: str) -> SchemaRegistryClient:
        """Return a Schema Registry client configured with the given URL."""
        return SchemaRegistryClient(conf={'url': url})

    def _get_producer_client(self, bootstrap_servers: str) -> Producer:
        """Return a Kafka producer configured with the given bootstrap servers."""
        settings = {
            'bootstrap.servers': bootstrap_servers,
            'receive.message.max.bytes': 1500000000,
        }
        return Producer(settings)


class CliArgsParser:
    """Build the argparse parser for the producer's command-line options.

    Attributes:
        parser: The configured ``argparse.ArgumentParser``; call
            ``parser.parse_args()`` on it to obtain the parsed options.
    """

    def __init__(self) -> None:
        self.parser = argparse.ArgumentParser(
            description="Service for generating fake Twitter data in Kafka topics."
        )

        self._add_arguments()

    def _add_arguments(self) -> None:
        """Register the two required options: ``-b`` and ``-s``."""
        self.parser.add_argument(
            '-b', dest="kafka_bootstrap_servers", required=True,
            help="Bootstrap broker(s) (host[:port])")
        # The closing parenthesis was missing from this help text.
        self.parser.add_argument(
            '-s', dest="schema_registry_url", required=True,
            help="Schema Registry (http(s)://host[:port])")
28 changes: 26 additions & 2 deletions producer/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
class UserNotFoundError(ValueError):
class NotFoundError(ValueError):
    """Base class for errors raised when a required resource is not found."""

    def __init__(self, message: str):
        super().__init__(message)
        # Keep the text on a named attribute as well as in ``args``.
        self.message = message


class UserNotFoundError(NotFoundError):
    """Raise when no user is found.

    The constructor is inherited from ``NotFoundError``, which takes the
    message and stores it on ``self.message``.
    """


class TweetNotFoundError(ValueError):
class TweetNotFoundError(NotFoundError):
    """Raise when no tweet is found.

    The constructor is inherited from ``NotFoundError``, which takes the
    message and stores it on ``self.message``.
    """


class ProtobufSerializerNotFoundError(NotFoundError):
    """Raise when no serializer is found.

    The constructor is inherited from ``NotFoundError``, which takes the
    message and stores it on ``self.message``.
    """


class ModelGeneratorFunctionNotFoundError(NotFoundError):
    """Raise when no model generator function is found.

    The constructor is inherited from ``NotFoundError``, which takes the
    message and stores it on ``self.message``.
    """
54 changes: 54 additions & 0 deletions producer/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import random
from time import sleep

from config import CliArgsParser, ClientGenerator, TOPICS_TO_PRODUCING_PROBABILITY
from twitter_model_producer import FakeDataProducer
from exceptions import NotFoundError
from logger import logging

# Topic names and their weights as two parallel lists, in the dictionary's
# insertion order, ready to be passed to random.choices().
TOPICS = list(TOPICS_TO_PRODUCING_PROBABILITY)
PROBABILITIES = list(TOPICS_TO_PRODUCING_PROBABILITY.values())


def get_next_topic() -> str:
    """Return the name of the next topic to produce data to.

    The topic is drawn at random from ``TOPICS``, weighted by the matching
    entries of ``PROBABILITIES``.
    """
    # random.choices() returns a one-element list when no count is given.
    (chosen,) = random.choices(TOPICS, weights=PROBABILITIES)
    return chosen


def generate_fake_data(producer: FakeDataProducer, *, interval_seconds: float = 2) -> None:
    """Produce fake data forever, to one randomly chosen topic per iteration.

    Args:
        producer: Object whose ``produce_to_topic(topic=...)`` is called once
            per iteration.
        interval_seconds: Pause between iterations, in seconds. Defaults to 2,
            the delay that used to be hard-coded.

    The loop has no exit condition, so this function does not return normally.
    """
    while True:
        topic = get_next_topic()
        logging.info(f"Producing data to topic: {topic}")
        try:
            producer.produce_to_topic(topic=topic)
        except NotFoundError as e:
            # Pass the not found exceptions as in the next call, resource may be created
            logging.error(e)

        sleep(interval_seconds)

    # TODO: Gracefully kill the application
    # producer.producer.flush()


def main() -> None:
    """Parse the CLI options, build the clients, and start producing data."""
    cli_args = CliArgsParser().parser.parse_args()
    clients = ClientGenerator(cli_args)

    fake_data_producer = FakeDataProducer(
        producer=clients.producer,
        schema_registry_client=clients.schema_registry_client,
    )
    generate_fake_data(producer=fake_data_producer)


# Run the producer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
10 changes: 7 additions & 3 deletions producer/model_faker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


class FakeDataModel:
"""Generate fake model to further produce them in Kafka topics."""
"""Generate fake models to further produce them in Kafka topics."""
ID_MAX_INT = 2147483647

def __init__(self) -> None:
Expand All @@ -33,7 +33,7 @@ def generate_tweet_model(self) -> twitter_pb2.Tweet:
raise UserNotFoundError("There aren't any users created")

tweet = twitter_pb2.Tweet(
tweet_id=self._generate_new_tweet_id(),
id=self._generate_new_tweet_id(),
user_id=random.choice(self._generated_user_ids),
text=self._faker.text(),
)
Expand Down Expand Up @@ -71,6 +71,8 @@ def generate_tweetlike_model(self) -> twitter_pb2.TweetLike:
raise TweetNotFoundError("There aren't any tweets created")

tweetlike = twitter_pb2.TweetLike(
id=str(self._faker.unique.random_int(
max=FakeDataModel.ID_MAX_INT)),
tweet_id=random.choice(self._generated_tweet_ids),
user_id=random.choice(self._generated_user_ids),
)
Expand Down Expand Up @@ -106,7 +108,7 @@ def generate_comment_model(self) -> twitter_pb2.Comment:
def generate_userfollow_model(self) -> twitter_pb2.UserFollow:
"""Return a new generated fake UserFollow model.
This class, models a User following another User."""
if len(self._generated_user_ids) > 2:
if len(self._generated_user_ids) < 2:
logging.error(
"You need more than 2 users to model a follow. "
"First call creating User model 2 times.")
Expand All @@ -120,6 +122,8 @@ def generate_userfollow_model(self) -> twitter_pb2.UserFollow:
follower_id = random.choice(self._generated_user_ids)

userfollow = twitter_pb2.UserFollow(
id=str(self._faker.unique.random_int(
max=FakeDataModel.ID_MAX_INT)),
followed_id=followed_id,
follower_id=follower_id,
)
Expand Down
77 changes: 0 additions & 77 deletions producer/produce.py

This file was deleted.

Loading

0 comments on commit 8cdba40

Please sign in to comment.