Skip to content

Commit

Permalink
[fix](grouping sets) fix grouping sets have multiple empty sets (apac…
Browse files Browse the repository at this point in the history
…he#32317)

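In apache#32112, the handling of empty sets (the empty-expression case) in grouping sets was addressed. However, when a grouping sets clause contains multiple empty sets, each empty set has a different grouping ID, and that case was not handled. This commit fixes grouping sets that contain multiple empty sets.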
  • Loading branch information
Mryange authored Mar 18, 2024
1 parent 0d55c6e commit ccb122c
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 18 deletions.
27 changes: 21 additions & 6 deletions be/src/pipeline/exec/repeat_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ Status RepeatLocalState::get_repeated_block(vectorized::Block* child_block, int

const auto rows = child_block->rows();
// Fill grouping ID to block
RETURN_IF_ERROR(add_grouping_id_column(rows, cur_col, columns, repeat_id_idx));

DCHECK_EQ(cur_col, column_size);

return Status::OK();
}

Status RepeatLocalState::add_grouping_id_column(std::size_t rows, std::size_t& cur_col,
vectorized::MutableColumns& columns,
int repeat_id_idx) {
auto& p = _parent->cast<RepeatOperatorX>();
for (auto slot_idx = 0; slot_idx < p._grouping_list.size(); slot_idx++) {
DCHECK_LT(slot_idx, p._output_tuple_desc->slots().size());
const SlotDescriptor* _virtual_slot_desc = p._output_tuple_desc->slots()[cur_col];
Expand All @@ -166,14 +177,10 @@ Status RepeatLocalState::get_repeated_block(vectorized::Block* child_block, int
int64_t val = p._grouping_list[slot_idx][repeat_id_idx];
auto* column_ptr = columns[cur_col].get();
DCHECK(!p._output_slots[cur_col]->is_nullable());

auto* col = assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(column_ptr);
col->insert_raw_integers(val, rows);
cur_col++;
}

DCHECK_EQ(cur_col, column_size);

return Status::OK();
}

Expand Down Expand Up @@ -228,8 +235,16 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp
_repeat_id_idx = 0;
}
} else if (local_state._expr_ctxs.empty()) {
DCHECK(!_intermediate_block || (_intermediate_block && _intermediate_block->rows() == 0));
output_block->swap(_child_block);
auto m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
_output_slots);
auto rows = _child_block.rows();
auto& columns = m_block.mutable_columns();

for (int repeat_id_idx = 0; repeat_id_idx < _repeat_id_list.size(); repeat_id_idx++) {
std::size_t cur_col = 0;
RETURN_IF_ERROR(
local_state.add_grouping_id_column(rows, cur_col, columns, repeat_id_idx));
}
_child_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
}
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/repeat_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class RepeatLocalState final : public PipelineXLocalState<FakeSharedState> {
Status get_repeated_block(vectorized::Block* child_block, int repeat_id_idx,
vectorized::Block* output_block);

Status add_grouping_id_column(std::size_t rows, std::size_t& cur_col,
vectorized::MutableColumns& columns, int repeat_id_idx);

private:
friend class RepeatOperatorX;
template <typename LocalStateType>
Expand Down Expand Up @@ -97,7 +100,7 @@ class RepeatOperatorX final : public StatefulOperatorX<RepeatLocalState> {
TupleId _output_tuple_id;
const TupleDescriptor* _output_tuple_desc = nullptr;

std::vector<SlotDescriptor*> _output_slots;
mutable std::vector<SlotDescriptor*> _output_slots;

vectorized::VExprContextSPtrs _expr_ctxs;
};
Expand Down
23 changes: 18 additions & 5 deletions be/src/vec/exec/vrepeat_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,15 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl

const auto rows = child_block->rows();
// Fill grouping ID to block
RETURN_IF_ERROR(add_grouping_id_column(rows, cur_col, columns, repeat_id_idx));
output_block->set_columns(std::move(columns));
DCHECK_EQ(cur_col, column_size);

return Status::OK();
}

Status VRepeatNode::add_grouping_id_column(std::size_t rows, std::size_t& cur_col,
vectorized::MutableColumns& columns, int repeat_id_idx) {
for (auto slot_idx = 0; slot_idx < _grouping_list.size(); slot_idx++) {
DCHECK_LT(slot_idx, _output_tuple_desc->slots().size());
const SlotDescriptor* _virtual_slot_desc = _output_tuple_desc->slots()[cur_col];
Expand All @@ -166,9 +175,6 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl
col->insert_raw_integers(val, rows);
cur_col++;
}
output_block->set_columns(std::move(columns));
DCHECK_EQ(cur_col, column_size);

return Status::OK();
}

Expand All @@ -194,8 +200,15 @@ Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_b
_repeat_id_idx = 0;
}
} else if (_expr_ctxs.empty()) {
DCHECK(!_intermediate_block || (_intermediate_block && _intermediate_block->rows() == 0));
output_block->swap(*_child_block);
auto m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
_output_slots);
auto rows = _child_block->rows();
auto& columns = m_block.mutable_columns();

for (int repeat_id_idx = 0; repeat_id_idx < _repeat_id_list.size(); repeat_id_idx++) {
std::size_t cur_col = 0;
RETURN_IF_ERROR(add_grouping_id_column(rows, cur_col, columns, repeat_id_idx));
}
release_block_memory(*_child_block);
}
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns()));
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/vrepeat_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class VRepeatNode : public ExecNode {
private:
Status get_repeated_block(Block* child_block, int repeat_id_idx, Block* output_block);

Status add_grouping_id_column(std::size_t rows, std::size_t& cur_col,
vectorized::MutableColumns& columns, int repeat_id_idx);

// Slot id set used to indicate those slots need to set to null.
std::vector<std::set<SlotId>> _slot_id_set_list;
// all slot id
Expand Down
32 changes: 29 additions & 3 deletions regression-test/data/correctness_p0/test_grouping_sets_empty.out
Original file line number Diff line number Diff line change
@@ -1,13 +1,39 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select1 --
1
3

-- !select2 --
1
3

-- !select3 --
1
3
3
3

-- !select4 --
\N
\N
\N
1
2
3

-- !select5 --
3

-- !select6 --
3

-- !select7 --
3
3
3

-- !select8 --
\N
\N
\N
1
2
3

Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

suite("test_grouping_sets_empty") {

sql"""
drop table if exists test_grouping_sets_empty;
"""

sql"""
create table test_grouping_sets_empty (a int) distributed by hash(a) buckets 1 properties ( 'replication_num' = '1');
"""

sql """
insert into test_grouping_sets_empty values (1);
insert into test_grouping_sets_empty values (1),(2),(3);
"""


Expand All @@ -39,21 +43,37 @@ suite("test_grouping_sets_empty") {
select count(*) from test_grouping_sets_empty group by grouping sets (());
"""

qt_select3 """
select count(*) from test_grouping_sets_empty group by grouping sets ((),(),());
"""

qt_select4 """
select a from test_grouping_sets_empty group by grouping sets ((),(),(),(a)) order by a;
"""


sql """
set experimental_enable_pipeline_x_engine=false;
"""


qt_select3 """
qt_select5 """
select count(a) from test_grouping_sets_empty group by grouping sets (());
"""


qt_select4 """
qt_select6 """
select count(*) from test_grouping_sets_empty group by grouping sets (());
"""

qt_select7 """
select count(*) from test_grouping_sets_empty group by grouping sets ((),(),());
"""

qt_select8 """
select a from test_grouping_sets_empty group by grouping sets ((),(),(),(a)) order by a;
"""



}

0 comments on commit ccb122c

Please sign in to comment.