Skip to content

Commit f083957

Browse files
authored
Update Kafka version parsing / supported range (#27720)
1 parent 29da912 commit f083957

File tree

5 files changed

+133
-69
lines changed

5 files changed

+133
-69
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
520520
- The disk queue is now GA. {pull}27515[27515]
521521
- Allow non-padded base64 data to be decoded by decode_base64_field {pull}27311[27311], {issue}27021[27021]
522522
- The Kafka support library Sarama has been updated to 1.29.1. {pull}27717[27717]
523+
- Kafka is now supported up to version 2.8.0. {pull}27720[27720]
523524

524525
*Auditbeat*
525526

filebeat/docs/inputs/input-kafka.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ link:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-for-kafka-ecos
5151
[[kafka-input-compatibility]]
5252
==== Compatibility
5353

54-
This input works with all Kafka versions in between 0.11 and 2.1.0. Older versions
54+
This input works with all Kafka versions in between 0.11 and 2.8.0. Older versions
5555
might work as well, but are not supported.
5656

5757
[id="{beatname_lc}-input-{type}-options"]

libbeat/common/kafka/version.go

Lines changed: 50 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -26,78 +26,47 @@ import (
2626
// Version is a kafka version
2727
type Version string
2828

29-
// TODO: remove me.
30-
// Compat version overwrite for missing versions in sarama
31-
// Public API is compatible between these versions.
3229
var (
33-
v0_10_2_1 = parseKafkaVersion("0.10.2.1")
34-
v0_11_0_1 = parseKafkaVersion("0.11.0.1")
35-
v0_11_0_2 = parseKafkaVersion("0.11.0.2")
36-
v1_0_1 = parseKafkaVersion("1.0.1")
37-
v1_0_2 = parseKafkaVersion("1.0.2")
38-
v1_1_1 = parseKafkaVersion("1.1.1")
39-
40-
kafkaVersions = map[string]sarama.KafkaVersion{
41-
"0.8.2.0": sarama.V0_8_2_0,
42-
"0.8.2.1": sarama.V0_8_2_1,
43-
"0.8.2.2": sarama.V0_8_2_2,
44-
"0.8.2": sarama.V0_8_2_2,
45-
"0.8": sarama.V0_8_2_2,
46-
47-
"0.9.0.0": sarama.V0_9_0_0,
48-
"0.9.0.1": sarama.V0_9_0_1,
49-
"0.9.0": sarama.V0_9_0_1,
50-
"0.9": sarama.V0_9_0_1,
51-
52-
"0.10.0.0": sarama.V0_10_0_0,
53-
"0.10.0.1": sarama.V0_10_0_1,
54-
"0.10.0": sarama.V0_10_0_1,
55-
"0.10.1.0": sarama.V0_10_1_0,
56-
"0.10.1": sarama.V0_10_1_0,
57-
"0.10.2.0": sarama.V0_10_2_0,
58-
"0.10.2.1": v0_10_2_1,
59-
"0.10.2": v0_10_2_1,
60-
"0.10": v0_10_2_1,
61-
62-
"0.11.0.0": sarama.V0_11_0_0,
63-
"0.11.0.1": v0_11_0_1,
64-
"0.11.0.2": v0_11_0_2,
65-
"0.11.0": v0_11_0_2,
66-
"0.11": v0_11_0_2,
67-
68-
"1.0.0": sarama.V1_0_0_0,
69-
"1.0.1": v1_0_1,
70-
"1.0.2": v1_0_2,
71-
"1.0": v1_0_2,
72-
"1.1.0": sarama.V1_1_0_0,
73-
"1.1.1": v1_1_1,
74-
"1.1": v1_1_1,
75-
"1": v1_1_1,
76-
77-
"2.0.0": sarama.V2_0_0_0,
78-
"2.0.1": sarama.V2_0_1_0,
79-
"2.0": sarama.V2_0_1_0,
80-
"2.1": sarama.V2_1_0_0,
81-
"2.2": sarama.V2_2_0_0,
82-
"2": sarama.V2_1_0_0,
30+
// Sarama expects version strings to be fully expanded, e.g. "1.1.1".
31+
// We also allow versions to be specified as a prefix, e.g. "1",
32+
// understood as referencing the most recent version starting with "1".
33+
// truncatedKafkaVersions stores a lookup of the abbreviations we accept.
34+
truncatedKafkaVersions = map[string]sarama.KafkaVersion{
35+
"0.8.2": sarama.V0_8_2_2,
36+
"0.8": sarama.V0_8_2_2,
37+
38+
"0.9.0": sarama.V0_9_0_1,
39+
"0.9": sarama.V0_9_0_1,
40+
41+
"0.10.0": sarama.V0_10_0_1,
42+
"0.10.1": sarama.V0_10_1_0,
43+
"0.10.2": sarama.V0_10_2_1,
44+
"0.10": sarama.V0_10_2_1,
45+
46+
"0.11.0": sarama.V0_11_0_2,
47+
"0.11": sarama.V0_11_0_2,
48+
49+
"1.0": sarama.V1_0_0_0,
50+
"1.1": sarama.V1_1_1_0,
51+
"1": sarama.V1_1_1_0,
52+
53+
"2.0": sarama.V2_0_1_0,
54+
"2.1": sarama.V2_1_0_0,
55+
"2.2": sarama.V2_2_0_0,
56+
"2.3": sarama.V2_3_0_0,
57+
"2.4": sarama.V2_4_0_0,
58+
"2.5": sarama.V2_5_0_0,
59+
"2.6": sarama.V2_6_0_0,
60+
"2": sarama.V2_6_0_0,
8361
}
8462
)
8563

86-
func parseKafkaVersion(s string) sarama.KafkaVersion {
87-
v, err := sarama.ParseKafkaVersion(s)
88-
if err != nil {
89-
panic(err)
90-
}
91-
return v
92-
}
93-
9464
// Validate that a kafka version is among the possible options
9565
func (v *Version) Validate() error {
96-
if _, ok := kafkaVersions[string(*v)]; !ok {
97-
return fmt.Errorf("unknown/unsupported kafka vesion '%v'", *v)
66+
if _, ok := v.Get(); ok {
67+
return nil
9868
}
99-
100-
return nil
69+
return fmt.Errorf("unknown/unsupported kafka version '%v'", *v)
10170
}
10271

10372
// Unpack a kafka version
@@ -113,6 +82,20 @@ func (v *Version) Unpack(s string) error {
11382

11483
// Get a sarama kafka version
11584
func (v Version) Get() (sarama.KafkaVersion, bool) {
116-
kv, ok := kafkaVersions[string(v)]
117-
return kv, ok
85+
// First check if it's one of the abbreviations we accept.
86+
// If not, let sarama parse it.
87+
s := string(v)
88+
if version, ok := truncatedKafkaVersions[s]; ok {
89+
return version, true
90+
}
91+
version, err := sarama.ParseKafkaVersion(s)
92+
if err != nil {
93+
return sarama.KafkaVersion{}, false
94+
}
95+
for _, supp := range sarama.SupportedVersions {
96+
if version == supp {
97+
return version, true
98+
}
99+
}
100+
return sarama.KafkaVersion{}, false
118101
}

libbeat/common/kafka/version_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package kafka
19+
20+
import (
21+
"testing"
22+
23+
"github.com/Shopify/sarama"
24+
)
25+
26+
func TestVersionGet(t *testing.T) {
27+
valid := map[Version]sarama.KafkaVersion{
28+
"0.11": sarama.V0_11_0_2,
29+
"1": sarama.V1_1_1_0,
30+
"2.0.0": sarama.V2_0_0_0,
31+
"2.0.1": sarama.V2_0_1_0,
32+
"2.0": sarama.V2_0_1_0,
33+
"2.5": sarama.V2_5_0_0,
34+
}
35+
invalid := []Version{
36+
"1.1.2",
37+
"1.2.3",
38+
"1.3",
39+
"hello",
40+
"2.0.3",
41+
}
42+
for s, expect := range valid {
43+
got, ok := s.Get()
44+
if !ok {
45+
t.Errorf("'%v' should parse as Kafka version %v, got nothing",
46+
s, expect)
47+
} else if got != expect {
48+
t.Errorf("'%v' should parse as Kafka version %v, got %v",
49+
s, expect, got)
50+
}
51+
}
52+
for _, s := range invalid {
53+
got, ok := s.Get()
54+
if ok {
55+
t.Errorf("'%v' is not a valid Kafka version but parsed as %v",
56+
s, got)
57+
}
58+
}
59+
}
60+
61+
func TestSaramaUpdate(t *testing.T) {
62+
// If any of these versions are considered valid by our parsing code,
63+
// it means someone updated sarama without updating the parsing code
64+
// for the new version. Gently remind them.
65+
flagVersions := []Version{"2.8.1", "2.9.0"}
66+
for _, v := range flagVersions {
67+
if _, ok := v.Get(); ok {
68+
t.Fatalf(
69+
"Kafka version %v is now considered valid. Did you update Sarama?\n"+
70+
"If so, remember to:\n"+
71+
"- Update truncatedKafkaVersions in libbeat/common/kafka/version.go\n"+
72+
"- Update the documentation to list the latest version:\n"+
73+
" * libbeat/outputs/kafka/docs/kafka.asciidoc\n"+
74+
" * filebeat/docs/inputs/inputs-kafka.asciidoc\n"+
75+
"- Update TestSaramaUpdate in libbeat/common/kafka/version_test.go\n",
76+
v)
77+
78+
}
79+
}
80+
}

libbeat/outputs/kafka/docs/kafka.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ NOTE: Events bigger than <<kafka-max_message_bytes,`max_message_bytes`>> will be
3434
[[kafka-compatibility]]
3535
==== Compatibility
3636

37-
This output works with all Kafka versions in between 0.11 and 2.2.2. Older versions
37+
This output works with all Kafka versions in between 0.11 and 2.8.0. Older versions
3838
might work as well, but are not supported.
3939

4040
==== Configuration options

0 commit comments

Comments
 (0)