11 changes: 6 additions & 5 deletions cpp/src/arrow/ipc/adapter.cc
@@ -129,13 +129,12 @@ class RecordBatchWriter : public ArrayVisitor {
num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));

// Need to write 4 bytes (metadata size), the metadata, plus padding to
// fall on a 64-byte offset
int64_t padded_metadata_length =
BitUtil::RoundUpToMultipleOf64(metadata_fb->size() + 4);
// fall on an 8-byte offset
int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4);

// The returned metadata size includes the length prefix, the flatbuffer,
// plus padding
*metadata_length = padded_metadata_length;
*metadata_length = static_cast<int32_t>(padded_metadata_length);

// Write the flatbuffer size prefix
int32_t flatbuffer_size = metadata_fb->size();
@@ -604,7 +603,9 @@ Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
return Status::Invalid(ss.str());
}

*metadata = std::make_shared<RecordBatchMetadata>(buffer, sizeof(int32_t));
std::shared_ptr<Message> message;
RETURN_NOT_OK(Message::Open(buffer, 4, &message));
*metadata = std::make_shared<RecordBatchMetadata>(message);
return Status::OK();
}

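Note: the substantive change in adapter.cc is that the size-prefixed metadata flatbuffer is now padded to an 8-byte boundary (BitUtil::CeilByte) instead of a 64-byte one, and the metadata is reopened through Message::Open before being wrapped in RecordBatchMetadata. Below is a minimal sketch of the padding arithmetic, written in Java to match the rest of the patch; the helper name paddedMetadataLength is mine, and it assumes BitUtil::CeilByte rounds up to the next multiple of 8.

```java
public final class PaddingSketch {
  // Hypothetical helper mirroring the C++ logic: 4-byte size prefix + flatbuffer,
  // rounded up to the next 8-byte boundary (assumed behavior of BitUtil::CeilByte).
  static long paddedMetadataLength(int flatbufferSize) {
    return (flatbufferSize + 4 + 7) & ~7L;
  }

  public static void main(String[] args) {
    System.out.println(paddedMetadataLength(21)); // 25 bytes rounded up -> 32
    System.out.println(paddedMetadataLength(28)); // 32 bytes is already aligned -> 32
  }
}
```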
21 changes: 4 additions & 17 deletions cpp/src/arrow/ipc/metadata-internal.cc
@@ -320,23 +320,10 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
const std::vector<flatbuf::FieldNode>& nodes,
const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
flatbuffers::FlatBufferBuilder fbb;

auto batch = flatbuf::CreateRecordBatch(
fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers));

fbb.Finish(batch);

int32_t size = fbb.GetSize();

auto result = std::make_shared<PoolBuffer>();
RETURN_NOT_OK(result->Resize(size));

uint8_t* dst = result->mutable_data();
memcpy(dst, fbb.GetBufferPointer(), size);

*out = result;
return Status::OK();
MessageBuilder builder;
RETURN_NOT_OK(builder.SetRecordBatch(length, body_length, nodes, buffers));
RETURN_NOT_OK(builder.Finish());
return builder.GetBuffer(out);
}

Status MessageBuilder::Finish() {
5 changes: 4 additions & 1 deletion format/File.fbs
@@ -35,12 +35,15 @@ table Footer {

struct Block {

/// Index to the start of the RecordBlock (note this is past the Message header)
offset: long;

/// Length of the metadata
metaDataLength: int;

/// Length of the data (this is aligned so there can be a gap between this and
/// the metadata).
bodyLength: long;

}

root_type Footer;
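For reference, a Block is what the Footer uses to locate each record batch: the size-prefixed metadata starts at offset, and the body follows after metaDataLength bytes. A small hedged sketch of that layout arithmetic in Java; the helper and its name are illustrative, not part of the format or the Java API.

```java
public final class BlockLayoutSketch {
  // Illustrative only: end of the bytes covered by one footer Block,
  // assuming the body starts right after the size-prefixed metadata.
  static long blockEnd(long offset, int metaDataLength, long bodyLength) {
    long bodyStart = offset + metaDataLength;
    return bodyStart + bodyLength;
  }

  public static void main(String[] args) {
    // e.g. a batch whose metadata starts at byte 440, with 288 bytes of metadata
    // and a 1024-byte aligned body, ends at byte 1752
    System.out.println(blockEnd(440L, 288, 1024L)); // 1752
  }
}
```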
2 changes: 1 addition & 1 deletion integration/integration_test.py
@@ -648,7 +648,7 @@ def get_static_json_files():


def run_all_tests(debug=False):
testers = [JavaTester(debug=debug), CPPTester(debug=debug)]
testers = [CPPTester(debug=debug), JavaTester(debug=debug)]
static_json_files = get_static_json_files()
generated_json_files = get_generated_json_files()
json_files = static_json_files + generated_json_files
java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
@@ -65,10 +65,11 @@ private static List<ArrowBlock> recordBatches(Footer footer) {

private static List<ArrowBlock> dictionaries(Footer footer) {
List<ArrowBlock> dictionaries = new ArrayList<>();
Block tempBLock = new Block();
Block tempBlock = new Block();

int dictionariesLength = footer.dictionariesLength();
for (int i = 0; i < dictionariesLength; i++) {
Block block = footer.dictionaries(tempBLock, i);
Block block = footer.dictionaries(tempBlock, i);
dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
}
return dictionaries;
java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
@@ -20,23 +20,15 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.arrow.flatbuf.Buffer;
import org.apache.arrow.flatbuf.FieldNode;
import org.apache.arrow.flatbuf.Footer;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.stream.MessageSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ArrowBuf;

public class ArrowReader implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class);

@@ -54,15 +46,6 @@ public ArrowReader(SeekableByteChannel in, BufferAllocator allocator) {
this.allocator = allocator;
}

private int readFully(ArrowBuf buffer, int l) throws IOException {
int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l));
buffer.writerIndex(n);
if (n != l) {
throw new IllegalStateException(n + " != " + l);
}
return n;
}

private int readFully(ByteBuffer buffer) throws IOException {
int total = 0;
int n;
@@ -104,46 +87,21 @@ public ArrowFooter readFooter() throws IOException {

// TODO: read dictionaries

public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOException {
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", recordBatchBlock.getOffset(), recordBatchBlock.getMetadataLength(), recordBatchBlock.getBodyLength()));
int l = (int)(recordBatchBlock.getMetadataLength() + recordBatchBlock.getBodyLength());
if (l < 0) {
throw new InvalidArrowFileException("block invalid: " + recordBatchBlock);
}
final ArrowBuf buffer = allocator.buffer(l);
LOGGER.debug("allocated buffer " + buffer);
in.position(recordBatchBlock.getOffset());
int n = readFully(buffer, l);
if (n != l) {
throw new IllegalStateException(n + " != " + l);
}

// Record batch flatbuffer is prefixed by its size as int32le
final ArrowBuf metadata = buffer.slice(4, recordBatchBlock.getMetadataLength() - 4);
RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(metadata.nioBuffer().asReadOnlyBuffer());

int nodesLength = recordBatchFB.nodesLength();
final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength());
List<ArrowFieldNode> nodes = new ArrayList<>();
for (int i = 0; i < nodesLength; ++i) {
FieldNode node = recordBatchFB.nodes(i);
nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
public ArrowRecordBatch readRecordBatch(ArrowBlock block) throws IOException {
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
block.getOffset(), block.getMetadataLength(),
block.getBodyLength()));
in.position(block.getOffset());
ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
new ReadChannel(in, block.getOffset()), block, allocator);
if (batch == null) {
throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
}
List<ArrowBuf> buffers = new ArrayList<>();
for (int i = 0; i < recordBatchFB.buffersLength(); ++i) {
Buffer bufferFB = recordBatchFB.buffers(i);
LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", bufferFB.offset(), bufferFB.length()));
ArrowBuf vectorBuffer = body.slice((int)bufferFB.offset(), (int)bufferFB.length());
buffers.add(vectorBuffer);
}
ArrowRecordBatch arrowRecordBatch = new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
LOGGER.debug("released buffer " + buffer);
buffer.release();
return arrowRecordBatch;
return batch;
}

@Override
public void close() throws IOException {
in.close();
}

}
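With the body of readRecordBatch delegated to MessageSerializer.deserializeRecordBatch, reading a file boils down to seeking to each footer block and handing it to the reader. The following is a hedged usage sketch, not part of the patch: the package, the footer accessor getRecordBatches(), and the RootAllocator setup are assumptions about the surrounding API.

```java
package org.apache.arrow.vector.file; // assumed package, same as ArrowReader

import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.schema.ArrowRecordBatch;

public class ReaderSketch {
  public static void main(String[] args) throws Exception {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         FileChannel channel = FileChannel.open(Paths.get("example.arrow"), StandardOpenOption.READ);
         ArrowReader reader = new ArrowReader(channel, allocator)) {
      ArrowFooter footer = reader.readFooter();
      // each ArrowBlock locates one record batch: offset, metadata length, body length
      for (ArrowBlock block : footer.getRecordBatches()) { // accessor name assumed
        ArrowRecordBatch batch = reader.readRecordBatch(block);
        // ... consume the batch, then release its buffers before closing the allocator
      }
    }
  }
}
```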
java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
@@ -23,14 +23,12 @@
import java.util.Collections;
import java.util.List;

import org.apache.arrow.vector.schema.ArrowBuffer;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.stream.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ArrowBuf;

public class ArrowWriter implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class);

@@ -39,7 +37,6 @@ public class ArrowWriter implements AutoCloseable {
private final Schema schema;

private final List<ArrowBlock> recordBatches = new ArrayList<>();

private boolean started = false;

public ArrowWriter(WritableByteChannel out, Schema schema) {
@@ -49,47 +46,19 @@ public ArrowWriter(WritableByteChannel out, Schema schema) {

private void start() throws IOException {
writeMagic();
MessageSerializer.serialize(out, schema);
}


// TODO: write dictionaries

public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
checkStarted();
out.align();
ArrowBlock batchDesc = MessageSerializer.serialize(out, recordBatch);
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
batchDesc.getOffset(), batchDesc.getMetadataLength(), batchDesc.getBodyLength()));

// write metadata header with int32 size prefix
long offset = out.getCurrentPosition();
out.write(recordBatch, true);
out.align();
// write body
long bodyOffset = out.getCurrentPosition();
List<ArrowBuf> buffers = recordBatch.getBuffers();
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
if (buffers.size() != buffersLayout.size()) {
throw new IllegalStateException("the layout does not match: " + buffers.size() + " != " + buffersLayout.size());
}
for (int i = 0; i < buffers.size(); i++) {
ArrowBuf buffer = buffers.get(i);
ArrowBuffer layout = buffersLayout.get(i);
long startPosition = bodyOffset + layout.getOffset();
if (startPosition != out.getCurrentPosition()) {
out.writeZeros((int)(startPosition - out.getCurrentPosition()));
}

out.write(buffer);
if (out.getCurrentPosition() != startPosition + layout.getSize()) {
throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() + " != " + startPosition + layout.getSize());
}
}
int metadataLength = (int)(bodyOffset - offset);
if (metadataLength <= 0) {
throw new InvalidArrowFileException("invalid recordBatch");
}
long bodyLength = out.getCurrentPosition() - bodyOffset;
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", offset, metadataLength, bodyLength));
// add metadata to footer
recordBatches.add(new ArrowBlock(offset, metadataLength, bodyLength));
recordBatches.add(batchDesc);
}

private void checkStarted() throws IOException {
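The writer side is now the mirror image: writeRecordBatch aligns the output and delegates to MessageSerializer.serialize, which returns the ArrowBlock recorded for the footer. A hedged usage sketch follows; the package and the expectation that close() writes the footer and trailing magic are assumptions, not shown in this hunk.

```java
package org.apache.arrow.vector.file; // assumed package, same as ArrowWriter

import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;

public class WriterSketch {
  // Writes already-built record batches to a new Arrow file at 'path'.
  static void write(String path, Schema schema, Iterable<ArrowRecordBatch> batches) throws Exception {
    try (FileChannel channel = FileChannel.open(Paths.get(path),
             StandardOpenOption.CREATE, StandardOpenOption.WRITE);
         ArrowWriter writer = new ArrowWriter(channel, schema)) {
      for (ArrowRecordBatch batch : batches) {
        writer.writeRecordBatch(batch); // serialized via MessageSerializer, block kept for the footer
      }
    } // close() is assumed to write the footer and the trailing magic bytes
  }
}
```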
java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
@@ -32,9 +32,16 @@ public class ReadChannel implements AutoCloseable {

private ReadableByteChannel in;
private long bytesRead = 0;
// The starting byte offset into 'in'.
private final long startByteOffset;

public ReadChannel(ReadableByteChannel in) {
public ReadChannel(ReadableByteChannel in, long startByteOffset) {
this.in = in;
this.startByteOffset = startByteOffset;
}

public ReadChannel(ReadableByteChannel in) {
this(in, 0);
}

public long bytesRead() { return bytesRead; }
@@ -65,6 +72,8 @@ public int readFully(ArrowBuf buffer, int l) throws IOException {
return n;
}

public long getCurrentPositiion() { return startByteOffset + bytesRead; }

@Override
public void close() throws IOException {
if (this.in != null) {
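The new two-argument ReadChannel constructor makes the positions reported by the channel absolute with respect to the start of the file, even when reading begins mid-file, which is what readRecordBatch above relies on when it wraps the file channel at block.getOffset(). A minimal hedged sketch; the byte source is illustrative and the package is assumed.

```java
package org.apache.arrow.vector.file; // assumed package, same as ReadChannel

import java.io.ByteArrayInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

import io.netty.buffer.ArrowBuf;

public class ReadChannelSketch {
  public static void main(String[] args) throws Exception {
    ReadableByteChannel in = Channels.newChannel(new ByteArrayInputStream(new byte[64]));
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         ReadChannel channel = new ReadChannel(in, 128L)) { // pretend 'in' begins at file offset 128
      ArrowBuf buffer = allocator.buffer(16);
      channel.readFully(buffer, 16);                     // 16 bytes read from the wrapped channel
      System.out.println(channel.bytesRead());           // 16
      System.out.println(channel.getCurrentPositiion()); // 144 = startByteOffset + bytesRead (name as in the patch)
      buffer.release();
    }
  }
}
```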