Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ydb/core/kqp/opt/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2471,18 +2471,18 @@ TStatus AnnotateOpAggregate(const TExprNode::TPtr& input, TExprContext& ctx) {
const auto* inputType = input->ChildPtr(TKqpOpAggregate::idx_Input)->GetTypeAnn();
const auto* structType = inputType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();

THashMap<TStringBuf, std::pair<TStringBuf, const TTypeAnnotationNode*>> aggTraitsMap;
THashMap<TString, std::pair<TString, const TTypeAnnotationNode*>> aggTraitsMap;
for (const auto& item : TExprBase(input->ChildPtr(TKqpOpAggregate::idx_AggregationTraitsList)).Cast<TExprList>()) {
const auto aggTrait = TExprBase(item).Cast<TKqpOpAggregationTraits>();
const auto originalColName = aggTrait.OriginalColName();
const auto resultColName = aggTrait.ResultColName();
const auto originalColName = TString(aggTrait.OriginalColName());
const auto resultColName = TString(aggTrait.ResultColName());
const auto* resultType = aggTrait.AggregationFunctionResultType().Ptr()->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
aggTraitsMap[originalColName] = {resultColName, resultType};
}

TVector<const TItemExprType*> newItemTypes;
for (const auto* itemType : structType->GetItems()) {
if (auto it = aggTraitsMap.find(itemType->GetName()); it != aggTraitsMap.end()) {
if (auto it = aggTraitsMap.find(TString(itemType->GetName())); it != aggTraitsMap.end()) {
newItemTypes.push_back(ctx.MakeType<TItemExprType>(it->second.first, it->second.second));
} else {
newItemTypes.push_back(itemType);
Expand Down
53 changes: 52 additions & 1 deletion ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ struct TJoinTableAliases {
};

THashSet<TString> SupportedAggregationFunctions{"sum", "min", "max", "count"};
ui64 KqpUniqueAggColumnId{0};

TJoinTableAliases GatherJoinAliasesLeftSideMultiInputs(const TVector<TInfoUnit> &joinKeys, const THashSet<TString> &processedInputs) {
TJoinTableAliases joinAliases;
Expand Down Expand Up @@ -388,6 +389,7 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,
}

// FIXME: Group by key can be an expression, we need to handle this case
TVector<std::pair<TInfoUnit, TInfoUnit>> aggRenamesMap;
TVector<TInfoUnit> groupByKeys;
auto groupOps = GetSetting(setItem->Tail(), "group_exprs");
if (groupOps) {
Expand All @@ -398,6 +400,9 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,
TVector<TInfoUnit> keys;
GetAllMembers(body, keys);
groupByKeys.insert(groupByKeys.end(), keys.begin(), keys.end());
for (const auto &infoUnit : keys) {
aggRenamesMap.push_back({infoUnit, infoUnit});
}
}
}

Expand All @@ -407,6 +412,8 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,

// This is a hack to enable convertion for aggregation columns.
THashSet<TString> aggregationColumns;
THashSet<TString> columnNames;
bool needToRenameAggFields = false;
// Collect PgAgg for each result item at first pass.
TVector<TKqpOpAggregationTraits> aggTraitsList;
for (ui32 i = 0; i < result->Child(1)->ChildrenSize(); ++i) {
Expand All @@ -418,6 +425,21 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,
TVector<TInfoUnit> originalColNames;
GetAllMembers(pgAgg, originalColNames);
Y_ENSURE(originalColNames.size() == 1, "Invalid column size for aggregation columns.");
const auto& originalColName = originalColNames.front();
auto renamedColName = originalColName;

// Rename agg column we will add a map to map same column to different renames.
if (columnNames.count(originalColName.GetFullName())) {
TStringBuilder strBuilder;
strBuilder << "_kqp_agg_input_";
strBuilder << originalColName.ColumnName;
strBuilder << "_";
strBuilder << ToString(KqpUniqueAggColumnId++);
renamedColName = TInfoUnit(originalColName.Alias, strBuilder);
needToRenameAggFields = true;
}
aggRenamesMap.push_back({originalColName, renamedColName});
columnNames.insert(renamedColName.GetFullName());

// Result column name.
auto resultColName = TString(resultItem->Child(0)->Content());
Expand All @@ -429,7 +451,7 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,
// clang-format off
auto aggregationTraits = Build<TKqpOpAggregationTraits>(ctx, node->Pos())
.OriginalColName<TCoAtom>()
.Value(originalColNames.front().GetFullName())
.Value(renamedColName.GetFullName())
.Build()
.AggregationFunction<TCoAtom>()
.Value(aggFuncName)
Expand All @@ -446,6 +468,35 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,

TExprNode::TPtr resultExpr = filterExpr;
if (!aggTraitsList.empty()) {
if (needToRenameAggFields) {
TVector<TExprNode::TPtr> mapElements;
for (const auto &[colName, newColName] : aggRenamesMap) {
// clang-format off
mapElements.push_back(Build<TKqpOpMapElementRename>(ctx, node->Pos())
.Input(resultExpr)
.Variable()
.Value(newColName.GetFullName())
.Build()
.From()
.Value(colName.GetFullName())
.Build()
.Done().Ptr());
// clang-format on
}

// clang-format off
resultExpr = Build<TKqpOpMap>(ctx, node->Pos())
.Input(resultExpr)
.MapElements()
.Add(mapElements)
.Build()
.Project()
.Value("false")
.Build()
.Done().Ptr();
// clang-format on
}

TVector<TCoAtom> keyColumns;
for (const auto &column : groupByKeys) {
// clang-format off
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/opt/rbo/kqp_rbo_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ TStatus ComputeTypes(std::shared_ptr<TOpAggregate> aggregate, TRBOContext& ctx)

TVector<const TItemExprType*> newItemTypes;
for (const auto* itemType : structType->GetItems()) {

// The type of the column could be changed after aggregation.
if (auto it = aggTraitsMap.find(itemType->GetName()); it != aggTraitsMap.end()) {
const auto& resultColName = it->second.first;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,11 @@ Y_UNIT_TEST_SUITE(KqpRbo) {
set TablePathPrefix = "/Root/";
select sum(t1.c), t1.b from t1 group by t1.b order by t1.b;
)",
R"(
--!syntax_pg
set TablePathPrefix = "/Root/";
select max(t1.a), min(t1.a), min(t1.b) as min_b from t1;
)",
};

std::vector<std::string> results = {
Expand All @@ -544,6 +549,7 @@ Y_UNIT_TEST_SUITE(KqpRbo) {
R"([["2";"0"]])",
R"([["6"];["4"]])",
R"([["4";"1"];["6";"2"]])",
R"([["4";"0";"1"]])",
};

for (ui32 i = 0; i < queries.size(); ++i) {
Expand Down
Loading