Skip to content

Feature: rewrite DescribeCluster response #212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ The Kafka Proxy is based on idea of [Cloud SQL Proxy](https://github.com/GoogleC
It allows a service to connect to Kafka brokers without having to deal with SASL/PLAIN authentication and SSL certificates.

It works by opening tcp sockets on the local machine and proxying connections to the associated Kafka brokers
when the sockets are used. The host and port in [Metadata](http://kafka.apache.org/protocol.html#The_Messages_Metadata)
and [FindCoordinator](http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator)
when the sockets are used. The host and port in [Metadata](http://kafka.apache.org/protocol.html#The_Messages_Metadata), [FindCoordinator](http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator) & [DescribeCluster](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DescribeClusterResponse.json)
responses received from the brokers are replaced by local counterparts.
For discovered brokers (not configured as the boostrap servers), local listeners are started on random ports.
The dynamic local listeners feature can be disabled and an additional list of external server mappings can be provided.
Expand Down
3 changes: 2 additions & 1 deletion proxy/protocol/real_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package protocol

import (
"encoding/binary"
"github.com/google/uuid"
"math"

"github.com/google/uuid"
)

var errInvalidArrayLength = PacketDecodingError{"invalid array length"}
Expand Down
107 changes: 107 additions & 0 deletions proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
const (
apiKeyMetadata = 3
apiKeyFindCoordinator = 10
apiKeyDescribeCluster = 60

brokersKeyName = "brokers"
brokerKeyName = "broker_id"
hostKeyName = "host"
portKeyName = "port"
nodeKeyName = "node_id"
Expand All @@ -23,6 +25,7 @@ const (
var (
metadataResponseSchemaVersions = createMetadataResponseSchemaVersions()
findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions()
describeClusterResponseSchemaVersions = createDescribeClusterResponseSchemaVersions()
)

func createMetadataResponseSchemaVersions() []Schema {
Expand Down Expand Up @@ -325,6 +328,58 @@ func createFindCoordinatorResponseSchemaVersions() []Schema {
return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6}
}

func createDescribeClusterResponseSchemaVersions() []Schema {
describeClusterBrokerV0 := NewSchema("describe_cluster_broker_v0",
&Mfield{Name: brokerKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)
describeClusterBrokerV2 := NewSchema("describe_cluster_broker_v2",
&Mfield{Name: brokerKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
&Mfield{Name: "is_fenced", Ty: TypeBool},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

describeClusterV0 := NewSchema("describe_cluster_response_v0",
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&Mfield{Name: "error_code", Ty: TypeInt16},
&Mfield{Name: "error_message", Ty: TypeCompactNullableStr},
&Mfield{Name: "cluster_id", Ty: TypeCompactStr},
&Mfield{Name: "controller_id", Ty: TypeInt32},
&CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0},
&Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)
describeClusterV1 := NewSchema("describe_cluster_response_v1",
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&Mfield{Name: "error_code", Ty: TypeInt16},
&Mfield{Name: "error_message", Ty: TypeCompactNullableStr},
&Mfield{Name: "endpoint_type", Ty: TypeInt8},
&Mfield{Name: "cluster_id", Ty: TypeCompactStr},
&Mfield{Name: "controller_id", Ty: TypeInt32},
&CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0},
&Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)
describeClusterV2 := NewSchema("describe_cluster_response_v2",
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&Mfield{Name: "error_code", Ty: TypeInt16},
&Mfield{Name: "error_message", Ty: TypeCompactNullableStr},
&Mfield{Name: "endpoint_type", Ty: TypeInt8},
&Mfield{Name: "cluster_id", Ty: TypeCompactStr},
&Mfield{Name: "controller_id", Ty: TypeInt32},
&CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV2},
&Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)
return []Schema{describeClusterV0, describeClusterV1, describeClusterV2}
}

func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
if decodedStruct == nil {
return errors.New("decoded struct must not be nil")
Expand Down Expand Up @@ -441,6 +496,56 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e
return nil
}

func modifyDescribeClusterResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
if decodedStruct == nil {
return errors.New("decoded struct must not be nil")
}
if fn == nil {
return errors.New("net address mapper must not be nil")
}
brokersArray, ok := decodedStruct.Get(brokersKeyName).([]interface{})
if !ok {
return errors.New("brokers list not found")
}
for _, brokerElement := range brokersArray {
broker := brokerElement.(*Struct)
host, ok := broker.Get(hostKeyName).(string)
if !ok {
return errors.New("broker.host not found")
}
port, ok := broker.Get(portKeyName).(int32)
if !ok {
return errors.New("broker.port not found")
}
brokerId, ok := broker.Get(brokerKeyName).(int32)
if !ok {
return errors.New("broker.broker_id not found")
}

if host == "" && port <= 0 {
continue
}

newHost, newPort, err := fn(host, port, brokerId)
if err != nil {
return err
}
if host != newHost {
err := broker.Replace(hostKeyName, newHost)
if err != nil {
return err
}
}
if port != newPort {
err = broker.Replace(portKeyName, newPort)
if err != nil {
return err
}
}
}
return nil
}

type ResponseModifier interface {
Apply(resp []byte) ([]byte, error)
}
Expand Down Expand Up @@ -471,6 +576,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse)
case apiKeyFindCoordinator:
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse)
case apiKeyDescribeCluster:
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, describeClusterResponseSchemaVersions, modifyDescribeClusterResponse)
default:
return nil, nil
}
Expand Down
205 changes: 204 additions & 1 deletion proxy/protocol/responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (

"github.com/google/uuid"

"github.com/grepplabs/kafka-proxy/config"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/grepplabs/kafka-proxy/config"
)

var (
Expand Down Expand Up @@ -2600,6 +2601,206 @@ func testMetadataResponse(t *testing.T, apiVersion int16, payload string, expect
a.Equal(expectedModified, dc.AttrValues())
}

func TestDescribeClusterResponseV0(t *testing.T) {
payload := "000000000000000b6d795f636c75737465720000000003000000010a6c6f63616c686f737400004a940000000000020a6c6f63616c686f7374000071a400000000000000"

expectedInput := []string{
"throttle_time_ms int32 0",
"error_code int16 0",
"error_message *string <nil>",
"cluster_id string my_cluster",
"controller_id int32 0",
"[brokers]",
"brokers struct",
"broker_id int32 1",
"host string localhost",
"port int32 19092",
"rack *string <nil>",
"[response_tagged_fields]",
"brokers struct",
"broker_id int32 2",
"host string localhost",
"port int32 29092",
"rack *string <nil>",
"[response_tagged_fields]",
"cluster_authorized_operations int32 0",
"[response_tagged_fields]",
}
expectedModified := []string{
"throttle_time_ms int32 0",
"error_code int16 0",
"error_message *string <nil>",
"cluster_id string my_cluster",
"controller_id int32 0",
"[brokers]",
"brokers struct",
"broker_id int32 1",
"host string myhost1", // replaced
"port int32 34001", // replaced
"rack *string <nil>",
"[response_tagged_fields]",
"brokers struct",
"broker_id int32 2",
"host string myhost2", // replaced
"port int32 34002", // replaced
"rack *string <nil>",
"[response_tagged_fields]",
"cluster_authorized_operations int32 0",
"[response_tagged_fields]",
}

testDescribeClusterResponse(t, 0, payload, expectedInput, expectedModified)
}

func TestDescribeClusterResponseV1(t *testing.T) {
payload := "00000000000000010b6d795f636c75737465720000000003000000010a6c6f63616c686f737400004a940000000000020a6c6f63616c686f7374000071a400000000000000"

expectedInput := []string{
"throttle_time_ms int32 0",
"error_code int16 0",
"error_message *string <nil>",
"endpoint_type int8 1",
"cluster_id string my_cluster",
"controller_id int32 0",
"[brokers]",
"brokers struct",
"broker_id int32 1",
"host string localhost",
"port int32 19092",
"rack *string <nil>",
"[response_tagged_fields]",
"brokers struct",
"broker_id int32 2",
"host string localhost",
"port int32 29092",
"rack *string <nil>",
"[response_tagged_fields]",
"cluster_authorized_operations int32 0",
"[response_tagged_fields]",
}
expectedModified := []string{
"throttle_time_ms int32 0",
"error_code int16 0",
"error_message *string <nil>",
"endpoint_type int8 1",
"cluster_id string my_cluster",
"controller_id int32 0",
"[brokers]",
"brokers struct",
"broker_id int32 1",
"host string myhost1", // replaced
"port int32 34001", // replaced
"rack *string <nil>",
"[response_tagged_fields]",
"brokers struct",
"broker_id int32 2",
"host string myhost2", // replaced
"port int32 34002", // replaced
"rack *string <nil>",
"[response_tagged_fields]",
"cluster_authorized_operations int32 0",
"[response_tagged_fields]",
}

testDescribeClusterResponse(t, 1, payload, expectedInput, expectedModified)
}

func TestDescribeClusterResponseV2(t *testing.T) {
payload := "00000000000000010b6d795f636c75737465720000000003000000010a6c6f63616c686f737400004a94000000000000020a6c6f63616c686f7374000071a40000000000000000"

expectedInput := []string{
"throttle_time_ms int32 0",
"error_code int16 0",
"error_message *string <nil>",
"endpoint_type int8 1",
"cluster_id string my_cluster",
"controller_id int32 0",
"[brokers]",
"brokers struct",
"broker_id int32 1",
"host string localhost",
"port int32 19092",
"rack *string <nil>",
"is_fenced bool false",
"[response_tagged_fields]",
"brokers struct",
"broker_id int32 2",
"host string localhost",
"port int32 29092",
"rack *string <nil>",
"is_fenced bool false",
"[response_tagged_fields]",
"cluster_authorized_operations int32 0",
"[response_tagged_fields]",
}
expectedModified := []string{
"throttle_time_ms int32 0",
"error_code int16 0",
"error_message *string <nil>",
"endpoint_type int8 1",
"cluster_id string my_cluster",
"controller_id int32 0",
"[brokers]",
"brokers struct",
"broker_id int32 1",
"host string myhost1", // replaced
"port int32 34001", // replaced
"rack *string <nil>",
"is_fenced bool false",
"[response_tagged_fields]",
"brokers struct",
"broker_id int32 2",
"host string myhost2", // replaced
"port int32 34002", // replaced
"rack *string <nil>",
"is_fenced bool false",
"[response_tagged_fields]",
"cluster_authorized_operations int32 0",
"[response_tagged_fields]",
}

testDescribeClusterResponse(t, 2, payload, expectedInput, expectedModified)
}

func testDescribeClusterResponse(t *testing.T, apiVersion int16, payload string, expectedInput, expectedModified []string) {
bytes, err := hex.DecodeString(payload)
if err != nil {
t.Fatal(err)
}
a := assert.New(t)

schema := describeClusterResponseSchemaVersions[apiVersion]

s, err := DecodeSchema(bytes, schema)
a.Nil(err)

dc := newDecodeCheck()
err = dc.Traverse(s)
if err != nil {
t.Fatal(err)
}
a.Equal(expectedInput, dc.AttrValues())
resp, err := EncodeSchema(s, schema)
a.Nil(err)
a.Equal(bytes, resp)

modifier, err := GetResponseModifier(apiKeyDescribeCluster, apiVersion, testResponseModifier2)
if err != nil {
t.Fatal(err)
}
a.Nil(err)
resp, err = modifier.Apply(resp)
a.Nil(err)
s, err = DecodeSchema(resp, schema)
a.Nil(err)
dc = newDecodeCheck()
err = dc.Traverse(s)
if err != nil {
t.Fatal(err)
}
a.Equal(expectedModified, dc.AttrValues())
}

func TestFindCoordinatorResponseV0(t *testing.T) {
/*
FindCoordinator Response (Version: 0) => error_code coordinator
Expand Down Expand Up @@ -3162,6 +3363,8 @@ func (t *decodeCheck) value(s *Struct, arg interface{}, sindex int) error {
switch v := arg.(type) {
case bool:
t.append(name, "bool", v)
case int8:
t.append(name, "int8", v)
case int16:
t.append(name, "int16", v)
case int32:
Expand Down
Loading