Skip to content

Add num of tries while pushing to kafka #86

@vesely-david

Description

@vesely-david

As of now, we are pushing to Kafka via an infinite loop, which might cause problems in the future.
We should consider defining "number of tries" after which exception will be raised.

py2k/py2k/producer.py

Lines 24 to 39 in dfe8d5e

def produce(self, record):
while True:
try:
self._producer.produce(
topic=self._topic,
key=record.key_to_avro_dict(),
value=record.value_to_avro_dict(),
on_delivery=self._delivery_report
)
self._producer.poll(0)
break
except BufferError as e:
print(
f'Failed to send on attempt {record}. '
f'Error received {str(e)}')
self._producer.poll(1)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions