Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes KafkaAdminClient returning IncompatibleBrokerVersion when passing an api_version #1953

Merged
merged 2 commits into from
Dec 29, 2019
Merged

Conversation

ian28223
Copy link
Contributor

@ian28223 ian28223 commented Nov 14, 2019

What does this PR do?

Fixes KafkaAdminClient always returning IncompatibleBrokerVersion when passing an api_version.
This will calls check_version() on init before get_api_versions() get called by _matching_api_version.


Passing an api_version to KafkaAdminClient seem to always raise IncompatibleBrokerVersion line 240

Found that this is because broker_api_versions = self._client.get_api_versions() always returns None apparently because check_version() had not been called previously.

Per the note from get_api_versions(),

A call to check_version must previously have succeeded and returned


This change is Reviewable

Passing an `api_version` to KafkaAdminClient always raises `IncompatibleBrokerVersion` line 240

Found that this is because `broker_api_versions = self._client.get_api_versions()` always returns None apparently because `check_version` had not been called previously.

Per the note from `get_api_versions()`,
> A call to check_version must previously have succeeded and returned
`self._client.check_version()` only needs to be called once for `self._client.get_api_versions()` to work in `def _matching_api_version`
@ian28223
Copy link
Contributor Author

Could someone review this PR?

@ian28223
Copy link
Contributor Author

@jeffwidman sorry to @ you here but I'm hoping if you would be able to review this?

@orange-kao
Copy link

I learned that the following code, which set api_version will always trigger kafka.errors.IncompatibleBrokerVersion in kafka/admin/client.py line 240. This PR solve the issue.

Reproduce steps

  1. Start a Kafka 2.3.1 cluster. I use Aiven for this test.
  2. Run pip3 install kafka-python. Version 1.4.7 has been installed
  3. Copy-and-paste the following code to test.py. Modify login credentials as needed
  4. Run ./test.py
#!/usr/bin/python3

from kafka.admin import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError

kafka_config = {}
kafka_config['bootstrap_servers'] = "kafka-xxxxx-xxxx.aivencloud.com:12697"
kafka_config['security_protocol'] = "SSL"
kafka_config['ssl_cafile'] = "kafka-ca"
kafka_config['ssl_certfile'] = "kafka-cert"
kafka_config['ssl_keyfile'] = "kafka-key"
kafka_config['api_version'] = (1,0,0)

kafka_admin = KafkaAdminClient(**kafka_config)
topic_list = []
topic_list.append(NewTopic(name="topic-name", num_partitions=1, replication_factor=1))
try:
    kafka_admin.create_topics(topic_list)
    print("Topic created successfully")
except TopicAlreadyExistsError:
    print("Topic already exist")

Expected result

Create the topic without exception

$ ./test.py 
Topic created successfully

Actual result

Reproducibility: always

$ ./test.py
Traceback (most recent call last):
  File "./p.py", line 15, in <module>
    kafka_admin = KafkaAdminClient(**kafka_config)
  File "/home/orange/.local/lib/python3.6/site-packages/kafka/admin/client.py", line 211, in __init__
    self._refresh_controller_id()
  File "/home/orange/.local/lib/python3.6/site-packages/kafka/admin/client.py", line 261, in _refresh_controller_id
    version = self._matching_api_version(MetadataRequest)
  File "/home/orange/.local/lib/python3.6/site-packages/kafka/admin/client.py", line 240, in _matching_api_version
    .format(operation[0].__name__))
kafka.errors.IncompatibleBrokerVersion: IncompatibleBrokerVersion: Kafka broker does not support the 'MetadataRequest_v0' Kafka protocol.

Additional information

  • Apply the changes proposed by this PR resolve the issue
  • Able to specify api_version is essential for high-latency network connection by avoid probing different API version.

@ian28223
Copy link
Contributor Author

@dpkp requesting for a review pls?

@ian28223 ian28223 changed the title calls check_version() before get_api_versions() Fixes KafkaAdminClient returning IncompatibleBrokerVersion when passing an api_version Dec 4, 2019
@dpkp
Copy link
Owner

dpkp commented Dec 29, 2019

I think this is fine, though I wonder why you would want to pass api_version to the admin client? It seems that configuration makes little sense here.

@ian28223
Copy link
Contributor Author

I think this is fine, though I wonder why you would want to pass api_version to the admin client? It seems that configuration makes little sense here.

tbh, not really sure why there is a need to pass the api_version but the keyword argument is made available and documented I guess patterned the same with KafkaClient but some of our users do.

I guess users use the resulting api_version inferred from a KafkaClient instance and want to make sure they pass the same version to KafkaAdminClient.

Anyhow, thanks for the review and merge! much appreciated 🙇

@ian28223 ian28223 deleted the patch-1 branch December 30, 2019 23:38
@jeffwidman
Copy link
Collaborator

jeffwidman commented Feb 5, 2020

I think this is fine, though I wonder why you would want to pass api_version to the admin client? It seems that configuration makes little sense here.

It's useful for:

  • performance optimization
  • pinning the admin client to a lower version so that if you're in the middle of a rolling upgrade and the auto-check might hit a newer broker this prevents accidentally issuing newer admin protocol calls to brokers that don't yet support them.

See here:
https://github.com/DataDog/integrations-core/blob/81f52fbedda70ec49b2b3126af47afa0e34b122b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example#L38-L47

cc @dpkp

@jacopofar
Copy link

I get the same problem that this PR aims to fix. In my case I'm setting the version to avoid auto discovery, which fails in the CI test runner but work on my computer probably due to some network differences. I think it's the problem shown here: #1308

@jeffwidman
Copy link
Collaborator

If you're hitting a problem, please open a new ticket. Also, can you make sure you're using the recently released 2.0 version?

@jacopofar
Copy link

Good to know, I didn't notice the new version. Turns out there was a configuration error and the problem isn;t in the library :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants