-
Notifications
You must be signed in to change notification settings - Fork 926
Closed
Description
Description
>>> confluent_kafka.version()
('0.11.0', 720896)
>>> confluent_kafka.libversion()
('0.11.0', 721151)
# Client settings handed to the confluent_kafka Consumer constructor.
config = {
    'bootstrap.servers': KAFKA_BROKER,
    # The consumer group id reuses the source topic name.
    'group.id': FROM_TOPIC,
    # Offsets are committed explicitly by the consuming code.
    'enable.auto.commit': False,
    'default.topic.config': {
        'auto.offset.reset': 'latest',
    },
}
def launchConsumer(i):
    """Consume JSON messages from partition ``i`` of ``FROM_TOPIC`` and print them.

    The consumer is assigned directly to the single partition, its reported
    position is printed once, and then every message with a non-empty
    payload is pretty-printed as JSON and followed by a commit.

    Args:
        i: Partition number of ``FROM_TOPIC`` to read from.

    Exits the process with status 1 (after printing the error) when the
    polling loop fails. Returns normally when interrupted with Ctrl-C.
    The consumer is closed exactly once on every path.
    """
    # Imported locally so the function does not rely on which confluent_kafka
    # names the module-level import happens to expose.
    from confluent_kafka import KafkaError, KafkaException

    # The settings dict defined in this module is named ``config``.
    consumer = Consumer(**config)
    try:
        topic = TopicPartition(FROM_TOPIC, i)
        print("Topic offset %s" % (topic.offset,))
        consumer.assign([topic])
        print("Getting Position")
        # NOTE(review): position() is queried before anything has been
        # consumed, so it may not hold a real offset yet -- confirm against
        # the client documentation.
        position = consumer.position([topic])
        print("Partition %s , Position:  %s" % (i, position))
        try:
            while True:
                print("Running...")
                # A bounded poll returns control to the interpreter regularly
                # so that Ctrl-C is noticed even when the partition is idle.
                message = consumer.poll(timeout=1.0)
                if message is None:
                    continue
                error = message.error()
                if error is not None:
                    # Reaching the end of the partition is informational;
                    # keep waiting for new data.
                    if error.code() == KafkaError._PARTITION_EOF:
                        continue
                    raise KafkaException(error)
                payload = message.value()
                if not payload:
                    # Nothing to decode (empty or missing value); skip it.
                    continue
                logger.debug("Consumer: Data Retrieved")
                logger.debug(payload)
                print("================================================")
                print(dumps(loads(payload), indent=4))
                consumer.commit()
                print("================================================")
        except KeyboardInterrupt:
            sys.stderr.write('%% Aborted by user\n')
            logger.debug('%% Aborted by user\n')
        except Exception as e:
            print(e)
            sys.exit(1)
    finally:
        # Single close point: runs after Ctrl-C, after sys.exit(1), and when
        # setup (assign/position) raises.
        consumer.close()
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`):
- Apache Kafka broker version:
- Client configuration:
{...}
- Operating system:
- Provide client logs (with `'debug': '..'` as necessary)
- Provide broker log excerpts
- Critical issue
Metadata
Metadata
Assignees
Labels
No labels