Skip to content

Commit 664d5be

Browse files
committed
Fixes, stream tests pass again
Change-Id: I2571b4ec6b753a4e207c7dbbd2059b7c2bfc0be2
1 parent ba8db91 commit 664d5be

File tree

1 file changed

+22
-17
lines changed

1 file changed

+22
-17
lines changed

java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,20 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch)
104104
ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.RecordBatch,
105105
batchOffset, bodyLength);
106106

107-
long metadataStart = out.getCurrentPosition();
108-
out.writeIntLittleEndian(serializedMessage.remaining());
107+
int metadataLength = serializedMessage.remaining();
108+
109+
// Add extra padding bytes so that length prefix + metadata is a multiple
110+
// of 8 after alignment
111+
if ((metadataLength + 4) % 8 != 0) {
112+
metadataLength += 8 - (metadataLength + 4) % 8;
113+
}
114+
115+
out.writeIntLittleEndian(metadataLength);
109116
out.write(serializedMessage);
110117

111118
// Align the output to 8 byte boundary.
112119
out.align();
113120

114-
long metadataSize = out.getCurrentPosition() - metadataStart;
115-
116121
long bufferStart = out.getCurrentPosition();
117122
List<ArrowBuf> buffers = batch.getBuffers();
118123
List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
@@ -130,7 +135,8 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch)
130135
" != " + startPosition + layout.getSize());
131136
}
132137
}
133-
return new ArrowBlock(start, (int) metadataSize, out.getCurrentPosition() - bufferStart);
138+
// Metadata size in the Block account for the size prefix
139+
return new ArrowBlock(start, metadataLength + 4, out.getCurrentPosition() - bufferStart);
134140
}
135141

136142
/**
@@ -165,7 +171,7 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock
165171
BufferAllocator alloc) throws IOException {
166172
long readPosition = in.getCurrentPositiion();
167173

168-
// Metadata length contains byte padding
174+
// Metadata length contains integer prefix plus byte padding
169175
long totalLen = block.getMetadataLength() + block.getBodyLength();
170176

171177
if (totalLen > Integer.MAX_VALUE) {
@@ -177,22 +183,21 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock
177183
throw new IOException("Unexpected end of input trying to read batch.");
178184
}
179185

180-
return deserializeRecordBatch(buffer, block.getMetadataLength(), (int) totalLen);
181-
}
186+
ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4);
182187

183-
// Deserializes a record batch. Buffer should start at the RecordBatch and include
184-
// all the bytes for the metadata and then data buffers.
185-
private static ArrowRecordBatch deserializeRecordBatch(ArrowBuf buffer, int metadataLen,
186-
int bufferLen) {
187188
// Read the metadata.
188189
RecordBatch recordBatchFB =
189-
RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer());
190-
191-
int bufferOffset = metadataLen;
190+
RecordBatch.getRootAsRecordBatch(metadataBuffer.nioBuffer().asReadOnlyBuffer());
192191

193192
// Now read the body
194-
final ArrowBuf body = buffer.slice(bufferOffset, bufferLen - bufferOffset);
195-
return deserializeRecordBatch(recordBatchFB, body);
193+
final ArrowBuf body = buffer.slice(block.getMetadataLength(),
194+
(int) totalLen - block.getMetadataLength());
195+
ArrowRecordBatch result = deserializeRecordBatch(recordBatchFB, body);
196+
197+
metadataBuffer.release();
198+
buffer.release();
199+
200+
return result;
196201
}
197202

198203
// Deserializes a record batch given the Flatbuffer metadata and in-memory body

0 commit comments

Comments
 (0)