
Commit 8ca7033

ARROW-499: Update file serialization to use the streaming serialization format.
Author: Wes McKinney <wes.mckinney@twosigma.com>
Author: Nong Li <nongli@gmail.com>

Closes apache#292 from nongli/file and squashes the following commits:

18890a9 [Wes McKinney] Message fixes. Fix Java test suite. Integration tests pass
f187539 [Nong Li] Merge pull request #1 from wesm/file-change-cpp-impl
e3af434 [Wes McKinney] Remove unused variable
664d5be [Wes McKinney] Fixes, stream tests pass again
ba8db91 [Wes McKinney] Redo MessageSerializer with unions. Still has bugs
21854cc [Wes McKinney] Restore Block.bodyLength to long
7c6f7ef [Nong Li] Update to restore Block behavior
27b3909 [Nong Li] [ARROW-499]: [Java] Update file serialization to use the streaming serialization format.
1 parent 512bc16 · commit 8ca7033

12 files changed: +174 -185 lines changed

cpp/src/arrow/ipc/adapter.cc

Lines changed: 6 additions & 5 deletions
@@ -129,13 +129,12 @@ class RecordBatchWriter : public ArrayVisitor {
         num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));
 
     // Need to write 4 bytes (metadata size), the metadata, plus padding to
-    // fall on a 64-byte offset
-    int64_t padded_metadata_length =
-        BitUtil::RoundUpToMultipleOf64(metadata_fb->size() + 4);
+    // fall on an 8-byte offset
+    int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4);
 
     // The returned metadata size includes the length prefix, the flatbuffer,
     // plus padding
-    *metadata_length = padded_metadata_length;
+    *metadata_length = static_cast<int32_t>(padded_metadata_length);
 
     // Write the flatbuffer size prefix
     int32_t flatbuffer_size = metadata_fb->size();
@@ -604,7 +603,9 @@ Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
     return Status::Invalid(ss.str());
   }
 
-  *metadata = std::make_shared<RecordBatchMetadata>(buffer, sizeof(int32_t));
+  std::shared_ptr<Message> message;
+  RETURN_NOT_OK(Message::Open(buffer, 4, &message));
+  *metadata = std::make_shared<RecordBatchMetadata>(message);
   return Status::OK();
 }
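The first hunk swaps the 64-byte alignment of the length-prefixed metadata for 8-byte alignment via BitUtil::CeilByte; the second wraps the buffer in a Message envelope before constructing RecordBatchMetadata. A minimal sketch of the alignment arithmetic, written in Java to match the rest of the patch; padTo8 is an illustrative helper, not an Arrow API:

    // Sketch: pad the 4-byte length prefix plus the metadata flatbuffer out to
    // the next 8-byte boundary, mirroring what BitUtil::CeilByte does in C++.
    final class MetadataPaddingSketch {
      // Illustrative helper only; not part of Arrow.
      static long padTo8(long size) {
        return (size + 7) & ~7L;
      }

      public static void main(String[] args) {
        long flatbufferSize = 122;             // example metadata flatbuffer size
        long prefixed = flatbufferSize + 4;    // int32 size prefix is written first
        System.out.println(prefixed + " -> " + padTo8(prefixed));  // 126 -> 128
      }
    }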

cpp/src/arrow/ipc/metadata-internal.cc

Lines changed: 4 additions & 17 deletions
@@ -320,23 +320,10 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
 Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
     const std::vector<flatbuf::FieldNode>& nodes,
     const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
-  flatbuffers::FlatBufferBuilder fbb;
-
-  auto batch = flatbuf::CreateRecordBatch(
-      fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers));
-
-  fbb.Finish(batch);
-
-  int32_t size = fbb.GetSize();
-
-  auto result = std::make_shared<PoolBuffer>();
-  RETURN_NOT_OK(result->Resize(size));
-
-  uint8_t* dst = result->mutable_data();
-  memcpy(dst, fbb.GetBufferPointer(), size);
-
-  *out = result;
-  return Status::OK();
+  MessageBuilder builder;
+  RETURN_NOT_OK(builder.SetRecordBatch(length, body_length, nodes, buffers));
+  RETURN_NOT_OK(builder.Finish());
+  return builder.GetBuffer(out);
 }
 
 Status MessageBuilder::Finish() {

format/File.fbs

Lines changed: 4 additions & 1 deletion
@@ -35,12 +35,15 @@ table Footer {
 
 struct Block {
 
+  /// Index to the start of the RecordBlock (note this is past the Message header)
   offset: long;
 
+  /// Length of the metadata
  metaDataLength: int;
 
+  /// Length of the data (this is aligned so there can be a gap between this and
+  /// the metatdata).
   bodyLength: long;
-
 }
 
 root_type Footer;
integration/integration_test.py

Lines changed: 1 addition & 1 deletion
@@ -648,7 +648,7 @@ def get_static_json_files():
 
 
 def run_all_tests(debug=False):
-    testers = [JavaTester(debug=debug), CPPTester(debug=debug)]
+    testers = [CPPTester(debug=debug), JavaTester(debug=debug)]
     static_json_files = get_static_json_files()
     generated_json_files = get_generated_json_files()
     json_files = static_json_files + generated_json_files

java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java

Lines changed: 3 additions & 2 deletions
@@ -65,10 +65,11 @@ private static List<ArrowBlock> recordBatches(Footer footer) {
 
   private static List<ArrowBlock> dictionaries(Footer footer) {
     List<ArrowBlock> dictionaries = new ArrayList<>();
-    Block tempBLock = new Block();
+    Block tempBlock = new Block();
+
     int dictionariesLength = footer.dictionariesLength();
     for (int i = 0; i < dictionariesLength; i++) {
-      Block block = footer.dictionaries(tempBLock, i);
+      Block block = footer.dictionaries(tempBlock, i);
       dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
     }
     return dictionaries;
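Beyond the tempBLock/tempBlock typo fix, the loop shows the usual FlatBuffers object-reuse idiom: one Block accessor is passed back into footer.dictionaries(tempBlock, i), which repositions it over element i instead of allocating a wrapper per iteration. A hedged sketch of the same idiom for the record-batch list, assuming the generated Footer exposes recordBatchesLength()/recordBatches(Block, int) symmetrically to the dictionary accessors shown above:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.arrow.flatbuf.Block;
    import org.apache.arrow.flatbuf.Footer;
    import org.apache.arrow.vector.file.ArrowBlock;

    // Sketch only; ArrowFooter already has an equivalent recordBatches() helper.
    final class FooterBlocksSketch {
      static List<ArrowBlock> recordBatches(Footer footer) {
        List<ArrowBlock> result = new ArrayList<>();
        Block reused = new Block();  // one accessor, repositioned each iteration
        for (int i = 0; i < footer.recordBatchesLength(); i++) {
          Block b = footer.recordBatches(reused, i);
          result.add(new ArrowBlock(b.offset(), b.metaDataLength(), b.bodyLength()));
        }
        return result;
      }
    }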

java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java

Lines changed: 11 additions & 53 deletions
@@ -20,23 +20,15 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SeekableByteChannel;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
-import org.apache.arrow.flatbuf.Buffer;
-import org.apache.arrow.flatbuf.FieldNode;
 import org.apache.arrow.flatbuf.Footer;
-import org.apache.arrow.flatbuf.RecordBatch;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.stream.MessageSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ArrowBuf;
-
 public class ArrowReader implements AutoCloseable {
   private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class);
 
@@ -54,15 +46,6 @@ public ArrowReader(SeekableByteChannel in, BufferAllocator allocator) {
     this.allocator = allocator;
   }
 
-  private int readFully(ArrowBuf buffer, int l) throws IOException {
-    int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l));
-    buffer.writerIndex(n);
-    if (n != l) {
-      throw new IllegalStateException(n + " != " + l);
-    }
-    return n;
-  }
-
   private int readFully(ByteBuffer buffer) throws IOException {
     int total = 0;
     int n;
@@ -104,46 +87,21 @@ public ArrowFooter readFooter() throws IOException {
 
   // TODO: read dictionaries
 
-  public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOException {
-    LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", recordBatchBlock.getOffset(), recordBatchBlock.getMetadataLength(), recordBatchBlock.getBodyLength()));
-    int l = (int)(recordBatchBlock.getMetadataLength() + recordBatchBlock.getBodyLength());
-    if (l < 0) {
-      throw new InvalidArrowFileException("block invalid: " + recordBatchBlock);
-    }
-    final ArrowBuf buffer = allocator.buffer(l);
-    LOGGER.debug("allocated buffer " + buffer);
-    in.position(recordBatchBlock.getOffset());
-    int n = readFully(buffer, l);
-    if (n != l) {
-      throw new IllegalStateException(n + " != " + l);
-    }
-
-    // Record batch flatbuffer is prefixed by its size as int32le
-    final ArrowBuf metadata = buffer.slice(4, recordBatchBlock.getMetadataLength() - 4);
-    RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(metadata.nioBuffer().asReadOnlyBuffer());
-
-    int nodesLength = recordBatchFB.nodesLength();
-    final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength());
-    List<ArrowFieldNode> nodes = new ArrayList<>();
-    for (int i = 0; i < nodesLength; ++i) {
-      FieldNode node = recordBatchFB.nodes(i);
-      nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
+  public ArrowRecordBatch readRecordBatch(ArrowBlock block) throws IOException {
+    LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
+        block.getOffset(), block.getMetadataLength(),
+        block.getBodyLength()));
+    in.position(block.getOffset());
+    ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
+        new ReadChannel(in, block.getOffset()), block, allocator);
+    if (batch == null) {
+      throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
     }
-    List<ArrowBuf> buffers = new ArrayList<>();
-    for (int i = 0; i < recordBatchFB.buffersLength(); ++i) {
-      Buffer bufferFB = recordBatchFB.buffers(i);
-      LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", bufferFB.offset(), bufferFB.length()));
-      ArrowBuf vectorBuffer = body.slice((int)bufferFB.offset(), (int)bufferFB.length());
-      buffers.add(vectorBuffer);
-    }
-    ArrowRecordBatch arrowRecordBatch = new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
-    LOGGER.debug("released buffer " + buffer);
-    buffer.release();
-    return arrowRecordBatch;
+    return batch;
   }
 
+  @Override
   public void close() throws IOException {
     in.close();
   }
-
 }
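From the caller's point of view the reader API is unchanged: open the footer, then hand each Block to readRecordBatch, which now positions the channel and delegates to MessageSerializer.deserializeRecordBatch. A hedged usage sketch against the API visible in this diff; the file path is illustrative and ArrowFooter.getRecordBatches() is assumed to return the footer's record-batch blocks:

    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.file.ArrowBlock;
    import org.apache.arrow.vector.file.ArrowFooter;
    import org.apache.arrow.vector.file.ArrowReader;
    import org.apache.arrow.vector.schema.ArrowRecordBatch;

    // Sketch: read back every record batch listed in an Arrow file's footer.
    public class ReadBatchesSketch {
      public static void main(String[] args) throws Exception {
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             FileChannel channel = FileChannel.open(
                 Paths.get("example.arrow"), StandardOpenOption.READ);  // illustrative path
             ArrowReader reader = new ArrowReader(channel, allocator)) {
          ArrowFooter footer = reader.readFooter();
          for (ArrowBlock block : footer.getRecordBatches()) {  // accessor assumed
            ArrowRecordBatch batch = reader.readRecordBatch(block);
            System.out.println("batch at offset " + block.getOffset() + ": " + batch);
          }
        }
      }
    }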

java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java

Lines changed: 6 additions & 37 deletions
@@ -23,14 +23,12 @@
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.arrow.vector.schema.ArrowBuffer;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.MessageSerializer;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ArrowBuf;
-
 public class ArrowWriter implements AutoCloseable {
   private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class);
 
@@ -39,7 +37,6 @@ public class ArrowWriter implements AutoCloseable {
   private final Schema schema;
 
   private final List<ArrowBlock> recordBatches = new ArrayList<>();
-
   private boolean started = false;
 
   public ArrowWriter(WritableByteChannel out, Schema schema) {
@@ -49,47 +46,19 @@ public ArrowWriter(WritableByteChannel out, Schema schema) {
 
   private void start() throws IOException {
     writeMagic();
+    MessageSerializer.serialize(out, schema);
   }
 
-
   // TODO: write dictionaries
 
   public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
     checkStarted();
-    out.align();
+    ArrowBlock batchDesc = MessageSerializer.serialize(out, recordBatch);
+    LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
+        batchDesc.getOffset(), batchDesc.getMetadataLength(), batchDesc.getBodyLength()));
 
-    // write metadata header with int32 size prefix
-    long offset = out.getCurrentPosition();
-    out.write(recordBatch, true);
-    out.align();
-    // write body
-    long bodyOffset = out.getCurrentPosition();
-    List<ArrowBuf> buffers = recordBatch.getBuffers();
-    List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-    if (buffers.size() != buffersLayout.size()) {
-      throw new IllegalStateException("the layout does not match: " + buffers.size() + " != " + buffersLayout.size());
-    }
-    for (int i = 0; i < buffers.size(); i++) {
-      ArrowBuf buffer = buffers.get(i);
-      ArrowBuffer layout = buffersLayout.get(i);
-      long startPosition = bodyOffset + layout.getOffset();
-      if (startPosition != out.getCurrentPosition()) {
-        out.writeZeros((int)(startPosition - out.getCurrentPosition()));
-      }
-
-      out.write(buffer);
-      if (out.getCurrentPosition() != startPosition + layout.getSize()) {
-        throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() + " != " + startPosition + layout.getSize());
-      }
-    }
-    int metadataLength = (int)(bodyOffset - offset);
-    if (metadataLength <= 0) {
-      throw new InvalidArrowFileException("invalid recordBatch");
-    }
-    long bodyLength = out.getCurrentPosition() - bodyOffset;
-    LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", offset, metadataLength, bodyLength));
     // add metadata to footer
-    recordBatches.add(new ArrowBlock(offset, metadataLength, bodyLength));
+    recordBatches.add(batchDesc);
   }
 
   private void checkStarted() throws IOException {
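The writer is now a thin framing layer: start() writes the magic bytes and the schema through MessageSerializer, and each writeRecordBatch call serializes the batch and records the returned ArrowBlock for the footer. A hedged usage sketch; building the ArrowRecordBatch itself (field nodes plus buffers) is out of scope and supplied by the caller:

    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    import org.apache.arrow.vector.file.ArrowWriter;
    import org.apache.arrow.vector.schema.ArrowRecordBatch;
    import org.apache.arrow.vector.types.pojo.Schema;

    // Sketch: write pre-built record batches through the simplified writer.
    final class WriteBatchesSketch {
      static void write(Path path, Schema schema, Iterable<ArrowRecordBatch> batches)
          throws Exception {
        try (FileChannel channel = FileChannel.open(
                 path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             ArrowWriter writer = new ArrowWriter(channel, schema)) {
          for (ArrowRecordBatch batch : batches) {
            // The first call triggers start(): magic bytes, then the schema via
            // MessageSerializer; each batch is framed like a stream message.
            writer.writeRecordBatch(batch);
          }
        }
      }
    }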

java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java

Lines changed: 10 additions & 1 deletion
@@ -32,9 +32,16 @@ public class ReadChannel implements AutoCloseable {
 
   private ReadableByteChannel in;
   private long bytesRead = 0;
+  // The starting byte offset into 'in'.
+  private final long startByteOffset;
 
-  public ReadChannel(ReadableByteChannel in) {
+  public ReadChannel(ReadableByteChannel in, long startByteOffset) {
     this.in = in;
+    this.startByteOffset = startByteOffset;
+  }
+
+  public ReadChannel(ReadableByteChannel in) {
+    this(in, 0);
   }
 
   public long bytesRead() { return bytesRead; }
@@ -65,6 +72,8 @@ public int readFully(ArrowBuf buffer, int l) throws IOException {
     return n;
   }
 
+  public long getCurrentPositiion() { return startByteOffset + bytesRead; }
+
   @Override
   public void close() throws IOException {
     if (this.in != null) {
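The startByteOffset field exists so that a ReadChannel opened partway into a file (the reader above passes block.getOffset()) can still report absolute positions: getCurrentPositiion() (spelled as in the patch) is simply startByteOffset + bytesRead. A tiny sketch of that bookkeeping with a made-up stand-in class, not the Arrow ReadChannel itself:

    // Sketch: absolute-position bookkeeping as added to ReadChannel.
    // OffsetTrackerSketch is illustrative, not org.apache.arrow.vector.file.ReadChannel.
    final class OffsetTrackerSketch {
      private final long startByteOffset;  // where the wrapped channel starts in the file
      private long bytesRead = 0;

      OffsetTrackerSketch(long startByteOffset) { this.startByteOffset = startByteOffset; }

      void recordRead(int n) { bytesRead += n; }

      long currentPosition() { return startByteOffset + bytesRead; }

      public static void main(String[] args) {
        OffsetTrackerSketch t = new OffsetTrackerSketch(4096);  // e.g. a Block offset
        t.recordRead(24);                                       // bytes consumed so far
        System.out.println(t.currentPosition());                // 4120, an absolute offset
      }
    }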

Comments (0)