Skip to content

Decode unsupported version fallback for ApiVersions #122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

jakubdyszkiewicz
Copy link

Hey 👋

I run into a problem with parsing ApiVersions response.

My Kafka Server supports ApiVersions v0-v3, but my client supports v0-v4.
When it connects to the server, it uses ApiVersions v4 as the first request, but the server does not support this, so it responds with ApiVersions v0, error code 35 and it sends ApiVersions min:0, max:3 in api_keys field, so that the client can retry with v3.

This behaviour is described in Retrieving Supported API versions.
and in KIP-511.

Because api_version is not in the part of the response, we need to first check error_code, to figure out this edge case and error_code is part of decode.

I could of course check if ApiKey is ApiVersions, peek into buffer, parse error code and downgrade version whenever I call decode, but that's a bit cumbersome.

It would be convenient to have it as a part of kafka-protocol, what do you think?
Thanks!

@rukai
Copy link
Collaborator

rukai commented Apr 28, 2025

Hi, thanks for the PR!

Unfortunately your approach wont work as that file is generated from the kafka protocol spec json files.

I think this approach by kafka is a bit sus, and I think users are generally better off just requesting ApiVersions v0.

But, I imagine you are after the _tagged_fields field in ApiVersions response v3?

Maybe we could provide a helper method that calls ApiVersions::decode and will extract the v0 message on UnsupportedVersion error?
Its going to have low discovery, but its the best I can think of.

@jakubdyszkiewicz
Copy link
Author

Ah, I missed the comment at the top of the file.

It's not even _tagged_fields, it fails on the compact array.

let error_code = types::Int16.decode(buf)?;
        let api_keys = if version >= 3 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

I think this approach by kafka is a bit sus, and I think users are generally better off just requesting ApiVersions v0.

I agree, I'm also not a fan of this trickery, but I'm writing a middleware,e and unfortunately, we don't control the clients :( Even official clients for Kafka (Kafka package in bin directory) seem to follow this behaviour.

I managed to do a workaround, but I can contribute helper if you think it's a right fit for the library. What would be a good place for it? protocol/decode.rs?

@rukai
Copy link
Collaborator

rukai commented Apr 28, 2025

Huh, interesting, we use kafka-protocol for a proxy which I think is what you mean by middleware here.
I think we avoided this issue in our proxy because we have no need to parse the response to the clients apiversions.

Maybe we could add ithe special case to https://docs.rs/kafka-protocol/latest/kafka_protocol/messages/enum.ResponseKind.html#method.decode by adding a special case to the generator. Although If we are special casing the generator maybe we should just special case the generator to add the logic where you originally added it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants