Description
I am unable to reproduce the behavior of `kafka-console-consumer.sh` with my Python code. I am using broker-stored offsets (the new-style consumer API).
I have a topic, `io`, which I am trying to read from the beginning with Python. The topic already contains data, produced by a confluent_kafka `Producer`.
When I run the Python consumer, its consumer group shows up, but it has no partition information:
```
$ ./bin/kafka-consumer-groups.sh --bootstrap kafka-0.io:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
console-consumer-606
etl-3
console-consumer-66817
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0.io:9092 --describe --group io-etl-3
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
$
```
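To check the same thing from inside Python, a rough equivalent of the CURRENT-OFFSET, LOG-END-OFFSET and LAG columns can be computed with `Consumer.committed()` and `Consumer.get_watermark_offsets()`. This is a sketch, assuming a `confluent_kafka.Consumer` created with the same `group.id` as the group being inspected (here `io-etl-3`) and a list of `TopicPartition` objects; `describe_group_offsets` is a hypothetical name. A negative committed offset is reported as `None`, meaning nothing has been committed for that partition yet.

```python
def describe_group_offsets(consumer, partitions, timeout=10.0):
    """Hypothetical helper approximating the CURRENT-OFFSET, LOG-END-OFFSET and
    LAG columns of `kafka-consumer-groups.sh --describe`.

    Assumes `consumer` is a confluent_kafka.Consumer whose group.id is the group
    to inspect, and `partitions` is a list of confluent_kafka.TopicPartition.
    Returns (topic, partition, committed, log_end, lag) tuples.
    """
    rows = []
    for tp in consumer.committed(partitions, timeout=timeout):
        # (low, high) watermarks; high is the log-end offset.
        _low, high = consumer.get_watermark_offsets(tp, timeout=timeout)
        # A negative value (e.g. -1001, OFFSET_INVALID) means nothing committed.
        committed = tp.offset if tp.offset >= 0 else None
        lag = high - committed if committed is not None else None
        rows.append((tp.topic, tp.partition, committed, high, lag))
    return rows

# Usage (needs a reachable broker):
# from confluent_kafka import Consumer, TopicPartition
# c = Consumer({'bootstrap.servers': 'kafka-0.io:9092', 'group.id': 'io-etl-3'})
# for row in describe_group_offsets(c, [TopicPartition('io', 0), TopicPartition('io', 1)]):
#     print(row)
```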
The consumer code follows the reference examples:
```
$ cat kafka-to-postgres.py
import os
import requests
import time
import psycopg2
import json
from confluent_kafka import Consumer, KafkaError
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Device

#### SQLALCHEMY CONNECTION ####
print("initializing db connection")
engine = create_engine(
    "postgresql+psycopg2://" + os.environ['POSTGRES_USER'] + ":" + os.environ['POSTGRES_PW']
    + "@" + os.environ['POSTGRES_HOST'] + ":" + os.environ["POSTGRES_SERVICE_PORT_PSQL"]
    + "/" + os.environ['POSTGRES_DB'])
s = sessionmaker(bind=engine)
session = s()

#### KAFKA CONNECTION ####
print("initializing kafka connection")
c = Consumer({
    'bootstrap.servers': os.environ['KAFKA_URL'],
    'group.id': os.environ['KAFKA_CONSUMER_GROUP'],
    'auto.offset.reset': 'smallest',
})

def on_assign(c, ps):
    print("on_assign!!!")
    for p in ps:
        import pdb; pdb.set_trace()
        print("on_assigning")
        p.offset = -10
    # Hand the (modified) partition list back to the consumer.
    c.assign(ps)

c.subscribe(['io'], on_assign=on_assign)

print("entering run loop")
running = True
while running:
    print("going to poll")
    msg = c.poll(timeout=1)
    if msg and not msg.error():
        pass  # snip
    elif msg and msg.error().code() != KafkaError._PARTITION_EOF:
        print("error: %s" % msg.error().str())
        running = False
    elif msg and msg.error():
        print("got error: %s" % msg.error().str())
    else:
        print("got message %s" % msg)

c.close()
print("exiting")
exit(1)
```
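For comparison, here is a minimal sketch of two pieces of the script written against named constants instead of raw numbers. It assumes `confluent_kafka` exports `OFFSET_BEGINNING` (librdkafka's logical "earliest offset", -2) and `KafkaError._PARTITION_EOF`; the `ImportError` fallback only exists so the snippet can be exercised without the client installed, and the names `rewind_on_assign` and `classify_poll_result` are hypothetical. Note that `poll()` returns `None` when its timeout expires with nothing to deliver, so a `None` result means "no data yet" rather than an empty message.

```python
try:
    from confluent_kafka import OFFSET_BEGINNING, KafkaError
    PARTITION_EOF = KafkaError._PARTITION_EOF
except ImportError:
    # Fallbacks so the sketch can be exercised without the client installed:
    # librdkafka's RD_KAFKA_OFFSET_BEGINNING and RD_KAFKA_RESP_ERR__PARTITION_EOF.
    OFFSET_BEGINNING = -2
    PARTITION_EOF = -191


def rewind_on_assign(consumer, partitions):
    """Hypothetical on_assign callback: start every newly assigned partition at
    the earliest offset still in the log instead of the group's committed one."""
    for p in partitions:
        p.offset = OFFSET_BEGINNING
    # Assign once, after all offsets have been set.
    consumer.assign(partitions)


def classify_poll_result(msg):
    """Hypothetical helper: label what Consumer.poll() returned.

    Tests `msg is None` explicitly instead of relying on truthiness."""
    if msg is None:
        return "timeout"        # nothing arrived within the poll timeout
    err = msg.error()
    if err is None:
        return "message"        # a real record; msg.value() holds the payload
    if err.code() == PARTITION_EOF:
        return "partition_eof"  # informational: reached the end of a partition
    return "error"              # a genuine consumer error

# Usage with the Consumer `c` from the script above:
# c.subscribe(['io'], on_assign=rewind_on_assign)
# kind = classify_poll_result(c.poll(timeout=1))
```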
This produces the following output:
```
# KAFKA_CONSUMER_GROUP=io-etl-3 python kafka-to-postgres.py
initializing db connection
initializing kafka connection
entering run loop
going to poll
on_assign!!!
> /kafka-to-postgres.py(33)on_assign()
-> print("on_assigning")
(Pdb) l
28
29 def on_assign (c, ps):
30 print("on_assign!!!")
31 for p in ps:
32 import pdb; pdb.set_trace()
33 -> print("on_assigning")
34 p.offset=-10
35 c.assign(ps)
36 c.subscribe(['io'], on_assign=on_assign)
37
38 print "entering run loop"
(Pdb) p ps
[TopicPartition{topic=io,partition=0,offset=-1001,error=None}, TopicPartition{topic=io,partition=1,offset=-1001,error=None}]
(Pdb) p p
TopicPartition{topic=io,partition=0,offset=-1001,error=None}
(Pdb)
TopicPartition{topic=io,partition=0,offset=-1001,error=None}
(Pdb) p p.offset
-1001L
(Pdb) c
on_assigning
> /kafka-to-postgres.py(32)on_assign()
-> import pdb; pdb.set_trace()
(Pdb) c
on_assigning
got message None
going to poll
got message None
going to poll
got message None
going to poll
```
So no data comes through. I have tried setting the partition offset to `0` and to `-10`. The default offset passed to the `on_assign` callback is `-1001L`, if that makes any difference.
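The negative values are not positions in the log: librdkafka reserves a few of them as logical offsets, which confluent_kafka exposes as `OFFSET_BEGINNING` (-2), `OFFSET_END` (-1), `OFFSET_STORED` (-1000) and `OFFSET_INVALID` (-1001), plus librdkafka's "tail" offsets at -2000 and below. As far as I understand, passing `OFFSET_INVALID` to `assign()` means "use the committed offset, falling back to `auto.offset.reset`", while `-10` matches none of them. A small hypothetical helper, `describe_offset`, that labels a `TopicPartition.offset` value:

```python
# librdkafka's logical (sentinel) offsets, as exposed by confluent_kafka as
# OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED and OFFSET_INVALID.
SENTINELS = {
    -2: "OFFSET_BEGINNING",   # earliest offset still in the log
    -1: "OFFSET_END",         # log end: only messages produced from now on
    -1000: "OFFSET_STORED",   # the group's committed offset
    -1001: "OFFSET_INVALID",  # no explicit offset; committed offset, else auto.offset.reset
}
TAIL_BASE = -2000  # librdkafka: RD_KAFKA_OFFSET_TAIL(n) == TAIL_BASE - n


def describe_offset(offset):
    """Hypothetical helper: explain what a TopicPartition.offset value means."""
    if offset >= 0:
        return "absolute offset %d" % offset
    if offset in SENTINELS:
        return SENTINELS[offset]
    if offset <= TAIL_BASE:
        # n messages back from the end of the partition
        return "OFFSET_TAIL(%d)" % (TAIL_BASE - offset)
    return "unrecognized negative offset %d" % offset

# describe_offset(-1001) -> 'OFFSET_INVALID'
# describe_offset(-10)   -> 'unrecognized negative offset -10'
```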
Compare that with the Kafka console consumer, which does return the data:
```
$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka-0.io:9092 --topic io --from-beginning --timeout-ms=60000
# DATA DELUGE
```
Afterwards, `kafka-consumer-groups.sh` shows that the console consumer's group has read both partitions to the end:
```
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0:9092 --describe --group console-consumer-72507
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'console-consumer-72507' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
io 1 2393 2393 0 - - -
io 0 2393 2393 0 - - -
```
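The console consumer's behavior comes from its defaults: without `--group` it joins a fresh group named `console-consumer-<random number>` (hence `console-consumer-72507` above), and `--from-beginning` sets `auto.offset.reset` to `earliest`, so there is never a committed offset to resume from. A sketch of an equivalent confluent_kafka configuration, assuming a client release that accepts `auto.offset.reset` at the top level of the config dict (older releases documented it inside a nested `'default.topic.config'` dict); `console_like_config` and its group prefix are hypothetical:

```python
import uuid


def console_like_config(bootstrap_servers, group_prefix="console-consumer-py"):
    """Hypothetical helper: a confluent_kafka Consumer config that mimics
    kafka-console-consumer.sh --from-beginning, i.e. a brand-new consumer
    group (so no committed offsets exist) that starts at the earliest offset."""
    return {
        "bootstrap.servers": bootstrap_servers,
        # Fresh group id on every run, like console-consumer-<random>.
        "group.id": "%s-%s" % (group_prefix, uuid.uuid4().hex[:8]),
        # With no committed offset, start at the beginning of each partition.
        "auto.offset.reset": "earliest",
    }

# Usage (requires a reachable broker):
# from confluent_kafka import Consumer
# c = Consumer(console_like_config("kafka-0.io:9092"))
# c.subscribe(["io"])
```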
Why doesn't the `on_assign` hook make any difference? I would expect at least some data to come through.