Skip to content

Commit a2711f2

Browse files
committed
Address other format incompatibilities, write vectorLayout to Arrow metadata
Change-Id: Ic04ef893957c98ba2747f6ad9cef0e7ebe596958
1 parent 13608ef commit a2711f2

File tree

15 files changed

+199
-129
lines changed

15 files changed

+199
-129
lines changed

cpp/src/arrow/io/io-file-test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
#include <cstdio>
2020
#include <cstring>
2121
#ifndef _MSC_VER
22-
# include <fcntl.h>
22+
#include <fcntl.h>
2323
#endif
2424
#include <fstream>
2525
#include <memory>

cpp/src/arrow/ipc/adapter.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,9 +402,6 @@ class RecordBatchReader {
402402

403403
Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
404404
BufferMetadata metadata = metadata_->buffer(buffer_index);
405-
if (!BitUtil::IsMultipleOf8(metadata.length)) {
406-
return Status::Invalid("Expected buffer to be a multiple of 8 bytes");
407-
}
408405
return file_->ReadAt(metadata.offset, metadata.length, out);
409406
}
410407

cpp/src/arrow/ipc/json-internal.cc

Lines changed: 28 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ namespace ipc {
4545
using RjArray = rj::Value::ConstArray;
4646
using RjObject = rj::Value::ConstObject;
4747

48-
enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY };
49-
5048
static std::string GetBufferTypeName(BufferType type) {
5149
switch (type) {
5250
case BufferType::DATA:
@@ -93,27 +91,6 @@ static std::string GetTimeUnitName(TimeUnit unit) {
9391
return "UNKNOWN";
9492
}
9593

96-
class BufferLayout {
97-
public:
98-
BufferLayout(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {}
99-
100-
BufferType type() const { return type_; }
101-
int bit_width() const { return bit_width_; }
102-
103-
private:
104-
BufferType type_;
105-
int bit_width_;
106-
};
107-
108-
static const BufferLayout kValidityBuffer(BufferType::VALIDITY, 1);
109-
static const BufferLayout kOffsetBuffer(BufferType::OFFSET, 32);
110-
static const BufferLayout kTypeBuffer(BufferType::TYPE, 32);
111-
static const BufferLayout kBooleanBuffer(BufferType::DATA, 1);
112-
static const BufferLayout kValues64(BufferType::DATA, 64);
113-
static const BufferLayout kValues32(BufferType::DATA, 32);
114-
static const BufferLayout kValues16(BufferType::DATA, 16);
115-
static const BufferLayout kValues8(BufferType::DATA, 8);
116-
11794
class JsonSchemaWriter : public TypeVisitor {
11895
public:
11996
explicit JsonSchemaWriter(const Schema& schema, RjWriter* writer)
@@ -154,9 +131,9 @@ class JsonSchemaWriter : public TypeVisitor {
154131
}
155132

156133
template <typename T>
157-
typename std::enable_if<std::is_base_of<NoExtraMeta, T>::value ||
158-
std::is_base_of<BooleanType, T>::value ||
159-
std::is_base_of<NullType, T>::value,
134+
typename std::enable_if<
135+
std::is_base_of<NoExtraMeta, T>::value || std::is_base_of<BooleanType, T>::value ||
136+
std::is_base_of<DateType, T>::value || std::is_base_of<NullType, T>::value,
160137
void>::type
161138
WriteTypeMetadata(const T& type) {}
162139

@@ -243,29 +220,28 @@ class JsonSchemaWriter : public TypeVisitor {
243220
}
244221

245222
template <typename T>
246-
Status WritePrimitive(const std::string& typeclass, const T& type,
247-
const std::vector<BufferLayout>& buffer_layout) {
223+
Status WritePrimitive(const std::string& typeclass, const T& type) {
248224
WriteName(typeclass, type);
249225
SetNoChildren();
250-
WriteBufferLayout(buffer_layout);
226+
WriteBufferLayout(type.GetBufferLayout());
251227
return Status::OK();
252228
}
253229

254230
template <typename T>
255231
Status WriteVarBytes(const std::string& typeclass, const T& type) {
256232
WriteName(typeclass, type);
257233
SetNoChildren();
258-
WriteBufferLayout({kValidityBuffer, kOffsetBuffer, kValues8});
234+
WriteBufferLayout(type.GetBufferLayout());
259235
return Status::OK();
260236
}
261237

262-
void WriteBufferLayout(const std::vector<BufferLayout>& buffer_layout) {
238+
void WriteBufferLayout(const std::vector<BufferDescr>& buffer_layout) {
263239
writer_->Key("typeLayout");
264240
writer_->StartObject();
265241
writer_->Key("vectors");
266242
writer_->StartArray();
267243

268-
for (const BufferLayout& buffer : buffer_layout) {
244+
for (const BufferDescr& buffer : buffer_layout) {
269245
writer_->StartObject();
270246
writer_->Key("type");
271247
writer_->String(GetBufferTypeName(buffer.type()));
@@ -289,101 +265,74 @@ class JsonSchemaWriter : public TypeVisitor {
289265
return Status::OK();
290266
}
291267

292-
Status Visit(const NullType& type) override { return WritePrimitive("null", type, {}); }
268+
Status Visit(const NullType& type) override { return WritePrimitive("null", type); }
293269

294-
Status Visit(const BooleanType& type) override {
295-
return WritePrimitive("bool", type, {kValidityBuffer, kBooleanBuffer});
296-
}
270+
Status Visit(const BooleanType& type) override { return WritePrimitive("bool", type); }
297271

298-
Status Visit(const Int8Type& type) override {
299-
return WritePrimitive("int", type, {kValidityBuffer, kValues8});
300-
}
272+
Status Visit(const Int8Type& type) override { return WritePrimitive("int", type); }
301273

302-
Status Visit(const Int16Type& type) override {
303-
return WritePrimitive("int", type, {kValidityBuffer, kValues16});
304-
}
274+
Status Visit(const Int16Type& type) override { return WritePrimitive("int", type); }
305275

306-
Status Visit(const Int32Type& type) override {
307-
return WritePrimitive("int", type, {kValidityBuffer, kValues32});
308-
}
276+
Status Visit(const Int32Type& type) override { return WritePrimitive("int", type); }
309277

310-
Status Visit(const Int64Type& type) override {
311-
return WritePrimitive("int", type, {kValidityBuffer, kValues64});
312-
}
278+
Status Visit(const Int64Type& type) override { return WritePrimitive("int", type); }
313279

314-
Status Visit(const UInt8Type& type) override {
315-
return WritePrimitive("int", type, {kValidityBuffer, kValues8});
316-
}
280+
Status Visit(const UInt8Type& type) override { return WritePrimitive("int", type); }
317281

318-
Status Visit(const UInt16Type& type) override {
319-
return WritePrimitive("int", type, {kValidityBuffer, kValues16});
320-
}
282+
Status Visit(const UInt16Type& type) override { return WritePrimitive("int", type); }
321283

322-
Status Visit(const UInt32Type& type) override {
323-
return WritePrimitive("int", type, {kValidityBuffer, kValues32});
324-
}
284+
Status Visit(const UInt32Type& type) override { return WritePrimitive("int", type); }
325285

326-
Status Visit(const UInt64Type& type) override {
327-
return WritePrimitive("int", type, {kValidityBuffer, kValues64});
328-
}
286+
Status Visit(const UInt64Type& type) override { return WritePrimitive("int", type); }
329287

330288
Status Visit(const HalfFloatType& type) override {
331-
return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues16});
289+
return WritePrimitive("floatingpoint", type);
332290
}
333291

334292
Status Visit(const FloatType& type) override {
335-
return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues32});
293+
return WritePrimitive("floatingpoint", type);
336294
}
337295

338296
Status Visit(const DoubleType& type) override {
339-
return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues64});
297+
return WritePrimitive("floatingpoint", type);
340298
}
341299

342300
Status Visit(const StringType& type) override { return WriteVarBytes("utf8", type); }
343301

344302
Status Visit(const BinaryType& type) override { return WriteVarBytes("binary", type); }
345303

346-
Status Visit(const DateType& type) override {
347-
return WritePrimitive("date", type, {kValidityBuffer, kValues64});
348-
}
304+
Status Visit(const DateType& type) override { return WritePrimitive("date", type); }
349305

350-
Status Visit(const TimeType& type) override {
351-
return WritePrimitive("time", type, {kValidityBuffer, kValues64});
352-
}
306+
Status Visit(const TimeType& type) override { return WritePrimitive("time", type); }
353307

354308
Status Visit(const TimestampType& type) override {
355-
return WritePrimitive("timestamp", type, {kValidityBuffer, kValues64});
309+
return WritePrimitive("timestamp", type);
356310
}
357311

358312
Status Visit(const IntervalType& type) override {
359-
return WritePrimitive("interval", type, {kValidityBuffer, kValues64});
313+
return WritePrimitive("interval", type);
360314
}
361315

362316
Status Visit(const DecimalType& type) override { return Status::NotImplemented("NYI"); }
363317

364318
Status Visit(const ListType& type) override {
365319
WriteName("list", type);
366320
RETURN_NOT_OK(WriteChildren(type.children()));
367-
WriteBufferLayout({kValidityBuffer, kOffsetBuffer});
321+
WriteBufferLayout(type.GetBufferLayout());
368322
return Status::OK();
369323
}
370324

371325
Status Visit(const StructType& type) override {
372326
WriteName("struct", type);
373327
WriteChildren(type.children());
374-
WriteBufferLayout({kValidityBuffer, kTypeBuffer});
328+
WriteBufferLayout(type.GetBufferLayout());
375329
return Status::OK();
376330
}
377331

378332
Status Visit(const UnionType& type) override {
379333
WriteName("union", type);
380334
WriteChildren(type.children());
381-
382-
if (type.mode == UnionMode::SPARSE) {
383-
WriteBufferLayout({kValidityBuffer, kTypeBuffer});
384-
} else {
385-
WriteBufferLayout({kValidityBuffer, kTypeBuffer, kOffsetBuffer});
386-
}
335+
WriteBufferLayout(type.GetBufferLayout());
387336
return Status::OK();
388337
}
389338

cpp/src/arrow/ipc/metadata-internal.cc

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,32 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type
152152
break;
153153

154154
static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
155-
std::vector<FieldOffset>* children, flatbuf::Type* out_type, Offset* offset) {
155+
std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
156+
flatbuf::Type* out_type, Offset* offset) {
157+
std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
158+
for (const BufferDescr& descr : buffer_layout) {
159+
flatbuf::VectorType vector_type;
160+
switch (descr.type()) {
161+
case BufferType::OFFSET:
162+
vector_type = flatbuf::VectorType_OFFSET;
163+
break;
164+
case BufferType::DATA:
165+
vector_type = flatbuf::VectorType_DATA;
166+
break;
167+
case BufferType::VALIDITY:
168+
vector_type = flatbuf::VectorType_VALIDITY;
169+
break;
170+
case BufferType::TYPE:
171+
vector_type = flatbuf::VectorType_TYPE;
172+
break;
173+
default:
174+
vector_type = flatbuf::VectorType_DATA;
175+
break;
176+
}
177+
auto offset = flatbuf::CreateVectorLayout(fbb, descr.bit_width(), vector_type);
178+
layout->push_back(offset);
179+
}
180+
156181
switch (type->type) {
157182
case Type::BOOL:
158183
*out_type = flatbuf::Type_Bool;
@@ -211,14 +236,18 @@ static Status FieldToFlatbuffer(
211236

212237
flatbuf::Type type_enum;
213238
Offset type_data;
239+
Offset type_layout;
214240
std::vector<FieldOffset> children;
241+
std::vector<VectorLayoutOffset> layout;
215242

216-
RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, &type_data));
243+
RETURN_NOT_OK(
244+
TypeToFlatbuffer(fbb, field->type, &children, &layout, &type_enum, &type_data));
217245
auto fb_children = fbb.CreateVector(children);
246+
auto fb_layout = fbb.CreateVector(layout);
218247

219248
// TODO: produce the list of VectorTypes
220249
*offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data,
221-
field->dictionary, fb_children);
250+
field->dictionary, fb_children, fb_layout);
222251

223252
return Status::OK();
224253
}
@@ -293,8 +322,8 @@ Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
293322
const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
294323
flatbuffers::FlatBufferBuilder fbb;
295324

296-
auto batch = flatbuf::CreateRecordBatch(fbb, length, fbb.CreateVectorOfStructs(nodes),
297-
fbb.CreateVectorOfStructs(buffers));
325+
auto batch = flatbuf::CreateRecordBatch(
326+
fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers));
298327

299328
fbb.Finish(batch);
300329

cpp/src/arrow/ipc/metadata-internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ namespace ipc {
4141

4242
using FBB = flatbuffers::FlatBufferBuilder;
4343
using FieldOffset = flatbuffers::Offset<arrow::flatbuf::Field>;
44+
using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>;
4445
using Offset = flatbuffers::Offset<void>;
4546

4647
static constexpr flatbuf::MetadataVersion kMetadataVersion =

cpp/src/arrow/ipc/metadata.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,13 @@ RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message
184184
impl_.reset(new RecordBatchMetadataImpl(message->impl_->header()));
185185
}
186186

187-
RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Buffer>& buffer, int64_t offset) {
187+
RecordBatchMetadata::RecordBatchMetadata(
188+
const std::shared_ptr<Buffer>& buffer, int64_t offset) {
188189
message_ = nullptr;
189190
buffer_ = buffer;
190191

191-
const flatbuf::RecordBatch* metadata = flatbuffers::GetRoot<flatbuf::RecordBatch>(
192-
buffer->data() + offset);
192+
const flatbuf::RecordBatch* metadata =
193+
flatbuffers::GetRoot<flatbuf::RecordBatch>(buffer->data() + offset);
193194

194195
// TODO(wesm): validate table
195196

cpp/src/arrow/test-util.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@
6161

6262
// Alias MSVC popcount to GCC name
6363
#ifdef _MSC_VER
64-
# include <intrin.h>
65-
# define __builtin_popcount __popcnt
66-
# include <nmmintrin.h>
67-
# define __builtin_popcountll _mm_popcnt_u64
64+
#include <intrin.h>
65+
#define __builtin_popcount __popcnt
66+
#include <nmmintrin.h>
67+
#define __builtin_popcountll _mm_popcnt_u64
6868
#endif
6969

7070
namespace arrow {

cpp/src/arrow/type.cc

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,6 @@ std::string UnionType::ToString() const {
105105
return s.str();
106106
}
107107

108-
int NullType::bit_width() const {
109-
return 0;
110-
}
111-
112108
std::string NullType::ToString() const {
113109
return name();
114110
}
@@ -187,4 +183,46 @@ std::shared_ptr<Field> field(
187183
return std::make_shared<Field>(name, type, nullable, dictionary);
188184
}
189185

186+
static const BufferDescr kValidityBuffer(BufferType::VALIDITY, 1);
187+
static const BufferDescr kOffsetBuffer(BufferType::OFFSET, 32);
188+
static const BufferDescr kTypeBuffer(BufferType::TYPE, 32);
189+
static const BufferDescr kBooleanBuffer(BufferType::DATA, 1);
190+
static const BufferDescr kValues64(BufferType::DATA, 64);
191+
static const BufferDescr kValues32(BufferType::DATA, 32);
192+
static const BufferDescr kValues16(BufferType::DATA, 16);
193+
static const BufferDescr kValues8(BufferType::DATA, 8);
194+
195+
std::vector<BufferDescr> FixedWidthType::GetBufferLayout() const {
196+
return {kValidityBuffer, BufferDescr(BufferType::DATA, bit_width())};
197+
}
198+
199+
std::vector<BufferDescr> NullType::GetBufferLayout() const {
200+
return {};
201+
}
202+
203+
std::vector<BufferDescr> BinaryType::GetBufferLayout() const {
204+
return {kValidityBuffer, kOffsetBuffer, kValues8};
205+
}
206+
207+
std::vector<BufferDescr> ListType::GetBufferLayout() const {
208+
return {kValidityBuffer, kOffsetBuffer};
209+
}
210+
211+
std::vector<BufferDescr> StructType::GetBufferLayout() const {
212+
return {kValidityBuffer, kTypeBuffer};
213+
}
214+
215+
std::vector<BufferDescr> UnionType::GetBufferLayout() const {
216+
if (mode == UnionMode::SPARSE) {
217+
return {kValidityBuffer, kTypeBuffer};
218+
} else {
219+
return {kValidityBuffer, kTypeBuffer, kOffsetBuffer};
220+
}
221+
}
222+
223+
std::vector<BufferDescr> DecimalType::GetBufferLayout() const {
224+
// TODO(wesm)
225+
return {};
226+
}
227+
190228
} // namespace arrow

0 commit comments

Comments
 (0)