Skip to content

Commit edf36e7

Browse files
committed
Start drafting FileReader IPC implementation. Change record batch data header to write metadata size int32_t as suffix rather than prefix
Change-Id: I769a8ad47747f6a885701cab08d53c5a0c73c492
1 parent 95157f2 commit edf36e7

File tree

9 files changed

+142
-46
lines changed

9 files changed

+142
-46
lines changed

cpp/src/arrow/ipc/adapter.cc

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ class RecordBatchWriter {
199199

200200
// Write the data header at the end
201201
RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size()));
202-
*data_header_offset = position;
202+
*data_header_offset = position + data_header->size();
203203

204204
return Align(dst, &position);
205205
}
@@ -256,8 +256,6 @@ Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
256256
// ----------------------------------------------------------------------
257257
// Record batch read path
258258

259-
static constexpr int64_t INIT_METADATA_SIZE = 4096;
260-
261259
class RecordBatchReader::RecordBatchReaderImpl {
262260
public:
263261
RecordBatchReaderImpl(io::ReadableFileInterface* file,
@@ -377,29 +375,31 @@ class RecordBatchReader::RecordBatchReaderImpl {
377375
int num_flattened_fields_;
378376
};
379377

380-
Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t position,
378+
Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
381379
std::shared_ptr<RecordBatchReader>* out) {
382-
return Open(file, position, kMaxIpcRecursionDepth, out);
380+
return Open(file, offset, kMaxIpcRecursionDepth, out);
383381
}
384382

385-
Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t position,
383+
Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
386384
int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out) {
387-
std::shared_ptr<Buffer> metadata;
388-
RETURN_NOT_OK(file->ReadAt(position, INIT_METADATA_SIZE, &metadata));
385+
std::shared_ptr<Buffer> buffer;
386+
RETURN_NOT_OK(file->ReadAt(offset - sizeof(int32_t), sizeof(int32_t), &buffer));
389387

390-
int32_t metadata_size = *reinterpret_cast<const int32_t*>(metadata->data());
388+
int32_t metadata_size = *reinterpret_cast<const int32_t*>(buffer->data());
391389

392-
// We may not need to call ReadAt again
393-
if (metadata_size > static_cast<int>(INIT_METADATA_SIZE - sizeof(int32_t))) {
394-
// We don't have enough data, read the indicated metadata size.
395-
RETURN_NOT_OK(file->ReadAt(position + sizeof(int32_t), metadata_size, &metadata));
390+
if (metadata_size + static_cast<int>(sizeof(int32_t)) > offset) {
391+
return Status::Invalid("metadata size invalid");
396392
}
397393

394+
// Read the metadata
395+
RETURN_NOT_OK(
396+
file->ReadAt(offset - metadata_size - sizeof(int32_t), metadata_size, &buffer));
397+
398398
// TODO(wesm): buffer slicing here would be better in case ReadAt returns
399399
// allocated memory
400400

401401
std::shared_ptr<Message> message;
402-
RETURN_NOT_OK(Message::Open(metadata, &message));
402+
RETURN_NOT_OK(Message::Open(buffer, &message));
403403

404404
if (message->type() != Message::RECORD_BATCH) {
405405
return Status::Invalid("Metadata message is not a record batch");

cpp/src/arrow/ipc/adapter.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ constexpr int kMaxIpcRecursionDepth = 64;
6262
// <int32: metadata size> <uint8*: metadata>
6363
//
6464
// Finally, the absolute offset (relative to the start of the output stream) to
65-
// the start of the metadata / data header is returned in an out-variable
65+
// the end of the metadata / data header (suffixed by the header size) is
66+
// returned in an out-variable
6667
ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
6768
int32_t num_rows, io::OutputStream* dst, int64_t* header_offset,
6869
int max_recursion_depth = kMaxIpcRecursionDepth);
@@ -79,10 +80,12 @@ ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size);
7980

8081
class ARROW_EXPORT RecordBatchReader {
8182
public:
82-
static Status Open(io::ReadableFileInterface* file, int64_t position,
83+
// The offset is the absolute position to the *end* of the record batch data
84+
// header
85+
static Status Open(io::ReadableFileInterface* file, int64_t offset,
8386
std::shared_ptr<RecordBatchReader>* out);
8487

85-
static Status Open(io::ReadableFileInterface* file, int64_t position,
88+
static Status Open(io::ReadableFileInterface* file, int64_t offset,
8689
int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out);
8790

8891
virtual ~RecordBatchReader();

cpp/src/arrow/ipc/file.cc

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,5 +126,86 @@ Status FileWriter::Close() {
126126
// ----------------------------------------------------------------------
127127
// Reader implementation
128128

129+
FileReader::FileReader(
130+
const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset)
131+
: file_(file), footer_offset_(footer_offset) {}
132+
133+
FileReader::~FileReader() {}
134+
135+
Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
136+
std::shared_ptr<FileReader>* reader) {
137+
int64_t footer_offset;
138+
RETURN_NOT_OK(file->GetSize(&footer_offset));
139+
return Open(file, footer_offset, reader);
140+
}
141+
142+
Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
143+
int64_t footer_offset, std::shared_ptr<FileReader>* reader) {
144+
*reader = std::shared_ptr<FileReader>(new FileReader(file, footer_offset));
145+
return Status::OK();
146+
}
147+
148+
Status FileReader::ReadFooter() {
149+
int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
150+
151+
if (footer_offset_ <= magic_size * 2 + 4) {
152+
std::stringstream ss;
153+
ss << "File is too small: " << footer_offset_;
154+
return Status::Invalid(ss.str());
155+
}
156+
157+
RETURN_NOT_OK(file_->Seek(footer_offset_));
158+
159+
std::shared_ptr<Buffer> buffer;
160+
161+
int file_end_size = magic_size + sizeof(int32_t);
162+
163+
RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer));
164+
165+
if (memcmp(buffer->data(), kArrowMagicBytes, magic_size)) {
166+
return Status::Invalid("Not an Arrow file");
167+
}
168+
169+
int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data());
170+
171+
if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > footer_offset_) {
172+
return Status::Invalid("File is smaller than indicated metadata size");
173+
}
174+
175+
// Now read the footer
176+
RETURN_NOT_OK(file_->ReadAt(
177+
footer_offset_ - footer_length - file_end_size, footer_length, &buffer));
178+
RETURN_NOT_OK(FileFooter::Open(buffer, &footer_));
179+
180+
return Status::OK();
181+
}
182+
183+
Status FileReader::GetSchema(std::shared_ptr<Schema>* schema) const {
184+
return footer_->GetSchema(schema);
185+
}
186+
187+
int FileReader::num_dictionaries() const {
188+
return footer_->num_dictionaries();
189+
}
190+
191+
int FileReader::num_record_batches() const {
192+
return footer_->num_record_batches();
193+
}
194+
195+
MetadataVersion::type FileReader::version() const {
196+
return footer_->version();
197+
}
198+
199+
Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
200+
DCHECK_GT(i, 0);
201+
DCHECK_LT(i, num_record_batches());
202+
// FileBlock block = footer_->record_batch(i);
203+
204+
//
205+
std::shared_ptr<Buffer> buffer;
206+
// RETURN_NOT_OK(file_->ReadAt
207+
return Status::OK();
208+
}
209+
129210
} // namespace ipc
130211
} // namespace arrow

cpp/src/arrow/ipc/file.h

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,15 @@ class ARROW_EXPORT FileWriter {
8686

8787
class ARROW_EXPORT FileReader {
8888
public:
89+
~FileReader();
90+
8991
// Open a file-like object that is assumed to be self-contained; i.e., the
9092
// end of the file interface is the end of the Arrow file. Note that there
9193
// can be any amount of data preceding the Arrow-formatted data, because we
9294
// need only locate the end of the Arrow file stream to discover the metadata
9395
// and then proceed to read the data into memory.
94-
static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file);
96+
static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
97+
std::shared_ptr<FileReader>* reader);
9598

9699
// If the file is embedded within some larger file or memory region, you can
97100
// pass the absolute memory offset to the end of the file (which contains the
@@ -100,12 +103,12 @@ class ARROW_EXPORT FileReader {
100103
//
101104
// @param file: the data source
102105
// @param footer_offset: the position of the end of the Arrow "file"
103-
static Status Open(
104-
const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset);
106+
static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
107+
int64_t footer_offset, std::shared_ptr<FileReader>* reader);
105108

106109
// The Arrow schema shared by all of the record batches
107110
// @param schema (out): arrow::Schema
108-
Status GetSchema(std::shared_ptr<Schema>* schema);
111+
Status GetSchema(std::shared_ptr<Schema>* schema) const;
109112

110113
// Shared dictionaries for dictionary-encoding cross record batches
111114
// TODO(wesm): Implement dictionary reading when we also have dictionary
@@ -114,6 +117,8 @@ class ARROW_EXPORT FileReader {
114117

115118
int num_record_batches() const;
116119

120+
MetadataVersion::type version() const;
121+
117122
// Read a record batch from the file. Does not copy memory if the input
118123
// source supports zero-copy.
119124
//
@@ -122,13 +127,18 @@ class ARROW_EXPORT FileReader {
122127
Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
123128

124129
private:
125-
FileReader(const std::shared_ptr<io::ReadableFileInterface>& file);
130+
FileReader(
131+
const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset);
132+
133+
Status ReadFooter();
126134

127135
std::shared_ptr<io::ReadableFileInterface> file_;
128136

129137
// The location where the Arrow file layout ends. May be the end of the file
130138
// or some other location if embedded in a larger file.
131139
int64_t footer_offset_;
140+
141+
std::unique_ptr<FileFooter> footer_;
132142
};
133143

134144
} // namespace ipc

cpp/src/arrow/ipc/ipc-adapter-test.cc

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,13 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
6060
std::shared_ptr<RecordBatch>* batch_result) {
6161
std::string path = "test-write-row-batch";
6262
io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
63-
int64_t header_location;
63+
int64_t header_offset;
6464

65-
RETURN_NOT_OK(WriteRecordBatch(
66-
batch.columns(), batch.num_rows(), mmap_.get(), &header_location));
65+
RETURN_NOT_OK(
66+
WriteRecordBatch(batch.columns(), batch.num_rows(), mmap_.get(), &header_offset));
6767

6868
std::shared_ptr<RecordBatchReader> reader;
69-
RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_location, &reader));
69+
RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_offset, &reader));
7070

7171
RETURN_NOT_OK(reader->GetRecordBatch(batch.schema(), batch_result));
7272
return Status::OK();
@@ -281,10 +281,10 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch,
281281

282282
void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
283283
ipc::MockOutputStream mock;
284-
int64_t mock_header_location = -1;
284+
int64_t mock_header_offset = -1;
285285
int64_t size = -1;
286-
ASSERT_OK(WriteRecordBatch(
287-
batch->columns(), batch->num_rows(), &mock, &mock_header_location));
286+
ASSERT_OK(
287+
WriteRecordBatch(batch->columns(), batch->num_rows(), &mock, &mock_header_offset));
288288
ASSERT_OK(GetRecordBatchSize(batch.get(), &size));
289289
ASSERT_EQ(mock.GetExtentBytesWritten(), size);
290290
}
@@ -335,8 +335,8 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
335335
std::string path = "test-write-past-max-recursion";
336336
const int memory_map_size = 1 << 16;
337337
io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
338-
int64_t header_location;
339-
int64_t* header_out_param = header_out == nullptr ? &header_location : header_out;
338+
int64_t header_offset;
339+
int64_t* header_out_param = header_out == nullptr ? &header_offset : header_out;
340340
if (override_level) {
341341
return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
342342
header_out_param, recursion_level + 1);
@@ -356,12 +356,12 @@ TEST_F(RecursionLimits, WriteLimit) {
356356
}
357357

358358
TEST_F(RecursionLimits, ReadLimit) {
359-
int64_t header_location = -1;
359+
int64_t header_offset = -1;
360360
std::shared_ptr<Schema> schema;
361-
ASSERT_OK(WriteToMmap(64, true, &header_location, &schema));
361+
ASSERT_OK(WriteToMmap(64, true, &header_offset, &schema));
362362

363363
std::shared_ptr<RecordBatchReader> reader;
364-
ASSERT_OK(RecordBatchReader::Open(mmap_.get(), header_location, &reader));
364+
ASSERT_OK(RecordBatchReader::Open(mmap_.get(), header_offset, &reader));
365365
std::shared_ptr<RecordBatch> batch_result;
366366
ASSERT_RAISES(Invalid, reader->GetRecordBatch(schema, &batch_result));
367367
}

cpp/src/arrow/ipc/ipc-metadata-test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class TestFileFooter : public ::testing::Test {
108108

109109
ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream));
110110

111-
std::shared_ptr<FileFooter> footer;
111+
std::unique_ptr<FileFooter> footer;
112112
ASSERT_OK(FileFooter::Open(buffer, &footer));
113113

114114
ASSERT_EQ(MetadataVersion::V1_SNAPSHOT, footer->version());

cpp/src/arrow/ipc/metadata-internal.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,17 +304,17 @@ Status MessageBuilder::Finish() {
304304
}
305305

306306
Status MessageBuilder::GetBuffer(std::shared_ptr<Buffer>* out) {
307-
// The message buffer is prefixed by the size of the complete flatbuffer as
307+
// The message buffer is suffixed by the size of the complete flatbuffer as
308308
// int32_t
309-
// <int32_t: flatbuffer size><uint8_t*: flatbuffer data>
309+
// <uint8_t*: flatbuffer data><int32_t: flatbuffer size>
310310
int32_t size = fbb_.GetSize();
311311

312312
auto result = std::make_shared<PoolBuffer>();
313313
RETURN_NOT_OK(result->Resize(size + sizeof(int32_t)));
314314

315315
uint8_t* dst = result->mutable_data();
316-
memcpy(dst, reinterpret_cast<int32_t*>(&size), sizeof(int32_t));
317-
memcpy(dst + sizeof(int32_t), fbb_.GetBufferPointer(), size);
316+
memcpy(dst, fbb_.GetBufferPointer(), size);
317+
memcpy(dst + size, reinterpret_cast<int32_t*>(&size), sizeof(int32_t));
318318

319319
*out = result;
320320
return Status::OK();

cpp/src/arrow/ipc/metadata.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,7 @@ Status Message::Open(
8484
const std::shared_ptr<Buffer>& buffer, std::shared_ptr<Message>* out) {
8585
std::shared_ptr<Message> result(new Message());
8686

87-
// The buffer is prefixed by its size as int32_t
88-
const uint8_t* fb_head = buffer->data() + sizeof(int32_t);
89-
const flatbuf::Message* message = flatbuf::GetMessage(fb_head);
87+
const flatbuf::Message* message = flatbuf::GetMessage(buffer->data());
9088

9189
// TODO(wesm): verify message
9290
result->impl_.reset(new MessageImpl(buffer, message));
@@ -300,11 +298,13 @@ class FileFooter::FileFooterImpl {
300298

301299
FileFooter::FileFooter() {}
302300

301+
FileFooter::~FileFooter() {}
302+
303303
Status FileFooter::Open(
304-
const std::shared_ptr<Buffer>& buffer, std::shared_ptr<FileFooter>* out) {
304+
const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) {
305305
const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data());
306306

307-
*out = std::shared_ptr<FileFooter>(new FileFooter());
307+
*out = std::unique_ptr<FileFooter>(new FileFooter());
308308

309309
// TODO(wesm): Verify the footer
310310
(*out)->impl_.reset(new FileFooterImpl(buffer, footer));
@@ -318,7 +318,7 @@ int FileFooter::num_dictionaries() const {
318318

319319
int FileFooter::num_record_batches() const {
320320
return impl_->num_record_batches();
321-
};
321+
}
322322

323323
MetadataVersion::type FileFooter::version() const {
324324
return impl_->version();

cpp/src/arrow/ipc/metadata.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,10 @@ Status WriteFileFooter(const Schema* schema, const std::vector<FileBlock>& dicti
163163

164164
class ARROW_EXPORT FileFooter {
165165
public:
166+
~FileFooter();
167+
166168
static Status Open(
167-
const std::shared_ptr<Buffer>& buffer, std::shared_ptr<FileFooter>* out);
169+
const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out);
168170

169171
int num_dictionaries() const;
170172
int num_record_batches() const;

0 commit comments

Comments
 (0)