Description
confluent-kafka==0.9.2, librdkafka 0.9.2, Python 2.7.6, Ubuntu 14.04
I run a web app in 16 gunicorn workers (processes). Each worker has a producer, which runs continuously until the process is restarted (normally by the deploy system). This instance serves ~100-250 req/s (some portion of which produce to Kafka) and has ~3.8 GB of memory. After switching to confluent-kafka and running it in production for some time, I'm observing worker RSS memory growing and growing, never dropping (this was not the case before, with kafka-python). Here is a screenshot of the memory monitoring graph:
The code is roughly like this (so I call poll(0) after each produce):
import atexit
import cPickle
import logging

import confluent_kafka

logger = logging.getLogger(__name__)


def error_cb(err):
    logger.error('kafka producer error_cb: {}: {}'.format(err.name(), err.str()))


def on_delivery(err, msg):
    # delivery.report.only.error is set, so only failed deliveries arrive here
    assert err
    logger.error('kafka producer on_delivery {}'.format(err.name()))


_producer = confluent_kafka.Producer(**{
    'client.id': 'qwe_producer',
    'bootstrap.servers': bootstrap_servers,  # 3 brokers
    'log.connection.close': False,
    'api.version.request': True,
    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.kbytes': 4000000,
    'queue.buffering.max.ms': 1000,
    'message.send.max.retries': 9000,
    'batch.num.messages': 10000,
    'delivery.report.only.error': True,
    'default.topic.config': {
        'request.required.acks': -1,
    },
    'error_cb': error_cb,
    'on_delivery': on_delivery,
})

# flush any buffered messages when the worker process exits
atexit.register(_producer.flush)


def kafka_async_send(topic, value):
    _producer.produce(topic, cPickle.dumps(value, 2))
    _producer.poll(0)  # serve delivery callbacks after each produce


def some_request_handler(request):
    # ...
    kafka_async_send('foo', ('some', 'stuff'))
    # ...
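One thing I'm considering adding to kafka_async_send to narrow this down is a periodic log of the producer's internal queue depth, to see whether the growth is just undelivered messages buffered in librdkafka or something else. This is only a sketch: it assumes len() on the Producer reports the number of messages/requests still awaiting delivery in this version, and the helper name and the once-a-minute throttle are my own.

import time

_last_queue_check = [0.0]  # hypothetical module-level throttle


def log_producer_queue():
    # log how many messages/requests are still queued in librdkafka,
    # at most once per minute, so this can be compared against RSS growth
    now = time.time()
    if now - _last_queue_check[0] >= 60:
        _last_queue_check[0] = now
        logger.info('kafka producer queue length: %d', len(_producer))

If that number stays small while RSS keeps climbing, the buffered messages themselves are presumably not what is growing.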
What should I do now? Do you have any suggestions on how to debug this? (Preferably right in production - it is more of a testing production instance, so any overhead is currently affordable.)
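For what it's worth, this is the kind of low-overhead probe I could run in a live worker (a sketch only; the /proc path is Linux-specific, which should be fine on Ubuntu 14.04, and the helper name is made up):

import gc
import resource


def memory_snapshot():
    # cheap probe for a live worker: current RSS (from /proc) plus the number
    # of GC-tracked Python objects; if RSS grows while the object count stays
    # flat, the growth is probably in native (librdkafka / C extension) memory
    with open('/proc/self/statm') as f:
        resident_pages = int(f.read().split()[1])
    rss_kb = resident_pages * resource.getpagesize() // 1024
    return {'rss_kb': rss_kb, 'py_objects': len(gc.get_objects())}

I could log memory_snapshot() from a request handler every minute or so and correlate it with the produce rate, but I'd appreciate pointers on what to look for beyond that.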