-
Notifications
You must be signed in to change notification settings - Fork 1
Open
Description
As of now, we are pushing to Kafka via an infinite loop, which might cause problems in the future.
We should consider defining "number of tries" after which exception will be raised.
Lines 24 to 39 in dfe8d5e
def produce(self, record): | |
while True: | |
try: | |
self._producer.produce( | |
topic=self._topic, | |
key=record.key_to_avro_dict(), | |
value=record.value_to_avro_dict(), | |
on_delivery=self._delivery_report | |
) | |
self._producer.poll(0) | |
break | |
except BufferError as e: | |
print( | |
f'Failed to send on attempt {record}. ' | |
f'Error received {str(e)}') | |
self._producer.poll(1) |
Metadata
Metadata
Assignees
Labels
No labels