Unable to start from beginning of topic using on_assign hook #224

Closed
@xrl

I am unable to reproduce the behavior of the kafka-console-consumer.sh script with my Python code. I am using broker-stored offsets (the new-style API).

I have a topic, io, which I am trying to read from the beginning with Python. The topic already contains data (produced by a confluent_kafka Producer).

When I run the Python consumer I can see the consumer group, but it has no partition information:

$ ./bin/kafka-consumer-groups.sh --bootstrap kafka-0.io:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

console-consumer-606
etl-3
console-consumer-66817
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0.io:9092 --describe --group io-etl-3
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
$

The consumer code follows the reference examples:

$ cat kafka-to-postgres.py
import os
import requests
import time
import psycopg2
import json

from confluent_kafka import Consumer, KafkaError

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from models import Device

#### SQLALCHEMY CONNECTION ###
print "initializing db connection"
engine = create_engine( "postgresql+psycopg2://" +  os.environ['POSTGRES_USER'] + ":" + os.environ['POSTGRES_PW'] + "@" + os.environ['POSTGRES_HOST'] + ":" + os.environ["POSTGRES_SERVICE_PORT_PSQL"] + "/" + os.environ['POSTGRES_DB'] )

s = sessionmaker(bind=engine)
session = s()

#### KAFKA CONNECTION ####
print "initializing kafka connection"
c = Consumer({
    'bootstrap.servers': os.environ['KAFKA_URL'] ,
    'group.id':          os.environ['KAFKA_CONSUMER_GROUP'],
    'auto.offset.reset': 'smallest',
})

def on_assign (c, ps):
  print("on_assign!!!")
  for p in ps:
    import pdb; pdb.set_trace()
    print("on_assigning")
    p.offset=-10
    c.assign(ps)
c.subscribe(['io'], on_assign=on_assign)

print "entering run loop"
running = True
while running:
    print "going to poll"
    msg = c.poll(timeout=1)
    if msg and not msg.error():
       # snip
    elif msg and msg.error().code() != KafkaError._PARTITION_EOF:
        print("error: %s"  % msg.error().str())
        running = False
    elif msg and msg.error():
        print("got error: %s" % ( msg.error().str() ) )
    else:
        print("got message %s" % (msg) )
c.close()
print "exiting"
exit(1)

With this output:

# KAFKA_CONSUMER_GROUP=io-etl-3 python kafka-to-postgres.py
initializing db connection
initializing kafka connection
entering run loop
going to poll
on_assign!!!
> /kafka-to-postgres.py(33)on_assign()
-> print("on_assigning")
(Pdb) l
 28
 29  	def on_assign (c, ps):
 30  	  print("on_assign!!!")
 31  	  for p in ps:
 32  	    import pdb; pdb.set_trace()
 33  ->	    print("on_assigning")
 34  	    p.offset=-10
 35  	    c.assign(ps)
 36  	c.subscribe(['io'], on_assign=on_assign)
 37
 38  	print "entering run loop"
(Pdb) p ps
[TopicPartition{topic=io,partition=0,offset=-1001,error=None}, TopicPartition{topic=io,partition=1,offset=-1001,error=None}]
(Pdb) p p
TopicPartition{topic=io,partition=0,offset=-1001,error=None}
(Pdb)
TopicPartition{topic=io,partition=0,offset=-1001,error=None}
(Pdb) p p.offset
-1001L
(Pdb) c
on_assigning
> /kafka-to-postgres.py(32)on_assign()
-> import pdb; pdb.set_trace()
(Pdb) c
on_assigning
got message None
going to poll
got message None
going to poll
got message None
going to poll

So no data comes through. I have tried setting the partition offset to both 0 and -10. The offset passed to the on_assign callback defaults to -1001L, in case that matters.

Compare to the output from invoking the kafka console consumer:

$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka-0.io:9092 --topic io --from-beginning --timeout-ms=60000
# DATA DELUGE

and the offset inspector showing:

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0:9092 --describe --group console-consumer-72507
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

Consumer group 'console-consumer-72507' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
io                          1          2393            2393            0          -                                                 -                              -
io                          0          2393            2393            0          -                                                 -

Why isn't the on_assign hook making any difference? I should see some kind of data coming through.
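
For reference, here is a minimal sketch of what the callback might look like instead. It is a guess at the cause, not a confirmed diagnosis. It assumes that -10 is not a meaningful offset, that the -1001 seen in the debugger is the library's OFFSET_INVALID placeholder (meaning "no offset set"), and that the logical constant OFFSET_BEGINNING from confluent_kafka is the intended way to request a read from the start. It also calls assign() once after the loop rather than once per partition. The try/except fallback is only there so the snippet can be read and run without the library installed.

```python
try:
    from confluent_kafka import OFFSET_BEGINNING
except ImportError:
    # Assumption: fallback for environments without confluent_kafka;
    # -2 is librdkafka's logical "beginning of partition" offset.
    OFFSET_BEGINNING = -2


def on_assign(consumer, partitions):
    """Rebalance callback: start every assigned partition at its first message."""
    for p in partitions:
        # Replace the placeholder offset with the logical "beginning" offset.
        p.offset = OFFSET_BEGINNING
    # Hand the adjusted list back once, after all offsets are set.
    consumer.assign(partitions)
```

It would be registered the same way as in the script above, with c.subscribe(['io'], on_assign=on_assign). Whether this alone explains the empty --describe output for the group is not something the sketch settles.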
