Skip to content

Commit

Permalink
[BugFix] Use repeated_ancestor_def_level in parquet writer (StarRocks…
Browse files Browse the repository at this point in the history
…#28884)

This PR introduces `repeated_ancestor_def_level` to determine whether an
undefined value has a slot in the leaf column. More specifically,
- `def_level < repeated_ancestor_def_level` means undefined and not
having a slot
- `repeated_ancestor_def_level <= def_level < max_def_level` means
undefined and having a slot
- `def_level = max_def_level` means defined and (surely) having a slot

Previously, we rely on `def_level = max_def_level` to determine both,
but unfortunately, this is wrong. A simple case would be a null struct,
whose def_level is zero but also have slots in child columns.

Example:
```
mysql> create table ice_struct1(a int,b struct<col_int int, col_string string, col_date date>);
Query OK, 0 rows affected (0.43 sec)

mysql> insert into ice_struct1 values(1,row(0,'ABC',null)),(2,null),(3,row(null,null,'2003-09-13'));
Query OK, 3 rows affected (0.33 sec)
{'label':'FAKE_ICEBERG_SINK_LABEL', 'status':'VISIBLE', 'txnId':'-1'}

mysql> select * from ice_struct1 order by 1;
+------+----------------------------------------------------+
| a    | b                                                  |
+------+----------------------------------------------------+
|    1 | {"col_int":0,"col_string":"ABC","col_date":null}   |
|    2 | NULL                                               |
|    3 | {"col_int":null,"col_string":null,"col_date":null} |
+------+----------------------------------------------------+
3 rows in set (0.04 sec)
```

Fixed with this PR:
<img width="1421" alt="image"
src="https://github.com/StarRocks/starrocks/assets/16740944/df4e9356-2e0e-4336-8f86-ca331323a784">

More complex test cases are tested as well.
  • Loading branch information
letian-jiang authored Aug 11, 2023
1 parent 4a4fe7a commit 2a15678
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 87 deletions.
137 changes: 61 additions & 76 deletions be/src/formats/parquet/level_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ void LevelBuilder::_write_boolean_column_chunk(const LevelBuilderContext& ctx, c

// Use the rep_levels in the context from caller since node is primitive.
auto& rep_levels = ctx._rep_levels;
auto def_levels = _make_def_levels_branchless(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(col->size(), null_col);
auto def_levels = _make_def_levels(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(ctx, null_col, col->size());

// sizeof(bool) depends on implementation, thus we cast values to ensure correctness
auto values = new bool[col->size()];
Expand Down Expand Up @@ -166,8 +166,8 @@ void LevelBuilder::_write_int_column_chunk(const LevelBuilderContext& ctx, const

// Use the rep_levels in the context from caller since node is primitive.
auto& rep_levels = ctx._rep_levels;
auto def_levels = _make_def_levels(ctx, node, null_col);
auto null_bitset = _make_null_bitset(col->size(), null_col);
auto def_levels = _make_def_levels(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(ctx, null_col, col->size());

using source_type = RunTimeCppType<lt>;
using target_type = typename ::parquet::type_traits<pt>::value_type;
Expand Down Expand Up @@ -209,8 +209,8 @@ void LevelBuilder::_write_decimal128_column_chunk(const LevelBuilderContext& ctx

// Use the rep_levels in the context from caller since node is primitive.
auto& rep_levels = ctx._rep_levels;
auto def_levels = _make_def_levels_branchless(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(col->size(), null_col);
auto def_levels = _make_def_levels(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(ctx, null_col, col->size());

auto values = new unsigned __int128[col->size()];
DeferOp defer([&] { delete[] values; });
Expand Down Expand Up @@ -245,8 +245,8 @@ void LevelBuilder::_write_date_column_chunk(const LevelBuilderContext& ctx, cons

// Use the rep_levels in the context from caller since node is primitive.
auto& rep_levels = ctx._rep_levels;
auto def_levels = _make_def_levels_branchless(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(col->size(), null_col);
auto def_levels = _make_def_levels(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(ctx, null_col, col->size());

auto unix_epoch_date = DateValue::create(1970, 1, 1); // base date to subtract

Expand Down Expand Up @@ -274,8 +274,8 @@ void LevelBuilder::_write_datetime_column_chunk(const LevelBuilderContext& ctx,

// Use the rep_levels in the context from caller since node is primitive.
auto rep_levels = ctx._rep_levels;
auto def_levels = _make_def_levels_branchless(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(col->size(), null_col);
auto def_levels = _make_def_levels(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(ctx, null_col, col->size());

auto values = new int64_t[col->size()];
DeferOp defer([&] { delete[] values; });
Expand Down Expand Up @@ -303,8 +303,8 @@ void LevelBuilder::_write_varchar_column_chunk(const LevelBuilderContext& ctx, c

// Use the rep_levels in the context from caller since node is primitive.
auto& rep_levels = ctx._rep_levels;
auto def_levels = _make_def_levels_branchless(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(col->size(), null_col);
auto def_levels = _make_def_levels(ctx, node, null_col, col->size());
auto null_bitset = _make_null_bitset(ctx, null_col, col->size());

auto values = new ::parquet::ByteArray[col->size()];
DeferOp defer([&] { delete[] values; });
Expand Down Expand Up @@ -354,7 +354,7 @@ void LevelBuilder::_write_array_column_chunk(const LevelBuilderContext& ctx, con
auto rep_level = ctx._rep_levels ? (*ctx._rep_levels)[i] : 0;

// already null in parent column
if (def_level < ctx._max_def_level) {
if (def_level < ctx._repeated_ancestor_def_level) {
(*def_levels)[num_levels] = def_level;
(*rep_levels)[num_levels] = rep_level;

Expand All @@ -363,7 +363,7 @@ void LevelBuilder::_write_array_column_chunk(const LevelBuilderContext& ctx, con
}

// null in current array_column
if (null_col != nullptr && null_col[offset]) {
if (def_level < ctx._max_def_level || (null_col != nullptr && null_col[offset])) {
(*def_levels)[num_levels] = def_level;
(*rep_levels)[num_levels] = rep_level;

Expand Down Expand Up @@ -393,8 +393,9 @@ void LevelBuilder::_write_array_column_chunk(const LevelBuilderContext& ctx, con

def_levels->resize(num_levels);
rep_levels->resize(num_levels);
LevelBuilderContext derived_ctx(def_levels->size(), def_levels, ctx._max_def_level + node->is_optional() + 1,
rep_levels, ctx._max_rep_level + 1);
LevelBuilderContext derived_ctx(def_levels->size(), def_levels, rep_levels,
ctx._max_def_level + node->is_optional() + 1, ctx._max_rep_level + 1,
ctx._max_def_level + node->is_optional() + 1);

_write_column_chunk(derived_ctx, type_desc.children[0], inner_node, elements, write_leaf_callback);
}
Expand Down Expand Up @@ -432,15 +433,15 @@ void LevelBuilder::_write_map_column_chunk(const LevelBuilderContext& ctx, const
auto def_level = ctx._def_levels ? (*ctx._def_levels)[i] : 0;
auto rep_level = ctx._rep_levels ? (*ctx._rep_levels)[i] : 0;

if (def_level < ctx._max_def_level) {
if (def_level < ctx._repeated_ancestor_def_level) {
(*def_levels)[num_levels] = def_level;
(*rep_levels)[num_levels] = rep_level;

num_levels++;
continue;
}

if (null_col != nullptr && null_col[offset]) {
if (def_level < ctx._max_def_level || (null_col != nullptr && null_col[offset])) {
(*def_levels)[num_levels] = def_level;
(*rep_levels)[num_levels] = rep_level;

Expand Down Expand Up @@ -468,8 +469,9 @@ void LevelBuilder::_write_map_column_chunk(const LevelBuilderContext& ctx, const

def_levels->resize(num_levels);
rep_levels->resize(num_levels);
LevelBuilderContext derived_ctx(def_levels->size(), def_levels, ctx._max_def_level + node->is_optional() + 1,
rep_levels, ctx._max_rep_level + 1);
LevelBuilderContext derived_ctx(def_levels->size(), def_levels, rep_levels,
ctx._max_def_level + node->is_optional() + 1, ctx._max_rep_level + 1,
ctx._max_def_level + node->is_optional() + 1);

_write_column_chunk(derived_ctx, type_desc.children[0], key_node, keys, write_leaf_callback);
_write_column_chunk(derived_ctx, type_desc.children[1], value_node, values, write_leaf_callback);
Expand All @@ -487,76 +489,58 @@ void LevelBuilder::_write_struct_column_chunk(const LevelBuilderContext& ctx, co

// Use the rep_levels in the context from caller since node is primitive.
auto rep_levels = ctx._rep_levels;
auto def_levels = _make_def_levels_branchless(ctx, node, null_col, col->size());
auto def_levels = _make_def_levels(ctx, node, null_col, col->size());

LevelBuilderContext derived_ctx(def_levels->size(), def_levels, ctx._max_def_level + node->is_optional(),
rep_levels, ctx._max_rep_level);
LevelBuilderContext derived_ctx(def_levels->size(), def_levels, rep_levels,
ctx._max_def_level + node->is_optional(), ctx._max_rep_level,
ctx._repeated_ancestor_def_level);

for (size_t i = 0; i < type_desc.children.size(); i++) {
auto sub_col = struct_col->field_column(type_desc.field_names[i]);
_write_column_chunk(derived_ctx, type_desc.children[i], struct_node->field(i), sub_col, write_leaf_callback);
}
}

// Convert byte-addressable bitset into a bit-addressable bitset.
std::shared_ptr<std::vector<uint8_t>> LevelBuilder::_make_null_bitset(size_t n, const uint8_t* nulls) const {
if (nulls == nullptr) {
return nullptr;
}
// Convert byte-addressable mask into a bit-addressable mask. Note the 0/1 values are flipped.
std::shared_ptr<std::vector<uint8_t>> LevelBuilder::_make_null_bitset(const LevelBuilderContext& ctx,
const uint8_t* nulls,
const size_t col_size) const {
if (ctx._repeated_ancestor_def_level == ctx._max_def_level) {
if (nulls == nullptr) {
return nullptr;
}

auto bitset = std::make_shared<std::vector<uint8_t>>((n + 7) / 8);
for (size_t i = 0; i < n; i++) {
(*bitset)[i / 8] |= (1 - nulls[i]) << (i % 8);
auto bitset = std::make_shared<std::vector<uint8_t>>((col_size + 7) / 8);
for (size_t i = 0; i < col_size; i++) {
(*bitset)[i / 8] |= (1 - nulls[i]) << (i % 8);
}
return bitset;
}

// slow path
auto bitset = std::make_shared<std::vector<uint8_t>>((col_size + 7) / 8);
size_t col_offset = 0;
for (size_t i = 0; i < ctx._num_levels; i++) {
int16_t level = ctx._def_levels ? (*ctx._def_levels)[i] : 0;
if (level < ctx._repeated_ancestor_def_level) {
continue;
}
uint8_t is_null = nulls != nullptr ? nulls[col_offset] : 0;
is_null |= (level < ctx._max_def_level); // undefined but having a slot in leaf column
(*bitset)[col_offset / 8] |= (1 - is_null) << (col_offset % 8);
col_offset++;
}
DCHECK(col_size == col_offset);
return bitset;
}

// Make definition levels in terms of repetition and nullity.
// node could be primitive, or group node denoting struct.
std::shared_ptr<std::vector<int16_t>> LevelBuilder::_make_def_levels(const LevelBuilderContext& ctx,
const ::parquet::schema::NodePtr& node,
const uint8_t* nulls) const {
if (node->is_required()) {
// For required node, use the def_levels in the context from caller.
return ctx._def_levels;
}

if (ctx._max_def_level == 0) {
auto def_levels = std::make_shared<std::vector<int16_t>>(ctx._num_levels, 1); // assume not-null first
if (nulls == nullptr) { // column has no null
return def_levels;
}

DCHECK(ctx._max_rep_level == 0);
for (size_t i = 0; i < ctx._num_levels; i++) { // nulls.size() == ctx._num_levels
// decrement def_levels for null entries
(*def_levels)[i] -= nulls[i];
}

return def_levels;
}

DCHECK(ctx._def_levels != nullptr);
auto def_levels = std::make_shared<std::vector<int16_t>>(*ctx._def_levels);

int offset = 0;
for (auto& level : *def_levels) {
if (level == ctx._max_def_level) {
if (nulls == nullptr || nulls[offset] == 0) {
// increment def_level for non-null entry
level++;
}
offset++;
}
}
return def_levels;
}

std::shared_ptr<std::vector<int16_t>> LevelBuilder::_make_def_levels_branchless(const LevelBuilderContext& ctx,
const ::parquet::schema::NodePtr& node,
const uint8_t* nulls,
const size_t col_size) const {
const uint8_t* nulls,
const size_t col_size) const {
DCHECK(!node->is_repeated());
if (node->is_required()) {
// For required node, use the def_levels in the context from caller.
return ctx._def_levels;
Expand Down Expand Up @@ -585,22 +569,23 @@ std::shared_ptr<std::vector<int16_t>> LevelBuilder::_make_def_levels_branchless(
if (nulls != nullptr) {
while (level_offset < ctx._num_levels && col_offset < col_size) {
auto& level = (*def_levels)[level_offset];
uint8_t defined = level == ctx._max_def_level;
uint8_t defined = (level == ctx._max_def_level);
uint8_t not_null = defined & (1 - nulls[col_offset]);
level += not_null;
col_offset += defined;
col_offset += (level >= ctx._repeated_ancestor_def_level);
level_offset++;
}
} else {
while (level_offset < ctx._num_levels && col_offset < col_size) {
auto& level = (*def_levels)[level_offset];
uint8_t defined = level == ctx._max_def_level;
uint8_t defined = (level == ctx._max_def_level);
uint8_t not_null = defined;
level += not_null;
col_offset += defined;
col_offset += (level >= ctx._repeated_ancestor_def_level);
level_offset++;
}
}
DCHECK(col_offset == col_size);

return def_levels;
}
Expand Down
25 changes: 14 additions & 11 deletions be/src/formats/parquet/level_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,34 @@ namespace starrocks::parquet {

/// Intermediate data passed between add_column_chunk functions.
/// Example(Int Array): ctx -> writeArrayColumnChunk -> ctx' -> writeIntColumnChunk
/// Immutable and thread-safe.
/// Immutable after construction.
class LevelBuilderContext {
public:
LevelBuilderContext(size_t num_levels, std::shared_ptr<std::vector<int16_t>> def_levels = nullptr,
int16_t max_def_level = 0, std::shared_ptr<std::vector<int16_t>> rep_levels = nullptr,
int16_t max_rep_level = 0)
std::shared_ptr<std::vector<int16_t>> rep_levels = nullptr, int16_t max_def_level = 0,
int16_t max_rep_level = 0, int16_t repeated_ancestor_def_level = 0)
: _max_def_level(max_def_level),
_max_rep_level(max_rep_level),
_repeated_ancestor_def_level(repeated_ancestor_def_level),
_num_levels(num_levels),
_def_levels(std::move(def_levels)),
_rep_levels(std::move(rep_levels)) {
DCHECK(_max_def_level == 0 || _def_levels != nullptr);
DCHECK(_max_rep_level == 0 || _rep_levels != nullptr);
DCHECK(_max_def_level == 0 || _num_levels == _def_levels->size());
DCHECK(_max_rep_level == 0 || _num_levels == _rep_levels->size());
DCHECK(_max_def_level >= _repeated_ancestor_def_level);
DCHECK(_max_def_level >= _max_rep_level);
}

public:
const int16_t _max_def_level;
const int16_t _max_rep_level;

// def level of the closest repeated ancestor
// Note: if the node itself is repeated, then repeated_ancestor_def_level == max_def_level
const int16_t _repeated_ancestor_def_level;

// count of def/rep levels.
// May be larger than values count if there are any undefined values.
const int64_t _num_levels;
Expand Down Expand Up @@ -134,16 +141,12 @@ class LevelBuilder {
const ::parquet::schema::NodePtr& node, const ColumnPtr& col,
const CallbackFunction& write_leaf_callback);

std::shared_ptr<std::vector<uint8_t>> _make_null_bitset(size_t n, const uint8_t* nulls) const;
std::shared_ptr<std::vector<uint8_t>> _make_null_bitset(const LevelBuilderContext& ctx, const uint8_t* nulls,
const size_t col_size) const;

std::shared_ptr<std::vector<int16_t>> _make_def_levels(const LevelBuilderContext& ctx,
const ::parquet::schema::NodePtr& node,
const uint8_t* nulls) const;

std::shared_ptr<std::vector<int16_t>> _make_def_levels_branchless(const LevelBuilderContext& ctx,
const ::parquet::schema::NodePtr& node,
const uint8_t* nulls,
const size_t col_size) const;
const ::parquet::schema::NodePtr& node, const uint8_t* nulls,
const size_t col_size) const;

private:
TypeDescriptor _type_desc;
Expand Down

0 comments on commit 2a15678

Please sign in to comment.