Skip to content

Commit a251714

Browse files
pitrou authored and xhochy committed
PARQUET-1268: Fix conversion of null list Arrow arrays
Author: Antoine Pitrou <antoine@python.org> Closes apache#454 from pitrou/PARQUET-1268-null-list-conversion and squashes the following commits: e4e7744 [Antoine Pitrou] PARQUET-1268: Fix conversion of null list Arrow arrays Change-Id: Ie43bbdd282cd691e641412782f11e1dc651e730d
1 parent 828783d commit a251714

File tree

3 files changed

+66
-32
lines changed

3 files changed

+66
-32
lines changed

cpp/src/parquet/arrow/arrow-reader-writer-test.cc

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,23 +1000,56 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
10001000
using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
10011001

10021002
TEST_F(TestNullParquetIO, NullColumn) {
1003-
std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(SMALL_SIZE);
1004-
std::shared_ptr<Table> table = MakeSimpleTable(values, true);
1005-
this->sink_ = std::make_shared<InMemoryOutputStream>();
1006-
ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
1007-
values->length(), default_writer_properties()));
1003+
for (int32_t num_rows : {0, SMALL_SIZE}) {
1004+
std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(num_rows);
1005+
std::shared_ptr<Table> table = MakeSimpleTable(values, true /* nullable */);
1006+
this->sink_ = std::make_shared<InMemoryOutputStream>();
10081007

1009-
std::shared_ptr<Table> out;
1010-
std::unique_ptr<FileReader> reader;
1011-
this->ReaderFromSink(&reader);
1012-
this->ReadTableFromFile(std::move(reader), &out);
1013-
ASSERT_EQ(1, out->num_columns());
1014-
ASSERT_EQ(100, out->num_rows());
1008+
const int64_t chunk_size = std::max(static_cast<int64_t>(1), table->num_rows());
1009+
ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
1010+
chunk_size, default_writer_properties()));
10151011

1016-
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
1017-
ASSERT_EQ(1, chunked_array->num_chunks());
1012+
std::shared_ptr<Table> out;
1013+
std::unique_ptr<FileReader> reader;
1014+
this->ReaderFromSink(&reader);
1015+
this->ReadTableFromFile(std::move(reader), &out);
1016+
ASSERT_EQ(1, out->num_columns());
1017+
ASSERT_EQ(num_rows, out->num_rows());
10181018

1019-
internal::AssertArraysEqual(*values, *chunked_array->chunk(0));
1019+
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
1020+
ASSERT_EQ(1, chunked_array->num_chunks());
1021+
internal::AssertArraysEqual(*values, *chunked_array->chunk(0));
1022+
}
1023+
}
1024+
1025+
TEST_F(TestNullParquetIO, NullListColumn) {
1026+
std::vector<int32_t> offsets1 = {0};
1027+
std::vector<int32_t> offsets2 = {0, 2, 2, 3, 115};
1028+
for (std::vector<int32_t> offsets : {offsets1, offsets2}) {
1029+
std::shared_ptr<Array> offsets_array, values_array, list_array;
1030+
::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offsets, &offsets_array);
1031+
values_array = std::make_shared<::arrow::NullArray>(offsets.back());
1032+
ASSERT_OK(::arrow::ListArray::FromArrays(*offsets_array, *values_array,
1033+
default_memory_pool(), &list_array));
1034+
1035+
std::shared_ptr<Table> table = MakeSimpleTable(list_array, false /* nullable */);
1036+
this->sink_ = std::make_shared<InMemoryOutputStream>();
1037+
1038+
const int64_t chunk_size = std::max(static_cast<int64_t>(1), table->num_rows());
1039+
ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
1040+
chunk_size, default_writer_properties()));
1041+
1042+
std::shared_ptr<Table> out;
1043+
std::unique_ptr<FileReader> reader;
1044+
this->ReaderFromSink(&reader);
1045+
this->ReadTableFromFile(std::move(reader), &out);
1046+
ASSERT_EQ(1, out->num_columns());
1047+
ASSERT_EQ(offsets.size() - 1, out->num_rows());
1048+
1049+
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
1050+
ASSERT_EQ(1, chunked_array->num_chunks());
1051+
internal::AssertArraysEqual(*list_array, *chunked_array->chunk(0));
1052+
}
10201053
}
10211054

10221055
TEST_F(TestNullParquetIO, NullDictionaryColumn) {

cpp/src/parquet/arrow/reader.cc

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,17 +1235,6 @@ struct TransferFunctor<::arrow::Decimal128Type, Int64Type> {
12351235
} break;
12361236

12371237
Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
1238-
if (!record_reader_->HasMoreData()) {
1239-
// Exhausted all row groups.
1240-
*out = nullptr;
1241-
return Status::OK();
1242-
}
1243-
1244-
if (field_->type()->id() == ::arrow::Type::NA) {
1245-
*out = std::make_shared<::arrow::NullArray>(records_to_read);
1246-
return Status::OK();
1247-
}
1248-
12491238
try {
12501239
// Pre-allocation gives much better performance for flat columns
12511240
record_reader_->Reserve(records_to_read);
@@ -1282,6 +1271,11 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>*
12821271
TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
12831272
TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
12841273
TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
1274+
case ::arrow::Type::NA: {
1275+
*out = std::make_shared<::arrow::NullArray>(record_reader_->values_written());
1276+
RETURN_NOT_OK(WrapIntoListArray<Int32Type>(out));
1277+
break;
1278+
}
12851279
case ::arrow::Type::DECIMAL: {
12861280
switch (descr_->physical_type()) {
12871281
case ::parquet::Type::INT32: {

cpp/src/parquet/arrow/writer.cc

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,9 @@ class LevelBuilder {
193193
}
194194

195195
Status HandleNonNullList(int16_t def_level, int16_t rep_level, int64_t index) {
196-
int32_t inner_offset = offsets_[rep_level][index];
197-
int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
198-
int64_t recursion_level = rep_level + 1;
196+
const int32_t inner_offset = offsets_[rep_level][index];
197+
const int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
198+
const int64_t recursion_level = rep_level + 1;
199199
if (inner_length == 0) {
200200
return def_levels_.Append(def_level);
201201
}
@@ -205,14 +205,21 @@ class LevelBuilder {
205205
inner_length);
206206
} else {
207207
// We have reached the leaf: primitive list, handle remaining nullables
208+
const bool nullable_level = nullable_[recursion_level];
209+
const int64_t level_null_count = null_counts_[recursion_level];
210+
const uint8_t* level_valid_bitmap = valid_bitmaps_[recursion_level];
211+
208212
for (int64_t i = 0; i < inner_length; i++) {
209213
if (i > 0) {
210214
RETURN_NOT_OK(rep_levels_.Append(static_cast<int16_t>(rep_level + 1)));
211215
}
212-
if (nullable_[recursion_level] &&
213-
((null_counts_[recursion_level] == 0) ||
214-
BitUtil::GetBit(valid_bitmaps_[recursion_level],
215-
inner_offset + i + array_offsets_[recursion_level]))) {
216+
if (level_null_count && level_valid_bitmap == nullptr) {
217+
// Special case: this is a null array (all elements are null)
218+
RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 1)));
219+
} else if (nullable_level && ((level_null_count == 0) ||
220+
BitUtil::GetBit(level_valid_bitmap,
221+
inner_offset + i + array_offsets_[recursion_level]))) {
222+
// Non-null element in a null level
216223
RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 2)));
217224
} else {
218225
// This can be produced in two case:

0 commit comments

Comments (0)