Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/.clang-tidy-ignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
ipc-adapter-test.cc
memory-pool-test.cc
24 changes: 22 additions & 2 deletions cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "arrow/types/list.h"
#include "arrow/types/primitive.h"
#include "arrow/types/string.h"
#include "arrow/types/struct.h"
#include "arrow/util/buffer.h"
#include "arrow/util/logging.h"
#include "arrow/util/status.h"
Expand Down Expand Up @@ -118,8 +119,11 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes
RETURN_NOT_OK(VisitArray(
list_arr->values().get(), field_nodes, buffers, max_recursion_depth - 1));
} else if (arr->type_enum() == Type::STRUCT) {
// TODO(wesm)
return Status::NotImplemented("Struct type");
const auto struct_arr = static_cast<const StructArray*>(arr);
for (auto& field : struct_arr->fields()) {
RETURN_NOT_OK(
VisitArray(field.get(), field_nodes, buffers, max_recursion_depth - 1));
}
} else {
return Status::NotImplemented("Unrecognized type");
}
Expand Down Expand Up @@ -313,6 +317,22 @@ class RowBatchReader::Impl {
return MakeListArray(type, field_meta.length, offsets, values_array,
field_meta.null_count, null_bitmap, out);
}

if (type->type == Type::STRUCT) {
const int num_children = type->num_children();
std::vector<ArrayPtr> fields;
fields.reserve(num_children);
for (int child_idx = 0; child_idx < num_children; ++child_idx) {
std::shared_ptr<Array> field_array;
RETURN_NOT_OK(NextArray(
type->child(child_idx).get(), max_recursion_depth - 1, &field_array));
fields.push_back(field_array);
}
out->reset(new StructArray(
type, field_meta.length, fields, field_meta.null_count, null_bitmap));
return Status::OK();
}

return Status::NotImplemented("Non-primitive types not complete yet");
}

Expand Down
46 changes: 39 additions & 7 deletions cpp/src/arrow/ipc/ipc-adapter-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "arrow/types/list.h"
#include "arrow/types/primitive.h"
#include "arrow/types/string.h"
#include "arrow/types/struct.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/buffer.h"
#include "arrow/util/memory-pool.h"
Expand Down Expand Up @@ -205,15 +206,16 @@ Status MakeNonNullRowBatch(std::shared_ptr<RowBatch>* out) {

// Example data
MemoryPool* pool = default_memory_pool();
const int length = 200;
const int length = 50;
std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;

RETURN_NOT_OK(MakeRandomInt32Array(1000, true, pool, &leaf_values));
bool include_nulls = false;
RETURN_NOT_OK(MakeRandomListArray(leaf_values, 50, include_nulls, pool, &list_array));
RETURN_NOT_OK(
MakeRandomListArray(list_array, 50, include_nulls, pool, &list_list_array));
RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
MakeRandomListArray(leaf_values, length, include_nulls, pool, &list_array));
RETURN_NOT_OK(
MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array));
RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array));
out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array}));
return Status::OK();
}
Expand All @@ -238,10 +240,40 @@ Status MakeDeeplyNestedList(std::shared_ptr<RowBatch>* out) {
return Status::OK();
}

INSTANTIATE_TEST_CASE_P(
RoundTripTests, TestWriteRowBatch,
Status MakeStruct(std::shared_ptr<RowBatch>* out) {
// reuse constructed list columns
std::shared_ptr<RowBatch> list_batch;
RETURN_NOT_OK(MakeListRowBatch(&list_batch));
std::vector<ArrayPtr> columns = {
list_batch->column(0), list_batch->column(1), list_batch->column(2)};
auto list_schema = list_batch->schema();

// Define schema
std::shared_ptr<DataType> type(new StructType(
{list_schema->field(0), list_schema->field(1), list_schema->field(2)}));
auto f0 = std::make_shared<Field>("non_null_struct", type);
auto f1 = std::make_shared<Field>("null_struct", type);
std::shared_ptr<Schema> schema(new Schema({f0, f1}));

// construct individual nullable/non-nullable struct arrays
ArrayPtr no_nulls(new StructArray(type, list_batch->num_rows(), columns));
std::vector<uint8_t> null_bytes(list_batch->num_rows(), 1);
null_bytes[0] = 0;
std::shared_ptr<Buffer> null_bitmask;
RETURN_NOT_OK(util::bytes_to_bits(null_bytes, &null_bitmask));
ArrayPtr with_nulls(
new StructArray(type, list_batch->num_rows(), columns, 1, null_bitmask));

// construct batch
std::vector<ArrayPtr> arrays = {no_nulls, with_nulls};
out->reset(new RowBatch(schema, list_batch->num_rows(), arrays));
return Status::OK();
}

INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,
::testing::Values(&MakeIntRowBatch, &MakeListRowBatch, &MakeNonNullRowBatch,
&MakeZeroLengthRowBatch, &MakeDeeplyNestedList, &MakeStringTypesRowBatch));
&MakeZeroLengthRowBatch, &MakeDeeplyNestedList,
&MakeStringTypesRowBatch, &MakeStruct));

void TestGetRowBatchSize(std::shared_ptr<RowBatch> batch) {
MockMemorySource mock_source(1 << 16);
Expand Down
10 changes: 3 additions & 7 deletions cpp/src/arrow/ipc/metadata-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,8 @@ Status MessageBuilder::SetSchema(const Schema* schema) {
field_offsets.push_back(offset);
}

header_ = flatbuf::CreateSchema(
fbb_,
endianness(),
fbb_.CreateVector(field_offsets))
.Union();
header_ =
flatbuf::CreateSchema(fbb_, endianness(), fbb_.CreateVector(field_offsets)).Union();
body_length_ = 0;
return Status::OK();
}
Expand All @@ -278,8 +275,7 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
const std::vector<flatbuf::FieldNode>& nodes,
const std::vector<flatbuf::Buffer>& buffers) {
header_type_ = flatbuf::MessageHeader_RecordBatch;
header_ = flatbuf::CreateRecordBatch(fbb_, length,
fbb_.CreateVectorOfStructs(nodes),
header_ = flatbuf::CreateRecordBatch(fbb_, length, fbb_.CreateVectorOfStructs(nodes),
fbb_.CreateVectorOfStructs(buffers))
.Union();
body_length_ = body_length;
Expand Down
97 changes: 49 additions & 48 deletions cpp/src/arrow/types/primitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,54 +53,55 @@ class ARROW_EXPORT PrimitiveArray : public Array {
const uint8_t* raw_data_;
};

#define NUMERIC_ARRAY_DECL(NAME, TypeClass, T) \
class ARROW_EXPORT NAME : public PrimitiveArray { \
public: \
using value_type = T; \
\
NAME(int32_t length, const std::shared_ptr<Buffer>& data, int32_t null_count = 0, \
const std::shared_ptr<Buffer>& null_bitmap = nullptr) \
: PrimitiveArray( \
std::make_shared<TypeClass>(), length, data, null_count, null_bitmap) {} \
NAME(const TypePtr& type, int32_t length, const std::shared_ptr<Buffer>& data, \
int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr) \
: PrimitiveArray(type, length, data, null_count, null_bitmap) {} \
\
bool EqualsExact(const NAME& other) const { \
return PrimitiveArray::EqualsExact(*static_cast<const PrimitiveArray*>(&other)); \
} \
\
bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, \
const ArrayPtr& arr) const override { \
if (this == arr.get()) { return true; } \
if (!arr) { return false; } \
if (this->type_enum() != arr->type_enum()) { return false; } \
const auto other = static_cast<NAME*>(arr.get()); \
for (int32_t i = start_idx, o_i = other_start_idx; i < end_idx; ++i, ++o_i) { \
const bool is_null = IsNull(i); \
if (is_null != arr->IsNull(o_i) || \
(!is_null && Value(i) != other->Value(o_i))) { \
return false; \
} \
} \
return true; \
} \
\
const T* raw_data() const { return reinterpret_cast<const T*>(raw_data_); } \
\
T Value(int i) const { return raw_data()[i]; } \
};

NUMERIC_ARRAY_DECL(UInt8Array, UInt8Type, uint8_t);
NUMERIC_ARRAY_DECL(Int8Array, Int8Type, int8_t);
NUMERIC_ARRAY_DECL(UInt16Array, UInt16Type, uint16_t);
NUMERIC_ARRAY_DECL(Int16Array, Int16Type, int16_t);
NUMERIC_ARRAY_DECL(UInt32Array, UInt32Type, uint32_t);
NUMERIC_ARRAY_DECL(Int32Array, Int32Type, int32_t);
NUMERIC_ARRAY_DECL(UInt64Array, UInt64Type, uint64_t);
NUMERIC_ARRAY_DECL(Int64Array, Int64Type, int64_t);
NUMERIC_ARRAY_DECL(FloatArray, FloatType, float);
NUMERIC_ARRAY_DECL(DoubleArray, DoubleType, double);
template <class TypeClass>
class ARROW_EXPORT NumericArray : public PrimitiveArray {
public:
using value_type = typename TypeClass::c_type;
NumericArray(int32_t length, const std::shared_ptr<Buffer>& data,
int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr)
: PrimitiveArray(
std::make_shared<TypeClass>(), length, data, null_count, null_bitmap) {}
NumericArray(const TypePtr& type, int32_t length, const std::shared_ptr<Buffer>& data,
int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr)
: PrimitiveArray(type, length, data, null_count, null_bitmap) {}

bool EqualsExact(const NumericArray<TypeClass>& other) const {
return PrimitiveArray::EqualsExact(*static_cast<const PrimitiveArray*>(&other));
}

bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx,
const ArrayPtr& arr) const override {
if (this == arr.get()) { return true; }
if (!arr) { return false; }
if (this->type_enum() != arr->type_enum()) { return false; }
const auto other = static_cast<NumericArray<TypeClass>*>(arr.get());
for (int32_t i = start_idx, o_i = other_start_idx; i < end_idx; ++i, ++o_i) {
const bool is_null = IsNull(i);
if (is_null != arr->IsNull(o_i) || (!is_null && Value(i) != other->Value(o_i))) {
return false;
}
}
return true;
}
const value_type* raw_data() const {
return reinterpret_cast<const value_type*>(raw_data_);
}

value_type Value(int i) const { return raw_data()[i]; }
};

#define NUMERIC_ARRAY_DECL(NAME, TypeClass) using NAME = NumericArray<TypeClass>;

NUMERIC_ARRAY_DECL(UInt8Array, UInt8Type);
NUMERIC_ARRAY_DECL(Int8Array, Int8Type);
NUMERIC_ARRAY_DECL(UInt16Array, UInt16Type);
NUMERIC_ARRAY_DECL(Int16Array, Int16Type);
NUMERIC_ARRAY_DECL(UInt32Array, UInt32Type);
NUMERIC_ARRAY_DECL(Int32Array, Int32Type);
NUMERIC_ARRAY_DECL(UInt64Array, UInt64Type);
NUMERIC_ARRAY_DECL(Int64Array, Int64Type);
NUMERIC_ARRAY_DECL(FloatArray, FloatType);
NUMERIC_ARRAY_DECL(DoubleArray, DoubleType);

template <typename Type>
class ARROW_EXPORT PrimitiveBuilder : public ArrayBuilder {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/util/memory-pool-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) {

#ifndef NDEBUG
EXPECT_EXIT(pool->Free(data, 120), ::testing::ExitedWithCode(1),
".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)");
".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)");
#endif

pool->Free(data, 100);
Expand Down