File tree Expand file tree Collapse file tree 2 files changed +15
-36
lines changed
main/java/org/apache/kafka/common/serialization
test/java/org/apache/kafka/common/serialization Expand file tree Collapse file tree 2 files changed +15
-36
lines changed Original file line number Diff line number Diff line change 16
16
*/
17
17
package org .apache .kafka .common .serialization ;
18
18
19
- import org .apache .kafka .common .utils .Utils ;
20
-
21
19
import java .nio .ByteBuffer ;
22
20
23
21
/**
24
- * Do not need to flip before call <i>serialize(String, ByteBuffer)</i>. For example:
25
- *
26
- * <blockquote>
27
- * <pre>
28
- * ByteBufferSerializer serializer = ...; // Create Serializer
29
- * ByteBuffer buffer = ...; // Allocate ByteBuffer
30
- * buffer.put(data); // Put data into buffer, do not need to flip
31
- * serializer.serialize(topic, buffer); // Serialize buffer
32
- * </pre>
33
- * </blockquote>
22
+ * {@code ByteBufferSerializer} always {@link ByteBuffer#rewind() rewinds} the position of the input buffer to zero for
23
+ * serialization. A manual rewind is not necessary.
24
+ * <p>
25
+ * Note: any existing buffer position is ignored.
26
+ * <p>
27
+ * The position is also rewound back to zero before {@link #serialize(String, ByteBuffer)}
28
+ * returns.
34
29
*/
35
30
public class ByteBufferSerializer implements Serializer <ByteBuffer > {
36
-
37
- @ Override
38
31
public byte [] serialize (String topic , ByteBuffer data ) {
39
- if (data == null ) {
32
+ if (data == null )
40
33
return null ;
41
- }
34
+
35
+ data .rewind ();
42
36
43
37
if (data .hasArray ()) {
44
- final byte [] arr = data .array ();
38
+ byte [] arr = data .array ();
45
39
if (data .arrayOffset () == 0 && arr .length == data .remaining ()) {
46
40
return arr ;
47
41
}
48
42
}
49
43
50
- data .flip ();
51
- return Utils .toArray (data );
44
+ byte [] ret = new byte [data .remaining ()];
45
+ data .get (ret , 0 , ret .length );
46
+ data .rewind ();
47
+ return ret ;
52
48
}
53
49
}
Original file line number Diff line number Diff line change @@ -406,23 +406,6 @@ private Serde<String> getStringSerde(String encoder) {
406
406
return Serdes .serdeFrom (serializer , deserializer );
407
407
}
408
408
409
- @ Test
410
- public void testByteBufferSerializer () {
411
- final byte [] bytes = "Hello" .getBytes (UTF_8 );
412
- final ByteBuffer heapBuffer0 = ByteBuffer .allocate (bytes .length + 1 ).put (bytes );
413
- final ByteBuffer heapBuffer1 = ByteBuffer .allocate (bytes .length ).put (bytes );
414
- final ByteBuffer heapBuffer2 = ByteBuffer .wrap (bytes );
415
- final ByteBuffer directBuffer0 = ByteBuffer .allocateDirect (bytes .length + 1 ).put (bytes );
416
- final ByteBuffer directBuffer1 = ByteBuffer .allocateDirect (bytes .length ).put (bytes );
417
- try (final ByteBufferSerializer serializer = new ByteBufferSerializer ()) {
418
- assertArrayEquals (bytes , serializer .serialize (topic , heapBuffer0 ));
419
- assertArrayEquals (bytes , serializer .serialize (topic , heapBuffer1 ));
420
- assertArrayEquals (bytes , serializer .serialize (topic , heapBuffer2 ));
421
- assertArrayEquals (bytes , serializer .serialize (topic , directBuffer0 ));
422
- assertArrayEquals (bytes , serializer .serialize (topic , directBuffer1 ));
423
- }
424
- }
425
-
426
409
@ ParameterizedTest
427
410
@ ValueSource (booleans = { true , false })
428
411
public void testBooleanSerializer (Boolean dataToSerialize ) {
You can’t perform that action at this time.
0 commit comments