Expose offsets_for_times consumer method. closes #224 #268


Merged

Conversation

johnistan
Contributor

Expose offsets_for_times consumer method. closes #224

@johnistan
Contributor Author

I have tested this on an actual Kafka instance, but there don't appear to be any integration tests, so I have only checked in the basic smoke test.

@johnistan johnistan force-pushed the expose-offset-for-times-method branch from fe3cd06 to c2e2807 Compare October 26, 2017 00:02
Expose offsets_for_times consumer method. closes confluentinc#224
@johnistan johnistan force-pushed the expose-offset-for-times-method branch from c2e2807 to c7ea58f Compare October 26, 2017 00:28
@johnistan
Contributor Author

Failing on pep8 issues in files this change doesn't touch:

./confluent_kafka/avro/cached_schema_registry_client.py:171:13: E722 do not use bare except
./confluent_kafka/avro/cached_schema_registry_client.py:207:13: E722 do not use bare except
./confluent_kafka/avro/cached_schema_registry_client.py:272:9: E722 do not use bare except
./confluent_kafka/avro/serializer/message_serializer.py:45:1: E722 do not use bare except
./confluent_kafka/avro/serializer/message_serializer.py:161:9: E722 do not use bare except
./confluent_kafka/avro/serializer/message_serializer.py:183:13: E722 do not use bare except
./examples/consumer.py:62:9: E722 do not use bare except
./tests/test_threads.py:8:1: E722 do not use bare except
./tests/avro/mock_registry.py:123:9: E722 do not use bare except

@johnistan
Contributor Author

Other than the flake8 failures, is there anything else this needs?

@johnistan
Contributor Author

Bump. Happy to respond to any comments.


@edenhill edenhill left a comment


One doc fix needed.
Please also add a call to offsets_for_times() to the consumer integration test, just making sure it returns some valid offsets.

" corresponding partition.\n"
"\n"
" :param list(TopicPartition) partitions: topic+partitions with timestamps in the TopicPartition.offset field."
" :param float timeout: Request timeout (when cached=False).\n"

remove cache mention
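A broker-less sketch of the validity check the requested integration test could make on an offsets_for_times() result. Names here are hypothetical, and the `Part` namedtuple is only a stand-in for `TopicPartition`:

```python
from collections import namedtuple

# Stand-in for confluent_kafka.TopicPartition in this sketch.
Part = namedtuple("Part", ["topic", "partition", "offset"])

def check_offsets_for_times_result(requested, result):
    """Check that offsets_for_times() returned 'some valid offsets':
    the same topic/partitions come back, and each offset was resolved
    to a real log offset (>= 0, and far smaller than the millisecond
    timestamp passed in via the offset field)."""
    assert len(result) == len(requested)
    for req, res in zip(requested, result):
        assert (res.topic, res.partition) == (req.topic, req.partition)
        assert 0 <= res.offset < req.offset

# Example shapes, taken from the outputs shown later in this thread.
requested = [Part("test-offsets", 0, 123456788000), Part("test-offsets", 1, 123456788000)]
resolved = [Part("test-offsets", 0, 8557), Part("test-offsets", 1, 8179)]
check_offsets_for_times_result(requested, resolved)
```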

@edenhill
Contributor

Thank you for this, almost there!

@johnistan
Contributor Author

After some more testing I think I have found an issue with calling offsets_for_times with no timeout: the call just hangs and never returns. I need to dig in more, but have you seen behavior like this before from the underlying C function?
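For context on the timeout parameter: the Python client takes timeouts in seconds while librdkafka works in milliseconds, with -1 meaning "wait indefinitely". A hypothetical helper showing that mapping (an assumption about the plumbing, not the binding's actual code):

```python
def timeout_to_ms(timeout=None):
    """Map a Python-level timeout (seconds, optional) to the
    millisecond value librdkafka expects. None or a negative value
    maps to -1, which librdkafka treats as an infinite wait -- which
    would explain a call that hangs when no timeout is supplied and
    the operation cannot complete."""
    if timeout is None or timeout < 0:
        return -1
    return int(timeout * 1000)

print(timeout_to_ms(1.0))  # 1000
print(timeout_to_ms())     # -1
```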

@johnistan
Contributor Author

Found some more issues. The first call from a newly created consumer will fail like so:

offsets = c.offsets_for_times(topic_partitions_to_search)
cimpl.KafkaException: KafkaError{code=LEADER_NOT_AVAILABLE,val=5,str="Failed to get offsets: Broker: Leader not available"}

I suspect the same issue as #196 and the underlying confluentinc/librdkafka#1373.

Adding a timeout causes it to work.
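One possible workaround for that first-call LEADER_NOT_AVAILABLE failure is a small retry loop. It is sketched here with an injected `lookup` callable standing in for a bound `consumer.offsets_for_times(...)` call, so it runs without a broker; the retry policy and names are assumptions, not part of the client:

```python
import time

def offsets_with_retry(lookup, partitions, attempts=3, backoff_s=0.1):
    """Retry the timestamp lookup a few times: the first call on a
    freshly created consumer can fail before topic metadata (and the
    partition leader) is known. `lookup` stands in for
    consumer.offsets_for_times(partitions, timeout=...)."""
    last_err = None
    for _ in range(attempts):
        try:
            return lookup(partitions)
        except Exception as err:  # in practice: cimpl.KafkaException
            last_err = err
            time.sleep(backoff_s)
    raise last_err

# Stub that fails once, like a cold consumer, then succeeds.
calls = {"n": 0}
def flaky_lookup(parts):
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("LEADER_NOT_AVAILABLE")
    return [(p, 42) for p in parts]

print(offsets_with_retry(flaky_lookup, ["test-offsets-0"], backoff_s=0))
# [('test-offsets-0', 42)]
```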

@johnistan
Contributor Author

It could be that my code has an issue, but I think there are issues with the underlying C implementation. Without a formal integration setup that spins up a fresh Kafka, sharing a reproducible example is a pain, but here is a simple script.

This script shows two issues. First, calling offsets_for_times with no timeout or with a -1 timeout returns a copy of the TopicPartition objects you passed in, timestamps and all, instead of resolved offsets. Second, calling offsets_for_times without a timeout causes any subsequent close() on the consumer to hang.

import confluent_kafka
import uuid

topic_name = 'test-offsets'

conf = {
    'bootstrap.servers': 'localhost:9092',
    'api.version.request': True
}

# Create producer
p = confluent_kafka.Producer(**conf)

for i in range(1000):
    p.produce(topic_name, 'foo', timestamp=123456789000)
p.flush()

c_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': str(uuid.uuid1()),  # config values must be strings
    'session.timeout.ms': 6000,
    'api.version.request': True
}
c = confluent_kafka.Consumer(**c_conf)

topic_partitions_to_search = list(map(lambda p: confluent_kafka.TopicPartition(topic_name, p, 123456788000), range(0, 3)))
print("topic_partitions_to_search: %s" % topic_partitions_to_search)
# hack to make sure consumer has topic/broker metadata
c.subscribe([topic_name])
msg = c.poll()

offsets_with_timeout = c.offsets_for_times(topic_partitions_to_search, timeout=1.0)
offsets_with_neg_1_timeout = c.offsets_for_times(topic_partitions_to_search, timeout=-1)
offsets_without_timeout = c.offsets_for_times(topic_partitions_to_search)

print("offsets_with_timeout: %s" % offsets_with_timeout)
print("offsets_with_neg_1_timeout: %s" % offsets_with_neg_1_timeout)
print("offsets_without_timeout: %s" % offsets_without_timeout)

print("Right before close")
c.close()
print("Right after close")

Outputs:

py27 runtests: commands[1] | python examples/test.py
topic_partitions_to_search: [TopicPartition{topic=test-offsets,partition=0,offset=123456788000,error=None}, TopicPartition{topic=test-offsets,partition=1,offset=123456788000,error=None}, TopicPartition{topic=test-offsets,partition=2,offset=123456788000,error=None}]
offsets_with_timeout: [TopicPartition{topic=test-offsets,partition=0,offset=8557,error=None}, TopicPartition{topic=test-offsets,partition=1,offset=8179,error=None}, TopicPartition{topic=test-offsets,partition=2,offset=9269,error=None}]
offsets_with_neg_1_timeout: [TopicPartition{topic=test-offsets,partition=0,offset=123456788000,error=None}, TopicPartition{topic=test-offsets,partition=1,offset=123456788000,error=None}, TopicPartition{topic=test-offsets,partition=2,offset=123456788000,error=None}]
offsets_without_timeout: [TopicPartition{topic=test-offsets,partition=0,offset=123456788000,error=None}, TopicPartition{topic=test-offsets,partition=1,offset=123456788000,error=None}, TopicPartition{topic=test-offsets,partition=2,offset=123456788000,error=None}]
Right before close
^CERROR: KEYBOARDINTERRUPT

@johnistan
Contributor Author

Running locally with these changes to librdkafka (confluentinc/librdkafka#1525), I get good results:

py27 runtests: commands[1] | python examples/test.py
topic_partitions_to_search: [TopicPartition{topic=test-offsets,partition=0,offset=123456788000,error=None}, TopicPartition{topic=test-offsets,partition=1,offset=123456788000,error=None}, TopicPartition{topic=test-offsets,partition=2,offset=123456788000,error=None}]
offsets_with_timeout: [TopicPartition{topic=test-offsets,partition=0,offset=9888,error=None}, TopicPartition{topic=test-offsets,partition=1,offset=9406,error=None}, TopicPartition{topic=test-offsets,partition=2,offset=10711,error=None}]
offsets_with_neg_1_timeout: [TopicPartition{topic=test-offsets,partition=0,offset=9888,error=None}, TopicPartition{topic=test-offsets,partition=1,offset=9406,error=None}, TopicPartition{topic=test-offsets,partition=2,offset=10711,error=None}]
offsets_without_timeout: [TopicPartition{topic=test-offsets,partition=0,offset=9888,error=None}, TopicPartition{topic=test-offsets,partition=1,offset=9406,error=None}, TopicPartition{topic=test-offsets,partition=2,offset=10711,error=None}]
Right before close
Right after close
__________________________________________________________________________________________________________________ summary ___________________________________________________________________________________________________________________
  py27: commands succeeded
  congratulations :)

@edenhill edenhill merged commit 514f6a8 into confluentinc:master Dec 15, 2017
@matthew-d-jones

@edenhill I think this should have closed #225 and not #224.

@DestroyerAlpha

Is there proper documentation for this?

@matthew-d-jones

@DestroyerAlpha https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.offsets_for_times
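Per those docs, offsets_for_times looks up, for each partition, the earliest offset whose message timestamp is greater than or equal to the given timestamp. A tiny in-memory model of that rule (a sketch of the semantics only, not the client's implementation; -1 stands in here for "no such message"):

```python
def offset_for_time(timestamps, target_ts):
    """timestamps: message timestamps (ms) indexed by offset, in order.
    Return the smallest offset whose timestamp >= target_ts, or -1
    when every message in the partition is older than target_ts."""
    for offset, ts in enumerate(timestamps):
        if ts >= target_ts:
            return offset
    return -1

log = [100, 200, 300, 400]        # timestamps of offsets 0..3
print(offset_for_time(log, 250))  # 2: first message at/after ts 250
print(offset_for_time(log, 500))  # -1: no message at/after ts 500
```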

Successfully merging this pull request may close these issues.

Unable to start from beginning of topic using on_assign hook
4 participants