Skip to content

Commit b88bce0

Browse files
committed
Finish draft of FileReader::GetRecordBatch. Add body end offset to ipc adapter
Change-Id: Ibf6fd0c4afd0704cdbc7a6501179c9ac47c9d33d
1 parent edf36e7 commit b88bce0

File tree

5 files changed

+50
-38
lines changed

5 files changed

+50
-38
lines changed

cpp/src/arrow/ipc/adapter.cc

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ class RecordBatchWriter {
149149
return Status::OK();
150150
}
151151

152-
Status Write(io::OutputStream* dst, int64_t* data_header_offset) {
152+
Status Write(
153+
io::OutputStream* dst, int64_t* body_end_offset, int64_t* header_end_offset) {
153154
// Get the starting position
154155
int64_t start_position;
155156
RETURN_NOT_OK(dst->Tell(&start_position));
@@ -186,6 +187,8 @@ class RecordBatchWriter {
186187
}
187188
}
188189

190+
*body_end_offset = position;
191+
189192
// Now that we have computed the locations of all of the buffers in shared
190193
// memory, the data header can be converted to a flatbuffer and written out
191194
//
@@ -199,7 +202,7 @@ class RecordBatchWriter {
199202

200203
// Write the data header at the end
201204
RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size()));
202-
*data_header_offset = position + data_header->size();
205+
*header_end_offset = position + data_header->size();
203206

204207
return Align(dst, &position);
205208
}
@@ -218,9 +221,10 @@ class RecordBatchWriter {
218221
// This must be called after invoking AssemblePayload
219222
Status GetTotalSize(int64_t* size) {
220223
// emulates the behavior of Write without actually writing
224+
int64_t body_offset;
221225
int64_t data_header_offset;
222226
MockOutputStream dst;
223-
RETURN_NOT_OK(Write(&dst, &data_header_offset));
227+
RETURN_NOT_OK(Write(&dst, &body_offset, &data_header_offset));
224228
*size = dst.GetExtentBytesWritten();
225229
return Status::OK();
226230
}
@@ -237,12 +241,12 @@ class RecordBatchWriter {
237241
};
238242

239243
Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
240-
int32_t num_rows, io::OutputStream* dst, int64_t* header_offset,
241-
int max_recursion_depth) {
244+
int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
245+
int64_t* header_end_offset, int max_recursion_depth) {
242246
DCHECK_GT(max_recursion_depth, 0);
243247
RecordBatchWriter serializer(columns, num_rows, max_recursion_depth);
244248
RETURN_NOT_OK(serializer.AssemblePayload());
245-
return serializer.Write(dst, header_offset);
249+
return serializer.Write(dst, body_end_offset, header_end_offset);
246250
}
247251

248252
Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {

cpp/src/arrow/ipc/adapter.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ constexpr int kMaxIpcRecursionDepth = 64;
6161
//
6262
// <int32: metadata size> <uint8*: metadata>
6363
//
64-
// Finally, the absolute offset (relative to the start of the output stream) to
65-
// the end of the metadata / data header (suffixed by the header size) is
66-
// returned in an out-variable
64+
// Finally, the absolute offsets (relative to the start of the output stream)
65+
// to the end of the body and end of the metadata / data header (suffixed by
66+
// the header size) are returned in out-variables
6767
ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
68-
int32_t num_rows, io::OutputStream* dst, int64_t* header_offset,
69-
int max_recursion_depth = kMaxIpcRecursionDepth);
68+
int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
69+
int64_t* header_end_offset, int max_recursion_depth = kMaxIpcRecursionDepth);
7070

7171
// int64_t GetRecordBatchMetadata(const RecordBatch* batch);
7272

cpp/src/arrow/ipc/file.cc

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,18 @@ Status FileWriter::WriteRecordBatch(
8787

8888
int64_t offset = position_;
8989

90-
int64_t header_offset;
91-
RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(columns, num_rows, sink_, &header_offset));
90+
int64_t body_end_offset;
91+
int64_t header_end_offset;
92+
RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
93+
columns, num_rows, sink_, &body_end_offset, &header_end_offset));
9294
RETURN_NOT_OK(UpdatePosition());
9395

9496
DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes";
9597

96-
// We can infer the metadata length and length of the record batch body (the
97-
// concatenated buffers) from the heade offset and the new output stream
98-
// position
99-
int32_t metadata_length = position_ - header_offset;
100-
int32_t body_length = position_ - offset - metadata_length;
98+
// There may be padding after the end of the metadata, so we cannot rely on
99+
// position_
100+
int32_t metadata_length = body_end_offset - header_end_offset;
101+
int32_t body_length = body_end_offset - offset;
101102

102103
// Append metadata, to be written in the footer later
103104
record_batches_.emplace_back(offset, metadata_length, body_length);
@@ -177,11 +178,12 @@ Status FileReader::ReadFooter() {
177178
footer_offset_ - footer_length - file_end_size, footer_length, &buffer));
178179
RETURN_NOT_OK(FileFooter::Open(buffer, &footer_));
179180

180-
return Status::OK();
181+
// Get the schema
182+
return footer_->GetSchema(&schema_);
181183
}
182184

183-
Status FileReader::GetSchema(std::shared_ptr<Schema>* schema) const {
184-
return footer_->GetSchema(schema);
185+
const std::shared_ptr<Schema>& FileReader::schema() const {
186+
return schema_;
185187
}
186188

187189
int FileReader::num_dictionaries() const {
@@ -199,12 +201,13 @@ MetadataVersion::type FileReader::version() const {
199201
Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
200202
DCHECK_GT(i, 0);
201203
DCHECK_LT(i, num_record_batches());
202-
// FileBlock block = footer_->record_batch(i);
204+
FileBlock block = footer_->record_batch(i);
205+
int64_t metadata_end_offset = block.offset + block.body_length + block.metadata_length;
203206

204-
//
205-
std::shared_ptr<Buffer> buffer;
206-
// RETURN_NOT_OK(file_->ReadAt
207-
return Status::OK();
207+
std::shared_ptr<RecordBatchReader> reader;
208+
RETURN_NOT_OK(RecordBatchReader::Open(file_.get(), metadata_end_offset, &reader));
209+
210+
return reader->GetRecordBatch(schema_, batch);
208211
}
209212

210213
} // namespace ipc

cpp/src/arrow/ipc/file.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,7 @@ class ARROW_EXPORT FileReader {
106106
static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
107107
int64_t footer_offset, std::shared_ptr<FileReader>* reader);
108108

109-
// The Arrow schema shared by all of the record batches
110-
// @param schema (out): arrow::Schema
111-
Status GetSchema(std::shared_ptr<Schema>* schema) const;
109+
const std::shared_ptr<Schema>& schema() const;
112110

113111
// Shared dictionaries for dictionary-encoding cross record batches
114112
// TODO(wesm): Implement dictionary reading when we also have dictionary
@@ -139,6 +137,7 @@ class ARROW_EXPORT FileReader {
139137
int64_t footer_offset_;
140138

141139
std::unique_ptr<FileFooter> footer_;
140+
std::shared_ptr<Schema> schema_;
142141
};
143142

144143
} // namespace ipc

cpp/src/arrow/ipc/ipc-adapter-test.cc

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,15 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
6060
std::shared_ptr<RecordBatch>* batch_result) {
6161
std::string path = "test-write-row-batch";
6262
io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
63-
int64_t header_offset;
6463

65-
RETURN_NOT_OK(
66-
WriteRecordBatch(batch.columns(), batch.num_rows(), mmap_.get(), &header_offset));
64+
int64_t body_end_offset;
65+
int64_t header_end_offset;
66+
67+
RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), mmap_.get(),
68+
&body_end_offset, &header_end_offset));
6769

6870
std::shared_ptr<RecordBatchReader> reader;
69-
RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_offset, &reader));
71+
RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_end_offset, &reader));
7072

7173
RETURN_NOT_OK(reader->GetRecordBatch(batch.schema(), batch_result));
7274
return Status::OK();
@@ -282,9 +284,10 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch,
282284
void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
283285
ipc::MockOutputStream mock;
284286
int64_t mock_header_offset = -1;
287+
int64_t mock_body_offset = -1;
285288
int64_t size = -1;
286-
ASSERT_OK(
287-
WriteRecordBatch(batch->columns(), batch->num_rows(), &mock, &mock_header_offset));
289+
ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), &mock,
290+
&mock_body_offset, &mock_header_offset));
288291
ASSERT_OK(GetRecordBatchSize(batch.get(), &size));
289292
ASSERT_EQ(mock.GetExtentBytesWritten(), size);
290293
}
@@ -335,14 +338,17 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
335338
std::string path = "test-write-past-max-recursion";
336339
const int memory_map_size = 1 << 16;
337340
io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
341+
342+
int64_t body_offset;
338343
int64_t header_offset;
344+
339345
int64_t* header_out_param = header_out == nullptr ? &header_offset : header_out;
340346
if (override_level) {
341347
return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
342-
header_out_param, recursion_level + 1);
348+
&body_offset, header_out_param, recursion_level + 1);
343349
} else {
344-
return WriteRecordBatch(
345-
batch->columns(), batch->num_rows(), mmap_.get(), header_out_param);
350+
return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
351+
&body_offset, header_out_param);
346352
}
347353
}
348354

0 commit comments

Comments
 (0)