Skip to content

Commit

Permalink
Merge pull request StarRocks#28778 from fzhedu/groupConcat
Browse files Browse the repository at this point in the history
[Feature] Group concat support order by and distinct
  • Loading branch information
fzhedu authored Aug 24, 2023
2 parents 1b32651 + 1bc6b03 commit 34b655c
Show file tree
Hide file tree
Showing 32 changed files with 2,944 additions and 116 deletions.
12 changes: 9 additions & 3 deletions be/src/exec/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,16 @@ void AggregatorParams::init() {

bool is_input_nullable = has_outer_join_child || desc.nodes[0].has_nullable_child;
agg_fn_types[i] = {return_type, serde_type, arg_typedescs, is_input_nullable, desc.nodes[0].is_nullable};
if (fn.name.function_name == "array_agg") {
if (fn.name.function_name == "array_agg" || fn.name.function_name == "group_concat") {
// set order by info
if (fn.aggregate_fn.__isset.is_asc_order && fn.aggregate_fn.__isset.nulls_first &&
!fn.aggregate_fn.is_asc_order.empty()) {
agg_fn_types[i].is_asc_order = fn.aggregate_fn.is_asc_order;
agg_fn_types[i].nulls_first = fn.aggregate_fn.nulls_first;
}
if (fn.aggregate_fn.__isset.is_distinct) {
agg_fn_types[i].is_distinct = fn.aggregate_fn.is_distinct;
}
}
}
}
Expand Down Expand Up @@ -459,7 +462,11 @@ Status Aggregator::prepare(RuntimeState* state, ObjectPool* pool, RuntimeProfile
for (int i = 0; i < _agg_fn_ctxs.size(); ++i) {
_agg_fn_ctxs[i] = FunctionContext::create_context(
state, _mem_pool.get(), AnyValUtil::column_type_to_type_desc(_agg_fn_types[i].result_type),
_agg_fn_types[i].arg_typedescs, _agg_fn_types[i].is_asc_order, _agg_fn_types[i].nulls_first);
_agg_fn_types[i].arg_typedescs, _agg_fn_types[i].is_distinct, _agg_fn_types[i].is_asc_order,
_agg_fn_types[i].nulls_first);
if (state->query_options().__isset.group_concat_max_len) {
_agg_fn_ctxs[i]->set_group_concat_max_len(state->query_options().group_concat_max_len);
}
state->obj_pool()->add(_agg_fn_ctxs[i]);
}

Expand Down Expand Up @@ -868,7 +875,6 @@ Status Aggregator::output_chunk_by_streaming(Chunk* input_chunk, ChunkPtr* chunk
DCHECK(!_group_by_columns.empty());

RETURN_IF_ERROR(evaluate_agg_fn_exprs(input_chunk));

const auto num_rows = _group_by_columns[0]->size();
Columns agg_result_column = _create_agg_result_columns(num_rows, true);
for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ struct AggFunctionTypes {
// hold order-by info
std::vector<bool> is_asc_order;
std::vector<bool> nulls_first;

bool is_distinct = false;
};

struct ColumnType {
Expand Down
6 changes: 6 additions & 0 deletions be/src/exprs/agg/factory/aggregate_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ static const AggregateFunction* get_function(const std::string& name, LogicalTyp
}
}

if (func_version > 6) {
if (name == "group_concat") {
func_name = "group_concat2";
}
}

if (binary_type == TFunctionBinaryType::BUILTIN) {
auto func = AggregateFuncResolver::instance()->get_aggregate_info(func_name, arg_type, return_type,
is_window_function, is_null);
Expand Down
4 changes: 4 additions & 0 deletions be/src/exprs/agg/factory/aggregate_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ class AggregateFactory {
return std::make_shared<ArrayAggAggregateFunctionV2>();
}

static AggregateFunctionPtr MakeGroupConcatAggregateFunctionV2() {
return std::make_shared<GroupConcatAggregateFunctionV2>();
}

template <LogicalType LT>
static auto MakeMaxAggregateFunction();

Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/agg/factory/aggregate_resolver_others.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ void AggregateFuncResolver::register_others() {

add_general_mapping<AnyValueSemiState>("any_value", false, AggregateFactory::MakeAnyValueSemiAggregateFunction());
add_general_mapping_notnull("array_agg2", false, AggregateFactory::MakeArrayAggAggregateFunctionV2());
add_general_mapping_notnull("group_concat2", false, AggregateFactory::MakeGroupConcatAggregateFunctionV2());
}

} // namespace starrocks
Loading

0 comments on commit 34b655c

Please sign in to comment.