Skip to content

Commit e44ede8

Browse files
elahrvivaz and wesm
authored and committed
ARROW-1343: [Java] Aligning serialized schema, end of buffers in RecordBatches
Author: Emilio Lahr-Vivaz <elahrvivaz@ccri.com> Author: Wes McKinney <wes.mckinney@twosigma.com> Closes #954 from elahrvivaz/align_end and squashes the following commits: 79ac120 [Wes McKinney] Revert to NDEBUG because it's a standard define in release builds in MSVC ae6bc9f [Wes McKinney] Use __declspec(noreturn) in MSVC. Not sure why this suddenly showed up 74b29cc [Wes McKinney] Add notes to IPC.md to make alignment contract more clear e2f0114 [Wes McKinney] Add C++ DCHECKs on read path for aligned buffers, aligned file block offset, lengths 3d64c9f [Wes McKinney] Align stream schema message in C++, DCHECKs for FileBlocks 4778ee1 [Emilio Lahr-Vivaz] adding padding to magic bytes in file format 5342915 [Emilio Lahr-Vivaz] using asserts instead of padding checks, adding padding to ArrowRecordBatch.calculateBodySize, moving align to writeBufferBatches a12b4ff [Emilio Lahr-Vivaz] comments 0b32265 [Emilio Lahr-Vivaz] aligning schema write 26bbc25 [Emilio Lahr-Vivaz] Merge branch 'ARROW-1340' into align_end a307779 [Emilio Lahr-Vivaz] ARROW-1340: [Java] Fix NullableMapVector field metadata b2bf86d [Emilio Lahr-Vivaz] WIP for aligning end of buffers
1 parent 86154f0 commit e44ede8

File tree

12 files changed

+118
-37
lines changed

12 files changed

+118
-37
lines changed

cpp/src/arrow/ipc/metadata.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "arrow/status.h"
3737
#include "arrow/tensor.h"
3838
#include "arrow/type.h"
39+
#include "arrow/util/logging.h"
3940

4041
namespace arrow {
4142

@@ -773,6 +774,20 @@ Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dicti
773774
flatbuffers::Offset<flatbuf::Schema> fb_schema;
774775
RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
775776

777+
#ifndef NDEBUG
778+
for (size_t i = 0; i < dictionaries.size(); ++i) {
779+
DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].offset)) << i;
780+
DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].metadata_length)) << i;
781+
DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].body_length)) << i;
782+
}
783+
784+
for (size_t i = 0; i < record_batches.size(); ++i) {
785+
DCHECK(BitUtil::IsMultipleOf8(record_batches[i].offset)) << i;
786+
DCHECK(BitUtil::IsMultipleOf8(record_batches[i].metadata_length)) << i;
787+
DCHECK(BitUtil::IsMultipleOf8(record_batches[i].body_length)) << i;
788+
}
789+
#endif
790+
776791
auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
777792
auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
778793

cpp/src/arrow/ipc/reader.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "arrow/table.h"
3535
#include "arrow/tensor.h"
3636
#include "arrow/type.h"
37+
#include "arrow/util/bit-util.h"
3738
#include "arrow/util/logging.h"
3839
#include "arrow/visitor_inline.h"
3940

@@ -59,6 +60,9 @@ class IpcComponentSource {
5960
*out = nullptr;
6061
return Status::OK();
6162
} else {
63+
DCHECK(BitUtil::IsMultipleOf8(buffer->offset()))
64+
<< "Buffer " << buffer_index
65+
<< " did not start on 8-byte aligned offset: " << buffer->offset();
6266
return file_->ReadAt(buffer->offset(), buffer->length(), out);
6367
}
6468
}
@@ -550,6 +554,10 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
550554
DCHECK_LT(i, num_record_batches());
551555
FileBlock block = record_batch(i);
552556

557+
DCHECK(BitUtil::IsMultipleOf8(block.offset));
558+
DCHECK(BitUtil::IsMultipleOf8(block.metadata_length));
559+
DCHECK(BitUtil::IsMultipleOf8(block.body_length));
560+
553561
std::unique_ptr<Message> message;
554562
RETURN_NOT_OK(
555563
ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
@@ -564,6 +572,11 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
564572
// Read all the dictionaries
565573
for (int i = 0; i < num_dictionaries(); ++i) {
566574
FileBlock block = dictionary(i);
575+
576+
DCHECK(BitUtil::IsMultipleOf8(block.offset));
577+
DCHECK(BitUtil::IsMultipleOf8(block.metadata_length));
578+
DCHECK(BitUtil::IsMultipleOf8(block.body_length));
579+
567580
std::unique_ptr<Message> message;
568581
RETURN_NOT_OK(
569582
ReadMessage(block.offset, block.metadata_length, file_.get(), &message));

cpp/src/arrow/ipc/util.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
namespace arrow {
2828
namespace ipc {
2929

30-
// Align on 8-byte boundaries
3130
// Buffers are padded to 64-byte boundaries (for SIMD)
3231
static constexpr int kArrowAlignment = 64;
3332

33+
// Align on 8-byte boundaries in IPC
34+
static constexpr int kArrowIpcAlignment = 8;
35+
3436
static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
3537

3638
static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {

cpp/src/arrow/ipc/writer.cc

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ class RecordBatchSerializer : public ArrayVisitor {
156156
// The buffer might be null if we are handling zero row lengths.
157157
if (buffer) {
158158
size = buffer->size();
159-
padding = BitUtil::RoundUpToMultipleOf64(size) - size;
159+
padding = BitUtil::RoundUpToMultipleOf8(size) - size;
160160
}
161161

162162
// TODO(wesm): We currently have no notion of shared memory page id's,
@@ -172,7 +172,7 @@ class RecordBatchSerializer : public ArrayVisitor {
172172
}
173173

174174
*body_length = offset - buffer_start_offset_;
175-
DCHECK(BitUtil::IsMultipleOf64(*body_length));
175+
DCHECK(BitUtil::IsMultipleOf8(*body_length));
176176

177177
return Status::OK();
178178
}
@@ -216,7 +216,7 @@ class RecordBatchSerializer : public ArrayVisitor {
216216
// The buffer might be null if we are handling zero row lengths.
217217
if (buffer) {
218218
size = buffer->size();
219-
padding = BitUtil::RoundUpToMultipleOf64(size) - size;
219+
padding = BitUtil::RoundUpToMultipleOf8(size) - size;
220220
}
221221

222222
if (size > 0) {
@@ -251,7 +251,7 @@ class RecordBatchSerializer : public ArrayVisitor {
251251

252252
// Send padding if it's available
253253
const int64_t buffer_length =
254-
std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width),
254+
std::min(BitUtil::RoundUpToMultipleOf8(array.length() * type_width),
255255
data->size() - byte_offset);
256256
data = SliceBuffer(data, byte_offset, buffer_length);
257257
}
@@ -618,15 +618,7 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
618618
}
619619

620620
virtual Status Start() {
621-
std::shared_ptr<Buffer> schema_fb;
622-
RETURN_NOT_OK(WriteSchemaMessage(*schema_, &dictionary_memo_, &schema_fb));
623-
624-
int32_t flatbuffer_size = static_cast<int32_t>(schema_fb->size());
625-
RETURN_NOT_OK(
626-
Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
627-
628-
// Write the flatbuffer
629-
RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size));
621+
RETURN_NOT_OK(WriteSchema());
630622

631623
// If there are any dictionaries, write them as the next messages
632624
RETURN_NOT_OK(WriteDictionaries());
@@ -635,6 +627,17 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
635627
return Status::OK();
636628
}
637629

630+
Status WriteSchema() {
631+
std::shared_ptr<Buffer> schema_fb;
632+
RETURN_NOT_OK(WriteSchemaMessage(*schema_, &dictionary_memo_, &schema_fb));
633+
634+
int32_t metadata_length = 0;
635+
RETURN_NOT_OK(WriteMessage(*schema_fb, sink_, &metadata_length));
636+
RETURN_NOT_OK(UpdatePosition());
637+
DCHECK_EQ(0, position_ % 8) << "WriteSchema did not perform an aligned write";
638+
return Status::OK();
639+
}
640+
638641
virtual Status Close() {
639642
// Write the schema if not already written
640643
// User is responsible for closing the OutputStream
@@ -701,9 +704,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
701704
&record_batches_[record_batches_.size() - 1]);
702705
}
703706

704-
// Adds padding bytes if necessary to ensure all memory blocks are written on
705-
// 64-byte (or other alignment) boundaries.
706-
Status Align(int64_t alignment = kArrowAlignment) {
707+
Status Align(int64_t alignment = kArrowIpcAlignment) {
708+
// Adds padding bytes if necessary to ensure all memory blocks are written on
709+
// 8-byte (or other alignment) boundaries.
707710
int64_t remainder = PaddedLength(position_, alignment) - position_;
708711
if (remainder > 0) {
709712
return Write(kPaddingBytes, remainder);
@@ -774,7 +777,7 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl
774777
// It is only necessary to align to an 8-byte boundary at the start of the file
775778
RETURN_NOT_OK(Write(reinterpret_cast<const uint8_t*>(kArrowMagicBytes),
776779
strlen(kArrowMagicBytes)));
777-
RETURN_NOT_OK(Align(8));
780+
RETURN_NOT_OK(Align());
778781

779782
// We write the schema at the start of the file (and the end). This also
780783
// writes all the dictionaries at the beginning of the file

cpp/src/arrow/util/bit-util.h

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -217,20 +217,28 @@ static inline uint32_t RoundUpNumi64(uint32_t bits) { return (bits + 63) >> 6; }
217217
/// Returns the number of whole 64-bit words contained in `bits` (rounds down).
218218
static inline uint32_t RoundDownNumi64(uint32_t bits) { return bits >> 6; }
219219

220-
static inline int64_t RoundUpToMultipleOf64(int64_t num) {
220+
template <int64_t ROUND_TO>
221+
static inline int64_t RoundToPowerOfTwo(int64_t num) {
221222
// TODO(wesm): is this definitely needed?
222223
// DCHECK_GE(num, 0);
223-
constexpr int64_t round_to = 64;
224-
constexpr int64_t force_carry_addend = round_to - 1;
225-
constexpr int64_t truncate_bitmask = ~(round_to - 1);
226-
constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to;
224+
constexpr int64_t force_carry_addend = ROUND_TO - 1;
225+
constexpr int64_t truncate_bitmask = ~(ROUND_TO - 1);
226+
constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - ROUND_TO;
227227
if (num <= max_roundable_num) {
228228
return (num + force_carry_addend) & truncate_bitmask;
229229
}
230230
// handle overflow case. This should result in a malloc error upstream
231231
return num;
232232
}
233233

234+
static inline int64_t RoundUpToMultipleOf64(int64_t num) {
235+
return RoundToPowerOfTwo<64>(num);
236+
}
237+
238+
static inline int64_t RoundUpToMultipleOf8(int64_t num) {
239+
return RoundToPowerOfTwo<8>(num);
240+
}
241+
234242
/// Non hw accelerated pop count.
235243
/// TODO: we don't use this in any perf sensitive code paths currently. There
236244
/// might be a much faster way to implement this.

cpp/src/arrow/util/logging.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include <cstdlib>
2222
#include <iostream>
2323

24+
#include "arrow/util/macros.h"
25+
2426
namespace arrow {
2527

2628
// Stubbed versions of macros defined in glog/logging.h, intended for
@@ -127,9 +129,9 @@ class CerrLog {
127129
class FatalLog : public CerrLog {
128130
public:
129131
explicit FatalLog(int /* severity */) // NOLINT
130-
: CerrLog(ARROW_FATAL){} // NOLINT
132+
: CerrLog(ARROW_FATAL) {} // NOLINT
131133

132-
[[noreturn]] ~FatalLog() {
134+
ARROW_NORETURN ~FatalLog() {
133135
if (has_logged_) {
134136
std::cerr << std::endl;
135137
}

cpp/src/arrow/util/macros.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,13 @@
3636
#if defined(__GNUC__)
3737
#define ARROW_PREDICT_FALSE(x) (__builtin_expect(x, 0))
3838
#define ARROW_PREDICT_TRUE(x) (__builtin_expect(!!(x), 1))
39+
#define ARROW_NORETURN __attribute__((noreturn))
40+
#elif defined(_MSC_VER)
41+
#define ARROW_NORETURN __declspec(noreturn)
42+
#define ARROW_PREDICT_FALSE(x) x
43+
#define ARROW_PREDICT_TRUE(x) x
3944
#else
45+
#define ARROW_NORETURN
4046
#define ARROW_PREDICT_FALSE(x) x
4147
#define ARROW_PREDICT_TRUE(x) x
4248
#endif

format/IPC.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Data components in the stream and file formats are represented as encapsulated
2727
* A length prefix indicating the metadata size
2828
* The message metadata as a [Flatbuffer][3]
2929
* Padding bytes to an 8-byte boundary
30-
* The message body
30+
* The message body, which must be a multiple of 8 bytes
3131

3232
Schematically, we have:
3333

@@ -38,6 +38,10 @@ Schematically, we have:
3838
<message body>
3939
```
4040

41+
The complete serialized message must be a multiple of 8 bytes so that messages
42+
can be relocated between streams. Otherwise the amount of padding between the
43+
metadata and the message body could be non-deterministic.
44+
4145
The `metadata_size` includes the size of the flatbuffer plus padding. The
4246
`Message` flatbuffer includes a version number, the particular message (as a
4347
flatbuffer union), and the size of the message body:
@@ -154,6 +158,10 @@ struct Block {
154158
}
155159
```
156160

161+
The `metaDataLength` here includes the metadata length prefix, serialized
162+
metadata, and any additional padding bytes, and by construction must be a
163+
multiple of 8 bytes.
164+
157165
Some notes about this
158166

159167
* The `Block` offset indicates the starting byte of the record batch.

java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, Writa
3838

3939
@Override
4040
protected void startInternal(WriteChannel out) throws IOException {
41-
ArrowMagic.writeMagic(out);
41+
ArrowMagic.writeMagic(out, true);
4242
}
4343

4444
@Override
@@ -54,7 +54,7 @@ protected void endInternal(WriteChannel out,
5454
}
5555
out.writeIntLittleEndian(footerLength);
5656
LOGGER.debug(String.format("Footer starts at %d, length: %d", footerStart, footerLength));
57-
ArrowMagic.writeMagic(out);
57+
ArrowMagic.writeMagic(out, false);
5858
LOGGER.debug(String.format("magic written, now at %d", out.getCurrentPosition()));
5959
}
6060
}

java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@ public class ArrowMagic {
2828

2929
public static final int MAGIC_LENGTH = MAGIC.length;
3030

31-
public static void writeMagic(WriteChannel out) throws IOException {
31+
public static void writeMagic(WriteChannel out, boolean align) throws IOException {
3232
out.write(MAGIC);
33+
if (align) {
34+
out.align();
35+
}
3336
}
3437

3538
public static boolean validateMagic(byte[] array) {

0 commit comments

Comments
 (0)