@@ -95,7 +95,7 @@ static bool IsListType(const DataType* type) {
9595}
9696
9797// ----------------------------------------------------------------------
98- // Row batch write path
98+ // Record batch write path
9999
100100Status VisitArray (const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes,
101101 std::vector<std::shared_ptr<Buffer>>* buffers, int max_recursion_depth) {
@@ -132,28 +132,32 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes
132132 return Status::OK ();
133133}
134134
135- class RowBatchWriter {
135+ class RecordBatchWriter {
136136 public:
137- RowBatchWriter (const RowBatch* batch, int max_recursion_depth)
138- : batch_(batch), max_recursion_depth_(max_recursion_depth) {}
137+ RecordBatchWriter (const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows,
138+ int max_recursion_depth)
139+ : columns_(&columns),
140+ num_rows_ (num_rows),
141+ max_recursion_depth_(max_recursion_depth) {}
139142
140143 Status AssemblePayload () {
141144 // Perform depth-first traversal of the row-batch
142- for (int i = 0 ; i < batch_-> num_columns (); ++i) {
143- const Array* arr = batch_-> column (i) .get ();
145+ for (size_t i = 0 ; i < columns_-> size (); ++i) {
146+ const Array* arr = (*columns_)[i] .get ();
144147 RETURN_NOT_OK (VisitArray (arr, &field_nodes_, &buffers_, max_recursion_depth_));
145148 }
146149 return Status::OK ();
147150 }
148151
149- Status Write (io::OutputStream* dst, int64_t * data_header_offset) {
150- // Write out all the buffers contiguously and compute the total size of the
151- // memory payload
152- int64_t offset = 0 ;
153-
152+ Status Write (
153+ io::OutputStream* dst, int64_t * body_end_offset, int64_t * header_end_offset) {
154154 // Get the starting position
155- int64_t position;
156- RETURN_NOT_OK (dst->Tell (&position));
155+ int64_t start_position;
156+ RETURN_NOT_OK (dst->Tell (&start_position));
157+
158+ // Keep track of the current position so we can determine the size of the
159+ // message body
160+ int64_t position = start_position;
157161
158162 for (size_t i = 0 ; i < buffers_.size (); ++i) {
159163 const Buffer* buffer = buffers_[i].get ();
@@ -175,14 +179,16 @@ class RowBatchWriter {
175179 // are using from any OS-level shared memory. The thought is that systems
176180 // may (in the future) associate integer page id's with physical memory
177181 // pages (according to whatever is the desired shared memory mechanism)
178- buffer_meta_.push_back (flatbuf::Buffer (0 , position + offset , size));
182+ buffer_meta_.push_back (flatbuf::Buffer (0 , position, size));
179183
180184 if (size > 0 ) {
181185 RETURN_NOT_OK (dst->Write (buffer->data (), size));
182- offset += size;
186+ position += size;
183187 }
184188 }
185189
190+ *body_end_offset = position;
191+
186192 // Now that we have computed the locations of all of the buffers in shared
187193 // memory, the data header can be converted to a flatbuffer and written out
188194 //
@@ -192,65 +198,81 @@ class RowBatchWriter {
192198 // construct the flatbuffer data accessor object (see arrow::ipc::Message)
193199 std::shared_ptr<Buffer> data_header;
194200 RETURN_NOT_OK (WriteDataHeader (
195- batch_-> num_rows (), offset , field_nodes_, buffer_meta_, &data_header));
201+ num_rows_, position - start_position , field_nodes_, buffer_meta_, &data_header));
196202
197203 // Write the data header at the end
198204 RETURN_NOT_OK (dst->Write (data_header->data (), data_header->size ()));
199205
200- *data_header_offset = position + offset;
206+ position += data_header->size ();
207+ *header_end_offset = position;
208+
209+ return Align (dst, &position);
210+ }
211+
212+ Status Align (io::OutputStream* dst, int64_t * position) {
213+ // Write all buffers here on word boundaries
214+ // TODO(wesm): Is there benefit to 64-byte padding in IPC?
215+ int64_t remainder = PaddedLength (*position) - *position;
216+ if (remainder > 0 ) {
217+ RETURN_NOT_OK (dst->Write (kPaddingBytes , remainder));
218+ *position += remainder;
219+ }
201220 return Status::OK ();
202221 }
203222
204223 // This must be called after invoking AssemblePayload
205224 Status GetTotalSize (int64_t * size) {
206225 // emulates the behavior of Write without actually writing
226+ int64_t body_offset;
207227 int64_t data_header_offset;
208228 MockOutputStream dst;
209- RETURN_NOT_OK (Write (&dst, &data_header_offset));
229+ RETURN_NOT_OK (Write (&dst, &body_offset, & data_header_offset));
210230 *size = dst.GetExtentBytesWritten ();
211231 return Status::OK ();
212232 }
213233
214234 private:
215- const RowBatch* batch_;
235+ // Do not copy this vector. Ownership must be retained elsewhere
236+ const std::vector<std::shared_ptr<Array>>* columns_;
237+ int32_t num_rows_;
216238
217239 std::vector<flatbuf::FieldNode> field_nodes_;
218240 std::vector<flatbuf::Buffer> buffer_meta_;
219241 std::vector<std::shared_ptr<Buffer>> buffers_;
220242 int max_recursion_depth_;
221243};
222244
223- Status WriteRowBatch (io::OutputStream* dst, const RowBatch* batch, int64_t * header_offset,
224- int max_recursion_depth) {
245+ Status WriteRecordBatch (const std::vector<std::shared_ptr<Array>>& columns,
246+ int32_t num_rows, io::OutputStream* dst, int64_t * body_end_offset,
247+ int64_t * header_end_offset, int max_recursion_depth) {
225248 DCHECK_GT (max_recursion_depth, 0 );
226- RowBatchWriter serializer (batch , max_recursion_depth);
249+ RecordBatchWriter serializer (columns, num_rows , max_recursion_depth);
227250 RETURN_NOT_OK (serializer.AssemblePayload ());
228- return serializer.Write (dst, header_offset );
251+ return serializer.Write (dst, body_end_offset, header_end_offset );
229252}
230253
231- Status GetRowBatchSize (const RowBatch* batch, int64_t * size) {
232- RowBatchWriter serializer (batch, kMaxIpcRecursionDepth );
254+ Status GetRecordBatchSize (const RecordBatch* batch, int64_t * size) {
255+ RecordBatchWriter serializer (
256+ batch->columns (), batch->num_rows (), kMaxIpcRecursionDepth );
233257 RETURN_NOT_OK (serializer.AssemblePayload ());
234258 RETURN_NOT_OK (serializer.GetTotalSize (size));
235259 return Status::OK ();
236260}
237261
238262// ----------------------------------------------------------------------
239- // Row batch read path
263+ // Record batch read path
240264
241- static constexpr int64_t INIT_METADATA_SIZE = 4096 ;
242-
243- class RowBatchReader ::RowBatchReaderImpl {
265+ class RecordBatchReader ::RecordBatchReaderImpl {
244266 public:
245- RowBatchReaderImpl (io::ReadableFileInterface* file,
267+ RecordBatchReaderImpl (io::ReadableFileInterface* file,
246268 const std::shared_ptr<RecordBatchMessage>& metadata, int max_recursion_depth)
247269 : file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) {
248270 num_buffers_ = metadata->num_buffers ();
249271 num_flattened_fields_ = metadata->num_fields ();
250272 }
251273
252274 Status AssembleBatch (
253- const std::shared_ptr<Schema>& schema, std::shared_ptr<RowBatch >* out) {
275+ const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch >* out) {
254276 std::vector<std::shared_ptr<Array>> arrays (schema->num_fields ());
255277
256278 // The field_index and buffer_index are incremented in NextArray based on
@@ -263,7 +285,7 @@ class RowBatchReader::RowBatchReaderImpl {
263285 RETURN_NOT_OK (NextArray (field, max_recursion_depth_, &arrays[i]));
264286 }
265287
266- *out = std::make_shared<RowBatch >(schema, metadata_->length (), arrays);
288+ *out = std::make_shared<RecordBatch >(schema, metadata_->length (), arrays);
267289 return Status::OK ();
268290 }
269291
@@ -359,49 +381,51 @@ class RowBatchReader::RowBatchReaderImpl {
359381 int num_flattened_fields_;
360382};
361383
362- Status RowBatchReader ::Open (io::ReadableFileInterface* file, int64_t position ,
363- std::shared_ptr<RowBatchReader >* out) {
364- return Open (file, position , kMaxIpcRecursionDepth , out);
384+ Status RecordBatchReader ::Open (io::ReadableFileInterface* file, int64_t offset ,
385+ std::shared_ptr<RecordBatchReader >* out) {
386+ return Open (file, offset , kMaxIpcRecursionDepth , out);
365387}
366388
367- Status RowBatchReader ::Open (io::ReadableFileInterface* file, int64_t position ,
368- int max_recursion_depth, std::shared_ptr<RowBatchReader >* out) {
369- std::shared_ptr<Buffer> metadata ;
370- RETURN_NOT_OK (file->ReadAt (position, INIT_METADATA_SIZE , &metadata ));
389+ Status RecordBatchReader ::Open (io::ReadableFileInterface* file, int64_t offset ,
390+ int max_recursion_depth, std::shared_ptr<RecordBatchReader >* out) {
391+ std::shared_ptr<Buffer> buffer ;
392+ RETURN_NOT_OK (file->ReadAt (offset - sizeof ( int32_t ), sizeof ( int32_t ) , &buffer ));
371393
372- int32_t metadata_size = *reinterpret_cast <const int32_t *>(metadata ->data ());
394+ int32_t metadata_size = *reinterpret_cast <const int32_t *>(buffer ->data ());
373395
374- // We may not need to call ReadAt again
375- if (metadata_size > static_cast <int >(INIT_METADATA_SIZE - sizeof (int32_t ))) {
376- // We don't have enough data, read the indicated metadata size.
377- RETURN_NOT_OK (file->ReadAt (position + sizeof (int32_t ), metadata_size, &metadata));
396+ if (metadata_size + static_cast <int >(sizeof (int32_t )) > offset) {
397+ return Status::Invalid (" metadata size invalid" );
378398 }
379399
400+ // Read the metadata
401+ RETURN_NOT_OK (
402+ file->ReadAt (offset - metadata_size - sizeof (int32_t ), metadata_size, &buffer));
403+
380404 // TODO(wesm): buffer slicing here would be better in case ReadAt returns
381405 // allocated memory
382406
383407 std::shared_ptr<Message> message;
384- RETURN_NOT_OK (Message::Open (metadata , &message));
408+ RETURN_NOT_OK (Message::Open (buffer , &message));
385409
386410 if (message->type () != Message::RECORD_BATCH) {
387411 return Status::Invalid (" Metadata message is not a record batch" );
388412 }
389413
390414 std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch ();
391415
392- std::shared_ptr<RowBatchReader > result (new RowBatchReader ());
393- result->impl_ .reset (new RowBatchReaderImpl (file, batch_meta, max_recursion_depth));
416+ std::shared_ptr<RecordBatchReader > result (new RecordBatchReader ());
417+ result->impl_ .reset (new RecordBatchReaderImpl (file, batch_meta, max_recursion_depth));
394418 *out = result;
395419
396420 return Status::OK ();
397421}
398422
399423// Here the explicit destructor is required for compilers to be aware of
400- // the complete information of RowBatchReader::RowBatchReaderImpl class
401- RowBatchReader ::~RowBatchReader () {}
424+ // the complete information of RecordBatchReader::RecordBatchReaderImpl class
425+ RecordBatchReader ::~RecordBatchReader () {}
402426
403- Status RowBatchReader::GetRowBatch (
404- const std::shared_ptr<Schema>& schema, std::shared_ptr<RowBatch >* out) {
427+ Status RecordBatchReader::GetRecordBatch (
428+ const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch >* out) {
405429 return impl_->AssembleBatch (schema, out);
406430}
407431