Skip to content

Commit 51dbd17

Browse files
authored
KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683)
Reviewers: Guozhang Wang <wangguoz@gmail.com>
1 parent b9da249 commit 51dbd17

File tree

2 files changed

+42
-8
lines changed

2 files changed

+42
-8
lines changed

clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,40 @@
1616
*/
1717
package org.apache.kafka.common.serialization;
1818

19+
import org.apache.kafka.common.utils.Utils;
20+
1921
import java.nio.ByteBuffer;
2022

23+
/**
24+
* ByteBufferSerializer will not change ByteBuffer's mark, position and limit.
25+
* And do not need to flip before call <i>serialize(String, ByteBuffer)</i>. For example:
26+
*
27+
* <blockquote>
28+
* <pre>
29+
* ByteBufferSerializer serializer = ...; // Create Serializer
30+
* ByteBuffer buffer = ...; // Allocate ByteBuffer
31+
* buffer.put(data); // Put data into buffer, do not need to flip
32+
* serializer.serialize(topic, buffer); // Serialize buffer
33+
* </pre>
34+
* </blockquote>
35+
*/
2136
public class ByteBufferSerializer implements Serializer<ByteBuffer> {
37+
38+
@Override
2239
public byte[] serialize(String topic, ByteBuffer data) {
23-
if (data == null)
40+
if (data == null) {
2441
return null;
25-
26-
data.rewind();
42+
}
2743

2844
if (data.hasArray()) {
29-
byte[] arr = data.array();
45+
final byte[] arr = data.array();
3046
if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
3147
return arr;
3248
}
3349
}
3450

35-
byte[] ret = new byte[data.remaining()];
36-
data.get(ret, 0, ret.length);
37-
data.rewind();
38-
return ret;
51+
final ByteBuffer copyData = data.asReadOnlyBuffer();
52+
copyData.flip();
53+
return Utils.toArray(copyData);
3954
}
4055
}

clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.util.LinkedList;
3232
import java.util.Stack;
3333

34+
import static java.nio.charset.StandardCharsets.UTF_8;
35+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
3436
import static org.junit.jupiter.api.Assertions.assertTrue;
3537
import static org.junit.jupiter.api.Assertions.assertEquals;
3638
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -368,4 +370,21 @@ private Serde<String> getStringSerde(String encoder) {
368370

369371
return Serdes.serdeFrom(serializer, deserializer);
370372
}
373+
374+
@Test
375+
public void testByteBufferSerializer() {
376+
final byte[] bytes = "Hello".getBytes(UTF_8);
377+
final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
378+
final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
379+
final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
380+
final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
381+
final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
382+
try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
383+
assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
384+
assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
385+
assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
386+
assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
387+
assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
388+
}
389+
}
371390
}

0 commit comments

Comments
 (0)