Skip to content

Commit 4b33fcf

Browse files
ozangunalpgsmet
authored andcommitted
Fix Dev UI crash when Kafka messages contain duplicate headers
Fixes #50913 (cherry picked from commit 2e85b3d)
1 parent b1f198e commit 4b33fcf

File tree

3 files changed

+137
-1
lines changed

3 files changed

+137
-1
lines changed

extensions/kafka-client/runtime-dev/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,10 @@
1818
<groupId>io.quarkus</groupId>
1919
<artifactId>quarkus-kafka-client</artifactId>
2020
</dependency>
21+
<dependency>
22+
<groupId>org.junit.jupiter</groupId>
23+
<artifactId>junit-jupiter</artifactId>
24+
<scope>test</scope>
25+
</dependency>
2126
</dependencies>
2227
</project>

extensions/kafka-client/runtime-dev/src/main/java/io/quarkus/kafka/client/runtime/dev/ui/model/converter/KafkaModelConverter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public KafkaMessage convert(ConsumerRecord<Bytes, Bytes> message) {
3030

3131
private static Map<String, String> headers(ConsumerRecord<Bytes, Bytes> message) {
3232
return StreamSupport.stream(message.headers().spliterator(), false)
33-
.collect(Collectors.toMap(Header::key, header -> new String(header.value(), StandardCharsets.UTF_8)));
33+
.collect(Collectors.toMap(Header::key, header -> new String(header.value(), StandardCharsets.UTF_8),
34+
(existing, replacement) -> replacement));
3435
}
3536
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package io.quarkus.kafka.client.runtime.dev.ui.model.converter;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertNotNull;
5+
6+
import java.nio.charset.StandardCharsets;
7+
import java.util.Map;
8+
import java.util.Optional;
9+
10+
import org.apache.kafka.clients.consumer.ConsumerRecord;
11+
import org.apache.kafka.common.header.Headers;
12+
import org.apache.kafka.common.header.internals.RecordHeaders;
13+
import org.apache.kafka.common.record.TimestampType;
14+
import org.apache.kafka.common.utils.Bytes;
15+
import org.junit.jupiter.api.Test;
16+
17+
import io.quarkus.kafka.client.runtime.dev.ui.model.response.KafkaMessage;
18+
19+
class KafkaModelConverterTest {
20+
21+
@Test
22+
void testConvertWithDuplicateHeaders() {
23+
// Given a Kafka message with duplicate headers
24+
RecordHeaders headers = new RecordHeaders();
25+
addHeader(headers, "duplicate-key", "value1");
26+
addHeader(headers, "duplicate-key", "value2");
27+
addHeader(headers, "unique-key", "value3");
28+
29+
ConsumerRecord<Bytes, Bytes> record = new ConsumerRecord<>(
30+
"test-topic",
31+
0,
32+
100L,
33+
System.currentTimeMillis(),
34+
TimestampType.CREATE_TIME,
35+
0,
36+
0,
37+
Bytes.wrap("test-key".getBytes(StandardCharsets.UTF_8)),
38+
Bytes.wrap("test-value".getBytes(StandardCharsets.UTF_8)),
39+
headers,
40+
Optional.empty());
41+
42+
KafkaModelConverter converter = new KafkaModelConverter();
43+
44+
// When converting the record
45+
KafkaMessage message = converter.convert(record);
46+
47+
// Then the conversion should succeed without throwing an exception
48+
assertNotNull(message);
49+
assertEquals("test-topic", message.getTopic());
50+
assertEquals(0, message.getPartition());
51+
assertEquals(100L, message.getOffset());
52+
assertEquals("test-key", message.getKey());
53+
assertEquals("test-value", message.getValue());
54+
55+
// Headers should contain the last value for duplicate keys
56+
Map<String, String> convertedHeaders = message.getHeaders();
57+
assertNotNull(convertedHeaders);
58+
assertEquals(2, convertedHeaders.size());
59+
assertEquals("value2", convertedHeaders.get("duplicate-key"));
60+
assertEquals("value3", convertedHeaders.get("unique-key"));
61+
}
62+
63+
@Test
64+
void testConvertWithUniqueHeaders() {
65+
// Given a Kafka message with unique headers
66+
RecordHeaders headers = new RecordHeaders();
67+
addHeader(headers, "header1", "value1");
68+
addHeader(headers, "header2", "value2");
69+
70+
ConsumerRecord<Bytes, Bytes> record = new ConsumerRecord<>(
71+
"test-topic",
72+
1,
73+
200L,
74+
System.currentTimeMillis(),
75+
TimestampType.CREATE_TIME,
76+
0,
77+
0,
78+
Bytes.wrap("test-key".getBytes(StandardCharsets.UTF_8)),
79+
Bytes.wrap("test-value".getBytes(StandardCharsets.UTF_8)),
80+
headers,
81+
Optional.empty());
82+
83+
KafkaModelConverter converter = new KafkaModelConverter();
84+
85+
// When converting the record
86+
KafkaMessage message = converter.convert(record);
87+
88+
// Then the conversion should succeed
89+
assertNotNull(message);
90+
Map<String, String> convertedHeaders = message.getHeaders();
91+
assertNotNull(convertedHeaders);
92+
assertEquals(2, convertedHeaders.size());
93+
assertEquals("value1", convertedHeaders.get("header1"));
94+
assertEquals("value2", convertedHeaders.get("header2"));
95+
}
96+
97+
@Test
98+
void testConvertWithNoHeaders() {
99+
// Given a Kafka message with no headers
100+
RecordHeaders headers = new RecordHeaders();
101+
102+
ConsumerRecord<Bytes, Bytes> record = new ConsumerRecord<>(
103+
"test-topic",
104+
2,
105+
300L,
106+
System.currentTimeMillis(),
107+
TimestampType.CREATE_TIME,
108+
0,
109+
0,
110+
Bytes.wrap("test-key".getBytes(StandardCharsets.UTF_8)),
111+
Bytes.wrap("test-value".getBytes(StandardCharsets.UTF_8)),
112+
headers,
113+
Optional.empty());
114+
115+
KafkaModelConverter converter = new KafkaModelConverter();
116+
117+
// When converting the record
118+
KafkaMessage message = converter.convert(record);
119+
120+
// Then the conversion should succeed
121+
assertNotNull(message);
122+
Map<String, String> convertedHeaders = message.getHeaders();
123+
assertNotNull(convertedHeaders);
124+
assertEquals(0, convertedHeaders.size());
125+
}
126+
127+
private static void addHeader(Headers headers, String key, String value1) {
128+
headers.add(key, value1.getBytes(StandardCharsets.UTF_8));
129+
}
130+
}

0 commit comments

Comments
 (0)