Skip to content

Commit b759454

Browse files
authored
Merge b300155 into 88f362b
2 parents 88f362b + b300155 commit b759454

File tree

4 files changed

+62
-6
lines changed

4 files changed

+62
-6
lines changed

ydb/core/kqp/opt/kqp_type_ann.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2471,18 +2471,18 @@ TStatus AnnotateOpAggregate(const TExprNode::TPtr& input, TExprContext& ctx) {
24712471
const auto* inputType = input->ChildPtr(TKqpOpAggregate::idx_Input)->GetTypeAnn();
24722472
const auto* structType = inputType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
24732473

2474-
THashMap<TStringBuf, std::pair<TStringBuf, const TTypeAnnotationNode*>> aggTraitsMap;
2474+
THashMap<TString, std::pair<TString, const TTypeAnnotationNode*>> aggTraitsMap;
24752475
for (const auto& item : TExprBase(input->ChildPtr(TKqpOpAggregate::idx_AggregationTraitsList)).Cast<TExprList>()) {
24762476
const auto aggTrait = TExprBase(item).Cast<TKqpOpAggregationTraits>();
2477-
const auto originalColName = aggTrait.OriginalColName();
2478-
const auto resultColName = aggTrait.ResultColName();
2477+
const auto originalColName = TString(aggTrait.OriginalColName());
2478+
const auto resultColName = TString(aggTrait.ResultColName());
24792479
const auto* resultType = aggTrait.AggregationFunctionResultType().Ptr()->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
24802480
aggTraitsMap[originalColName] = {resultColName, resultType};
24812481
}
24822482

24832483
TVector<const TItemExprType*> newItemTypes;
24842484
for (const auto* itemType : structType->GetItems()) {
2485-
if (auto it = aggTraitsMap.find(itemType->GetName()); it != aggTraitsMap.end()) {
2485+
if (auto it = aggTraitsMap.find(TString(itemType->GetName())); it != aggTraitsMap.end()) {
24862486
newItemTypes.push_back(ctx.MakeType<TItemExprType>(it->second.first, it->second.second));
24872487
} else {
24882488
newItemTypes.push_back(itemType);

ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ struct TJoinTableAliases {
1616
};
1717

1818
THashSet<TString> SupportedAggregationFunctions{"sum", "min", "max", "count"};
19+
ui64 KqpUniqueAggColumnId{0};
1920

2021
TJoinTableAliases GatherJoinAliasesLeftSideMultiInputs(const TVector<TInfoUnit> &joinKeys, const THashSet<TString> &processedInputs) {
2122
TJoinTableAliases joinAliases;
@@ -388,6 +389,7 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,
388389
}
389390

390391
// FIXME: Group by key can be an expression, we need to handle this case
392+
TVector<std::pair<TInfoUnit, TInfoUnit>> aggRenamesMap;
391393
TVector<TInfoUnit> groupByKeys;
392394
auto groupOps = GetSetting(setItem->Tail(), "group_exprs");
393395
if (groupOps) {
@@ -398,6 +400,9 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,
398400
TVector<TInfoUnit> keys;
399401
GetAllMembers(body, keys);
400402
groupByKeys.insert(groupByKeys.end(), keys.begin(), keys.end());
403+
for (const auto &infoUnit : keys) {
404+
aggRenamesMap.push_back({infoUnit, infoUnit});
405+
}
401406
}
402407
}
403408

@@ -407,6 +412,8 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,
407412

408413
// This is a hack to enable convertion for aggregation columns.
409414
THashSet<TString> aggregationColumns;
415+
THashSet<TString> columnNames;
416+
bool needToRenameAggFields = false;
410417
// Collect PgAgg for each result item at first pass.
411418
TVector<TKqpOpAggregationTraits> aggTraitsList;
412419
for (ui32 i = 0; i < result->Child(1)->ChildrenSize(); ++i) {
@@ -418,6 +425,21 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,
418425
TVector<TInfoUnit> originalColNames;
419426
GetAllMembers(pgAgg, originalColNames);
420427
Y_ENSURE(originalColNames.size() == 1, "Invalid column size for aggregation columns.");
428+
const auto& originalColName = originalColNames.front();
429+
auto renamedColName = originalColName;
430+
431+
// Rename agg column we will add a map to map same column to different renames.
432+
if (columnNames.count(originalColName.GetFullName())) {
433+
TStringBuilder strBuilder;
434+
strBuilder << "_kqp_agg_input_";
435+
strBuilder << originalColName.ColumnName;
436+
strBuilder << "_";
437+
strBuilder << ToString(KqpUniqueAggColumnId++);
438+
renamedColName = TInfoUnit(originalColName.Alias, strBuilder);
439+
needToRenameAggFields = true;
440+
}
441+
aggRenamesMap.push_back({originalColName, renamedColName});
442+
columnNames.insert(renamedColName.GetFullName());
421443

422444
// Result column name.
423445
auto resultColName = TString(resultItem->Child(0)->Content());
@@ -429,7 +451,7 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,
429451
// clang-format off
430452
auto aggregationTraits = Build<TKqpOpAggregationTraits>(ctx, node->Pos())
431453
.OriginalColName<TCoAtom>()
432-
.Value(originalColNames.front().GetFullName())
454+
.Value(renamedColName.GetFullName())
433455
.Build()
434456
.AggregationFunction<TCoAtom>()
435457
.Value(aggFuncName)
@@ -446,6 +468,35 @@ TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx,
446468

447469
TExprNode::TPtr resultExpr = filterExpr;
448470
if (!aggTraitsList.empty()) {
471+
if (needToRenameAggFields) {
472+
TVector<TExprNode::TPtr> mapElements;
473+
for (const auto &[colName, newColName] : aggRenamesMap) {
474+
// clang-format off
475+
mapElements.push_back(Build<TKqpOpMapElementRename>(ctx, node->Pos())
476+
.Input(resultExpr)
477+
.Variable()
478+
.Value(newColName.GetFullName())
479+
.Build()
480+
.From()
481+
.Value(colName.GetFullName())
482+
.Build()
483+
.Done().Ptr());
484+
// clang-format on
485+
}
486+
487+
// clang-format off
488+
resultExpr = Build<TKqpOpMap>(ctx, node->Pos())
489+
.Input(resultExpr)
490+
.MapElements()
491+
.Add(mapElements)
492+
.Build()
493+
.Project()
494+
.Value("false")
495+
.Build()
496+
.Done().Ptr();
497+
// clang-format on
498+
}
499+
449500
TVector<TCoAtom> keyColumns;
450501
for (const auto &column : groupByKeys) {
451502
// clang-format off

ydb/core/kqp/opt/rbo/kqp_rbo_type_ann.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@ TStatus ComputeTypes(std::shared_ptr<TOpAggregate> aggregate, TRBOContext& ctx)
239239

240240
TVector<const TItemExprType*> newItemTypes;
241241
for (const auto* itemType : structType->GetItems()) {
242-
243242
// The type of the column could be changed after aggregation.
244243
if (auto it = aggTraitsMap.find(itemType->GetName()); it != aggTraitsMap.end()) {
245244
const auto& resultColName = it->second.first;

ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,11 @@ Y_UNIT_TEST_SUITE(KqpRbo) {
532532
set TablePathPrefix = "/Root/";
533533
select sum(t1.c), t1.b from t1 group by t1.b order by t1.b;
534534
)",
535+
R"(
536+
--!syntax_pg
537+
set TablePathPrefix = "/Root/";
538+
select max(t1.a), min(t1.a), min(t1.b) as min_b from t1;
539+
)",
535540
};
536541

537542
std::vector<std::string> results = {
@@ -544,6 +549,7 @@ Y_UNIT_TEST_SUITE(KqpRbo) {
544549
R"([["2";"0"]])",
545550
R"([["6"];["4"]])",
546551
R"([["4";"1"];["6";"2"]])",
552+
R"([["4";"0";"1"]])",
547553
};
548554

549555
for (ui32 i = 0; i < queries.size(); ++i) {

0 commit comments

Comments
 (0)