Skip to content

Commit 36cb999

Browse files
committed
KAFKA-17747: Add compute topic and group hash
Signed-off-by: PoAn Yang <payang@apache.org>
1 parent a78a931 commit 36cb999

File tree

5 files changed

+249
-0
lines changed

5 files changed

+249
-0
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1420,6 +1420,7 @@ project(':group-coordinator') {
14201420
implementation libs.hdrHistogram
14211421
implementation libs.re2j
14221422
implementation libs.slf4jApi
1423+
implementation libs.guava
14231424

14241425
testImplementation project(':clients').sourceSets.test.output
14251426
testImplementation project(':server-common').sourceSets.test.output

checkstyle/import-control-group-coordinator.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
7878
<allow pkg="com.google.re2j" />
7979
<allow pkg="org.apache.kafka.metadata" />
80+
<allow pkg="com.google.common.hash" />
8081
<subpackage name="metrics">
8182
<allow pkg="com.yammer.metrics"/>
8283
<allow pkg="org.HdrHistogram" />

gradle/dependencies.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ versions += [
6161
classgraph: "4.8.173",
6262
gradle: "8.10.2",
6363
grgit: "4.1.1",
64+
guava: "33.4.0-jre",
6465
httpclient: "4.5.14",
6566
jackson: "2.16.2",
6667
jacoco: "0.8.10",
@@ -147,6 +148,7 @@ libs += [
147148
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
148149
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
149150
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
151+
guava: "com.google.guava:guava:$versions.guava",
150152
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
151153
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
152154
jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson",

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,21 @@
1919
import org.apache.kafka.common.KafkaException;
2020
import org.apache.kafka.common.message.ListGroupsResponseData;
2121
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
22+
import org.apache.kafka.image.ClusterImage;
23+
import org.apache.kafka.image.TopicImage;
24+
import org.apache.kafka.metadata.BrokerRegistration;
2225

26+
import com.google.common.hash.HashCode;
27+
import com.google.common.hash.HashFunction;
28+
import com.google.common.hash.Hasher;
29+
import com.google.common.hash.Hashing;
30+
31+
import java.nio.charset.StandardCharsets;
2332
import java.util.Arrays;
2433
import java.util.List;
2534
import java.util.Locale;
2635
import java.util.Map;
36+
import java.util.Objects;
2737
import java.util.Optional;
2838
import java.util.Set;
2939
import java.util.function.Function;
@@ -209,4 +219,50 @@ void validateOffsetFetch(
209219
default boolean shouldExpire() {
210220
return true;
211221
}
222+
223+
/**
224+
* Computes the hash of the topics in a group.
225+
*
226+
* @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
227+
* @return The hash of the group.
228+
*/
229+
static long computeGroupHash(Map<String, Long> topicHashes) {
230+
return Hashing.combineOrdered(
231+
topicHashes.entrySet()
232+
.stream()
233+
.sorted(Map.Entry.comparingByKey())
234+
.map(e -> HashCode.fromLong(e.getValue()))
235+
.toList()
236+
).asLong();
237+
}
238+
239+
/**
240+
* Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.
241+
*
242+
* @param topicImage The topic image.
243+
* @param clusterImage The cluster image.
244+
* @return The hash of the topic.
245+
*/
246+
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
247+
HashFunction hf = Hashing.murmur3_128();
248+
Hasher topicHasher = hf.newHasher()
249+
.putByte((byte) 0) // magic byte
250+
.putLong(topicImage.id().hashCode()) // topic Id
251+
.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
252+
.putInt(topicImage.partitions().size()); // number of partitions
253+
254+
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
255+
topicHasher.putInt(entry.getKey()); // partition id
256+
String racks = Arrays.stream(entry.getValue().replicas)
257+
.mapToObj(clusterImage::broker)
258+
.filter(Objects::nonNull)
259+
.map(BrokerRegistration::rack)
260+
.filter(Optional::isPresent)
261+
.map(Optional::get)
262+
.sorted()
263+
.collect(Collectors.joining(";"));
264+
topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
265+
});
266+
return topicHasher.hash().asLong();
267+
}
212268
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataImage;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

/**
 * Tests for {@link Group#computeTopicHash} and {@link Group#computeGroupHash}.
 *
 * <p>The positive test re-derives the expected hash byte-for-byte; the negative tests
 * each vary exactly one ingredient (magic byte, partition order, rack order, topic id,
 * topic name, partition count, racks) and assert the hash changes.
 */
public class GroupTest {
    private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
    private static final String FOO_TOPIC_NAME = "foo";
    private static final String BAR_TOPIC_NAME = "bar";
    private static final int FOO_NUM_PARTITIONS = 2;
    // Two partitions with racks assigned by MetadataImageBuilder#addRacks:
    // partition 0 -> rack0, rack1; partition 1 -> rack1, rack2.
    private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
        .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
        .addRacks()
        .build();

    @Test
    void testComputeTopicHash() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertEquals(topicHasher.hash().asLong(), result);
    }

    @Test
    void testComputeTopicHashWithDifferentMagicByte() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 1) // different magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertNotEquals(topicHasher.hash().asLong(), result);
    }

    @Test
    void testComputeTopicHashWithDifferentPartitionOrder() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            // different partition order
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
            .putInt(0) // partition 0
            .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
        assertNotEquals(topicHasher.hash().asLong(), result);
    }

    @Test
    void testComputeTopicHashWithDifferentRackOrder() {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        HashFunction hf = Hashing.murmur3_128();
        Hasher topicHasher = hf.newHasher()
            .putByte((byte) 0) // magic byte
            .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
            .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
            .putInt(FOO_NUM_PARTITIONS) // number of partitions
            .putInt(0) // partition 0
            .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
            .putInt(1) // partition 1
            .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
        assertNotEquals(topicHasher.hash().asLong(), result);
    }

    @ParameterizedTest
    @MethodSource("differentFieldGenerator")
    void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) {
        long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

        assertNotEquals(
            Group.computeTopicHash(
                differentImage.topics().getTopic(topicId),
                differentImage.cluster()
            ),
            result
        );
    }

    private static Stream<Arguments> differentFieldGenerator() {
        Uuid differentTopicId = Uuid.randomUuid();
        return Stream.of(
            Arguments.of(new MetadataImageBuilder() // different topic id
                .addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
                .addRacks()
                .build(),
                differentTopicId
            ),
            Arguments.of(new MetadataImageBuilder() // different topic name
                .addTopic(FOO_TOPIC_ID, BAR_TOPIC_NAME, FOO_NUM_PARTITIONS)
                .addRacks()
                .build(),
                FOO_TOPIC_ID
            ),
            Arguments.of(new MetadataImageBuilder() // different partitions
                .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
                .addRacks()
                .build(),
                FOO_TOPIC_ID
            ),
            Arguments.of(new MetadataImageBuilder() // different racks (none assigned)
                .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
                .build(),
                FOO_TOPIC_ID
            )
        );
    }

    @Test
    void testComputeGroupHash() {
        long result = Group.computeGroupHash(Map.of(
            BAR_TOPIC_NAME, 123L,
            FOO_TOPIC_NAME, 456L
        ));

        // Expected combination order is by topic name: "bar" before "foo".
        long expected = Hashing.combineOrdered(List.of(
            HashCode.fromLong(123L),
            HashCode.fromLong(456L)
        )).asLong();
        assertEquals(expected, result);
    }

    @Test
    void testComputeGroupHashWithDifferentOrder() {
        long result = Group.computeGroupHash(Map.of(
            BAR_TOPIC_NAME, 123L,
            FOO_TOPIC_NAME, 456L
        ));

        // Combining in the reverse (non-sorted) order must yield a different hash.
        long unexpected = Hashing.combineOrdered(List.of(
            HashCode.fromLong(456L),
            HashCode.fromLong(123L)
        )).asLong();
        assertNotEquals(unexpected, result);
    }
}

0 commit comments

Comments
 (0)