Skip to content

Commit

Permalink
Add logic for inferring newer broker versions (#2038)
Browse files Browse the repository at this point in the history
* Add logic for inferring newer broker versions

- New Fetch / ListOffsets request / response objects
- Add new test cases to inferr code based on mentioned objects
- Add unit test to check inferred version against whatever resides in KAFKA_VERSION
- Add new kafka broker versions to integration list
- Add more kafka broker versions to travis task list
- Add support for broker version 2.5 id

* Implement PR change requests: fewer versions for travis testing, remove unused older versions for inference code, remove one minor version from known server list
Do not use newly created ACL request / responses in allowed version lists, due to flexible versions enabling in kafka actually requiring a serialization protocol header update
Revert admin client file change
  • Loading branch information
Tincu Gabriel authored May 5, 2020
1 parent f9e0264 commit 6fc0081
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 10 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ env:
- KAFKA_VERSION=0.11.0.3
- KAFKA_VERSION=1.1.1
- KAFKA_VERSION=2.4.0
- KAFKA_VERSION=2.5.0

addons:
apt:
Expand Down
6 changes: 4 additions & 2 deletions build_integration.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1"}
: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.1 2.3.0 2.4.0 2.5.0"}
: ${SCALA_VERSION:=2.11}
: ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/}
: ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git}
Expand Down Expand Up @@ -33,12 +33,14 @@ pushd servers
echo "-------------------------------------"
echo "Checking kafka binaries for ${kafka}"
echo
# kafka 0.8.0 is only available w/ scala 2.8.0
if [ "$kafka" == "0.8.0" ]; then
KAFKA_ARTIFACT="kafka_2.8.0-${kafka}.tar.gz"
else if [ "$kafka" \> "2.4.0" ]; then
KAFKA_ARTIFACT="kafka_2.12-${kafka}.tgz"
else
KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}.tgz"
fi
fi
if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then
if [ -f "${KAFKA_ARTIFACT}" ]; then
echo "Using cached artifact: ${KAFKA_ARTIFACT}"
Expand Down
12 changes: 11 additions & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.oauth.abstract import AbstractTokenProvider
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.produce import ProduceRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.parser import KafkaProtocol
from kafka.protocol.types import Int32, Int8
from kafka.scram import ScramClient
Expand Down Expand Up @@ -1166,6 +1169,13 @@ def _infer_broker_version_from_api_versions(self, api_versions):
# in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
((2, 5, 0), DescribeAclsRequest_v2),
((2, 4, 0), ProduceRequest[8]),
((2, 3, 0), FetchRequest[11]),
((2, 2, 0), OffsetRequest[5]),
((2, 1, 0), FetchRequest[10]),
((2, 0, 0), FetchRequest[8]),
((1, 1, 0), FetchRequest[7]),
((1, 0, 0), MetadataRequest[5]),
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
Expand Down
20 changes: 20 additions & 0 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ class DescribeAclsResponse_v1(Response):
('permission_type', Int8)))))
)


class DescribeAclsResponse_v2(Response):
API_KEY = 29
API_VERSION = 2
SCHEMA = DescribeAclsResponse_v1.SCHEMA


class DescribeAclsRequest_v0(Request):
API_KEY = 29
API_VERSION = 0
Expand All @@ -490,6 +497,7 @@ class DescribeAclsRequest_v0(Request):
('permission_type', Int8)
)


class DescribeAclsRequest_v1(Request):
API_KEY = 29
API_VERSION = 1
Expand All @@ -504,6 +512,17 @@ class DescribeAclsRequest_v1(Request):
('permission_type', Int8)
)


class DescribeAclsRequest_v2(Request):
"""
Enable flexible version
"""
API_KEY = 29
API_VERSION = 2
RESPONSE_TYPE = DescribeAclsResponse_v2
SCHEMA = DescribeAclsRequest_v1.SCHEMA


DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1]
DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1]

Expand Down Expand Up @@ -862,3 +881,4 @@ class CreatePartitionsRequest_v1(Request):
CreatePartitionsResponse = [
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
]

182 changes: 180 additions & 2 deletions kafka/protocol/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,72 @@ class FetchResponse_v6(Response):
SCHEMA = FetchResponse_v5.SCHEMA


class FetchResponse_v7(Response):
"""
Add error_code and session_id to response
"""
API_KEY = 1
API_VERSION = 7
SCHEMA = Schema(
('throttle_time_ms', Int32),
('error_code', Int16),
('session_id', Int32),
('topics', Array(
('topics', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
('last_stable_offset', Int64),
('log_start_offset', Int64),
('aborted_transactions', Array(
('producer_id', Int64),
('first_offset', Int64))),
('message_set', Bytes)))))
)


class FetchResponse_v8(Response):
API_KEY = 1
API_VERSION = 8
SCHEMA = FetchResponse_v7.SCHEMA


class FetchResponse_v9(Response):
API_KEY = 1
API_VERSION = 9
SCHEMA = FetchResponse_v7.SCHEMA


class FetchResponse_v10(Response):
API_KEY = 1
API_VERSION = 10
SCHEMA = FetchResponse_v7.SCHEMA


class FetchResponse_v11(Response):
API_KEY = 1
API_VERSION = 11
SCHEMA = Schema(
('throttle_time_ms', Int32),
('error_code', Int16),
('session_id', Int32),
('topics', Array(
('topics', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
('last_stable_offset', Int64),
('log_start_offset', Int64),
('aborted_transactions', Array(
('producer_id', Int64),
('first_offset', Int64))),
('preferred_read_replica', Int32),
('message_set', Bytes)))))
)


class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
Expand Down Expand Up @@ -196,13 +262,125 @@ class FetchRequest_v6(Request):
SCHEMA = FetchRequest_v5.SCHEMA


class FetchRequest_v7(Request):
"""
Add incremental fetch requests
"""
API_KEY = 1
API_VERSION = 7
RESPONSE_TYPE = FetchResponse_v7
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
('min_bytes', Int32),
('max_bytes', Int32),
('isolation_level', Int8),
('session_id', Int32),
('session_epoch', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('fetch_offset', Int64),
('log_start_offset', Int64),
('max_bytes', Int32))))),
('forgotten_topics_data', Array(
('topic', String),
('partitions', Array(Int32))
)),
)


class FetchRequest_v8(Request):
"""
bump used to indicate that on quota violation brokers send out responses before throttling.
"""
API_KEY = 1
API_VERSION = 8
RESPONSE_TYPE = FetchResponse_v8
SCHEMA = FetchRequest_v7.SCHEMA


class FetchRequest_v9(Request):
"""
adds the current leader epoch (see KIP-320)
"""
API_KEY = 1
API_VERSION = 9
RESPONSE_TYPE = FetchResponse_v9
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
('min_bytes', Int32),
('max_bytes', Int32),
('isolation_level', Int8),
('session_id', Int32),
('session_epoch', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('current_leader_epoch', Int32),
('fetch_offset', Int64),
('log_start_offset', Int64),
('max_bytes', Int32))))),
('forgotten_topics_data', Array(
('topic', String),
('partitions', Array(Int32)),
)),
)


class FetchRequest_v10(Request):
"""
bumped up to indicate ZStandard capability. (see KIP-110)
"""
API_KEY = 1
API_VERSION = 10
RESPONSE_TYPE = FetchResponse_v10
SCHEMA = FetchRequest_v9.SCHEMA


class FetchRequest_v11(Request):
"""
added rack ID to support read from followers (KIP-392)
"""
API_KEY = 1
API_VERSION = 11
RESPONSE_TYPE = FetchResponse_v11
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
('min_bytes', Int32),
('max_bytes', Int32),
('isolation_level', Int8),
('session_id', Int32),
('session_epoch', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('current_leader_epoch', Int32),
('fetch_offset', Int64),
('log_start_offset', Int64),
('max_bytes', Int32))))),
('forgotten_topics_data', Array(
('topic', String),
('partitions', Array(Int32))
)),
('rack_id', String('utf-8')),
)


FetchRequest = [
FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5,
FetchRequest_v6
FetchRequest_v6, FetchRequest_v7, FetchRequest_v8,
FetchRequest_v9, FetchRequest_v10, FetchRequest_v11,
]
FetchResponse = [
FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5,
FetchResponse_v6
FetchResponse_v6, FetchResponse_v7, FetchResponse_v8,
FetchResponse_v9, FetchResponse_v10, FetchResponse_v11,
]
Loading

0 comments on commit 6fc0081

Please sign in to comment.