Skip to content

Memory Leak from Consumer in Library #307

@rohitgcs

Description

@rohitgcs

Description

>>> confluent_kafka.version()
('0.11.0', 720896)
>>> confluent_kafka.libversion()
('0.11.0', 721151)

config={'bootstrap.servers': KAFKA_BROKER, \
	'group.id': FROM_TOPIC, \
	'enable.auto.commit': False,\
	'default.topic.config': {'auto.offset.reset': 'latest'}}

def launchConsumer(i):
	consumer = Consumer(**conf)
	topic = TopicPartition(FROM_TOPIC,i)
	print "Topic offset",topic.offset
	consumer.assign([topic])
	print "Getting Position"
	position = consumer.position([topic])
	print "Partition", i, ", Position: ", position
	hasData = True
	try:
		while hasData:
			print "Running..."
			message = consumer.poll()
			if message:
				logger.debug("Consumer: Data Retrieved")
				logger.debug(message.value())
				print "================================================"
				print dumps(loads(message.value()), indent=4)
				msg = loads(message.value())
				consumer.commit()
				print "================================================"
	except KeyboardInterrupt:
		sys.stderr.write('%% Aborted by user\n')
		logger.debug('%% Aborted by user\n')
		consumer.close()
	except Exception as e:
		print e
		consumer.close()
		sys.exit(1)
	consumer.close()

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions