77from pydantic import ValidationError
88from confluent_kafka import DeserializingConsumer , KafkaException
99from confluent_kafka .serialization import StringDeserializer
10- from confluent_kafka .admin import AdminClient , NewTopic
1110from autoreduce_utils .clients .connection_exception import ConnectionException
1211from autoreduce_utils .message .message import Message
1312from autoreduce_utils .clients .producer import Publisher
13+ from autoreduce_utils .clients .kafka_utils import kafka_config_from_env
1414from autoreduce_qp .queue_processor .handle_message import HandleMessage
1515
1616TRANSACTIONS_TOPIC = os .getenv ('KAFKA_TOPIC' )
1717KAFKA_BROKER_URL = os .getenv ("KAFKA_BROKER_URL" )
18- GROUP_ID = 'mygroup '
18+ GROUP_ID = 'data_ready-group '
1919
2020
2121class Consumer (threading .Thread ):
@@ -26,16 +26,6 @@ def __init__(self, consumer=None):
2626 self .logger = logging .getLogger (__package__ )
2727 self .logger .debug ("Initializing the consumer" )
2828
29- kafka_broker = {'bootstrap.servers' : KAFKA_BROKER_URL }
30- admin_client = AdminClient (kafka_broker )
31- topics = admin_client .list_topics ().topics
32-
33- if not topics :
34- # Create the topic
35- self .logger .info ("Creating the topic '%s'" , TRANSACTIONS_TOPIC )
36- new_topic = NewTopic (TRANSACTIONS_TOPIC , num_partitions = 1 , replication_factor = 1 )
37- admin_client .create_topics ([new_topic ])
38-
3929 self .consumer = consumer
4030 self .message_handler = HandleMessage ()
4131 self ._stop_event = threading .Event ()
@@ -49,14 +39,13 @@ def __init__(self, consumer=None):
4939 try :
5040 self .logger .debug ("Getting the kafka consumer" )
5141
52- config = {
53- 'bootstrap.servers' : KAFKA_BROKER_URL ,
54- 'group.id' : GROUP_ID ,
55- 'auto.offset.reset' : "earliest" ,
56- "on_commit" : self .on_commit ,
57- 'key.deserializer' : StringDeserializer ('utf_8' ),
58- 'value.deserializer' : StringDeserializer ('utf_8' )
59- }
42+ config = kafka_config_from_env ()
43+
44+ config ['key.deserializer' ] = StringDeserializer ('utf_8' )
45+ config ['value.deserializer' ] = StringDeserializer ('utf_8' )
46+ config ['on_commit' ] = self .on_commit
47+ config ['group.id' ] = GROUP_ID
48+ config ['auto.offset.reset' ] = 'earliest'
6049 self .consumer = DeserializingConsumer (config )
6150 except KafkaException as err :
6251 self .logger .error ("Could not initialize the consumer: %s" , err )
0 commit comments