|
11 | 11 | from autoreduce_utils.clients.connection_exception import ConnectionException |
12 | 12 | from autoreduce_utils.message.message import Message |
13 | 13 | from autoreduce_utils.clients.producer import Publisher |
| 14 | +from autoreduce_utils.clients.kafka_utils import kafka_config_from_env |
14 | 15 | from autoreduce_qp.queue_processor.handle_message import HandleMessage |
15 | 16 |
|
16 | 17 | TRANSACTIONS_TOPIC = os.getenv('KAFKA_TOPIC') |
17 | 18 | KAFKA_BROKER_URL = os.getenv("KAFKA_BROKER_URL") |
18 | | -GROUP_ID = 'mygroup' |
| 19 | +GROUP_ID = 'data_ready-group' |
19 | 20 |
|
20 | 21 |
|
21 | 22 | class Consumer(threading.Thread): |
@@ -49,14 +50,12 @@ def __init__(self, consumer=None): |
49 | 50 | try: |
50 | 51 | self.logger.debug("Getting the kafka consumer") |
51 | 52 |
|
52 | | - config = { |
53 | | - 'bootstrap.servers': KAFKA_BROKER_URL, |
54 | | - 'group.id': GROUP_ID, |
55 | | - 'auto.offset.reset': "earliest", |
56 | | - "on_commit": self.on_commit, |
57 | | - 'key.deserializer': StringDeserializer('utf_8'), |
58 | | - 'value.deserializer': StringDeserializer('utf_8') |
59 | | - } |
| 53 | + config = kafka_config_from_env() |
| 54 | + |
| 55 | + config['key.deserializer'] = StringDeserializer('utf_8') |
| 56 | + config['value.deserializer'] = StringDeserializer('utf_8') |
| 57 | + config['on_commit'] = self.on_commit |
| 58 | + config['group.id'] = GROUP_ID |
60 | 59 | self.consumer = DeserializingConsumer(config) |
61 | 60 | except KafkaException as err: |
62 | 61 | self.logger.error("Could not initialize the consumer: %s", err) |
|
0 commit comments