Skip to content

Commit 81eb8ba

Browse files
authored
Merge 284019a into 73359a7
2 parents 73359a7 + 284019a commit 81eb8ba

File tree

13 files changed

+404
-6
lines changed

13 files changed

+404
-6
lines changed

ydb/library/yql/core/type_ann/type_ann_core.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12086,6 +12086,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
1208612086
Functions["PgGroupRef"] = &PgGroupRefWrapper;
1208712087
Functions["PgGrouping"] = &PgGroupingWrapper;
1208812088
Functions["PgGroupingSet"] = &PgGroupingSetWrapper;
12089+
Functions["PgToRecord"] = &PgToRecordWrapper;
1208912090

1209012091
Functions["AutoDemux"] = &AutoDemuxWrapper;
1209112092
Functions["AggrCountInit"] = &AggrCountInitWrapper;

ydb/library/yql/core/type_ann/type_ann_pg.cpp

Lines changed: 163 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1583,6 +1583,10 @@ bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>&
15831583
}
15841584

15851585
TString foundAlias;
1586+
bool matchedAlias = false;
1587+
ui32 matchedAliasI = 0;
1588+
TMaybe<ui32> matchedAliasIndex;
1589+
TMaybe<ui32> matchedAliasIndexI;
15861590
for (ui32 priority : {TInput::Projection, TInput::Current, TInput::External}) {
15871591
ui32 matches = 0;
15881592
for (ui32 inputIndex = 0; inputIndex < inputs.size(); ++inputIndex) {
@@ -1597,6 +1601,16 @@ bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>&
15971601
}
15981602
}
15991603

1604+
if (!x.Alias.empty()) {
1605+
if (node->Tail().Content() == x.Alias) {
1606+
matchedAlias = true;
1607+
matchedAliasIndex = inputIndex;
1608+
} else if (AsciiEqualsIgnoreCase(node->Tail().Content(), x.Alias)) {
1609+
++matchedAliasI;
1610+
matchedAliasIndexI = inputIndex;
1611+
}
1612+
}
1613+
16001614
auto pos = x.Type->FindItemI(node->Tail().Content());
16011615
if (pos) {
16021616
foundAlias = x.Alias;
@@ -1624,10 +1638,39 @@ bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>&
16241638
return true;
16251639
}
16261640

1627-
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()),
1628-
TStringBuilder() << "No such column: " << node->Tail().Content()));
1629-
isError = true;
1630-
return false;
1641+
TInput* tableRefInput = nullptr;
1642+
if (matchedAlias) {
1643+
tableRefInput = &inputs[*matchedAliasIndex];
1644+
} else {
1645+
if (matchedAliasI > 1) {
1646+
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()),
1647+
TStringBuilder() << "Table reference is ambiguous: " << node->Tail().Content()));
1648+
isError = true;
1649+
return false;
1650+
}
1651+
1652+
if (matchedAliasI == 1) {
1653+
tableRefInput = &inputs[*matchedAliasIndexI];
1654+
}
1655+
}
1656+
1657+
if (!tableRefInput) {
1658+
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()),
1659+
TStringBuilder() << "No such column: " << node->Tail().Content()));
1660+
isError = true;
1661+
return false;
1662+
}
1663+
1664+
for (const auto& item : tableRefInput->Type->GetItems()) {
1665+
if (!item->GetName().StartsWith("_yql_")) {
1666+
refs.insert(TString(item->GetName()));
1667+
if (tableRefInput->Priority == TInput::External) {
1668+
tableRefInput->UsedExternalColumns.insert(TString(item->GetName()));
1669+
}
1670+
}
1671+
}
1672+
1673+
return true;
16311674
}
16321675
}
16331676

@@ -1890,6 +1933,8 @@ IGraphTransformer::TStatus RebuildLambdaColumns(const TExprNode::TPtr& root, con
18901933
}
18911934

18921935
if (node->IsCallable("PgColumnRef")) {
1936+
const TInput* matchedAliasInput = nullptr;
1937+
const TInput* matchedAliasInputI = nullptr;
18931938
for (ui32 priority : { TInput::Projection, TInput::Current, TInput::External }) {
18941939
for (const auto& x : inputs) {
18951940
if (priority != x.Priority) {
@@ -1902,6 +1947,14 @@ IGraphTransformer::TStatus RebuildLambdaColumns(const TExprNode::TPtr& root, con
19021947
}
19031948
}
19041949

1950+
if (!x.Alias.empty()) {
1951+
if (node->Tail().Content() == x.Alias) {
1952+
matchedAliasInput = &x;
1953+
} else if (AsciiEqualsIgnoreCase(node->Tail().Content(), x.Alias)) {
1954+
matchedAliasInputI = &x;
1955+
}
1956+
}
1957+
19051958
auto pos = x.Type->FindItemI(node->Tail().Content());
19061959
if (pos) {
19071960
return ctx.Expr.Builder(node->Pos())
@@ -1914,6 +1967,41 @@ IGraphTransformer::TStatus RebuildLambdaColumns(const TExprNode::TPtr& root, con
19141967
}
19151968
}
19161969

1970+
if (!matchedAliasInput && matchedAliasInputI) {
1971+
matchedAliasInput = matchedAliasInputI;
1972+
}
1973+
1974+
if (matchedAliasInput) {
1975+
return ctx.Expr.Builder(node->Pos())
1976+
.Callable("PgToRecord")
1977+
.Callable(0, "DivePrefixMembers")
1978+
.Add(0, argNode)
1979+
.List(1)
1980+
.Atom(0, MakeAliasedColumn(matchedAliasInput->Alias, ""))
1981+
.Seal()
1982+
.Seal()
1983+
.List(1)
1984+
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder & {
1985+
ui32 pos = 0;
1986+
for (ui32 i = 0; i < matchedAliasInput->Type->GetSize(); ++i) {
1987+
auto columnName = matchedAliasInput->Order ?
1988+
matchedAliasInput->Order.GetRef()[i] :
1989+
matchedAliasInput->Type->GetItems()[i]->GetName();
1990+
if (!columnName.StartsWith("_yql_")) {
1991+
parent.List(pos++)
1992+
.Atom(0, columnName)
1993+
.Atom(1, columnName)
1994+
.Seal();
1995+
}
1996+
}
1997+
1998+
return parent;
1999+
})
2000+
.Seal()
2001+
.Seal()
2002+
.Build();
2003+
}
2004+
19172005
YQL_ENSURE(false, "Missing input");
19182006
}
19192007

@@ -5276,5 +5364,76 @@ IGraphTransformer::TStatus PgGroupingSetWrapper(const TExprNode::TPtr& input, TE
52765364
return IGraphTransformer::TStatus::Ok;
52775365
}
52785366

5367+
IGraphTransformer::TStatus PgToRecordWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
5368+
if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
5369+
return IGraphTransformer::TStatus::Error;
5370+
}
5371+
5372+
if (!EnsureStructType(input->Head(), ctx.Expr)) {
5373+
return IGraphTransformer::TStatus::Error;
5374+
}
5375+
5376+
auto structType = input->Head().GetTypeAnn()->Cast<TStructExprType>();
5377+
bool hasConversions = false;
5378+
for (ui32 pass = 0; pass < 2; ++pass) {
5379+
TExprNode::TListType convItems;
5380+
for (ui32 i = 0; i < structType->GetSize(); ++i) {
5381+
ui32 pgType;
5382+
bool convertToPg;
5383+
if (!ExtractPgType(structType->GetItems()[i]->GetItemType(), pgType, convertToPg, input->Head().Pos(), ctx.Expr)) {
5384+
return IGraphTransformer::TStatus::Error;
5385+
}
5386+
5387+
hasConversions = hasConversions || convertToPg;
5388+
if (pass == 1) {
5389+
auto name = ctx.Expr.NewAtom(input->Head().Pos(), structType->GetItems()[i]->GetName());
5390+
auto member = ctx.Expr.NewCallable(input->Head().Pos(), "Member", { input->HeadPtr(), name} );
5391+
if (convertToPg) {
5392+
member = ctx.Expr.NewCallable(input->Head().Pos(), "ToPg", { member });
5393+
}
5394+
5395+
convItems.push_back(ctx.Expr.NewList(input->Head().Pos(), { name, member }));
5396+
}
5397+
}
5398+
5399+
if (!hasConversions) {
5400+
break;
5401+
}
5402+
5403+
if (pass == 1) {
5404+
output = ctx.Expr.ChangeChild(*input, 0, ctx.Expr.NewCallable(input->Head().Pos(), "AsStruct", std::move(convItems)));
5405+
return IGraphTransformer::TStatus::Repeat;
5406+
}
5407+
}
5408+
5409+
if (!EnsureTuple(input->Tail(), ctx.Expr)) {
5410+
return IGraphTransformer::TStatus::Error;
5411+
}
5412+
5413+
for (const auto& child : input->Tail().Children()) {
5414+
if (!EnsureTupleSize(*child, 2, ctx.Expr)) {
5415+
return IGraphTransformer::TStatus::Error;
5416+
}
5417+
5418+
if (!EnsureAtom(child->Head(), ctx.Expr)) {
5419+
return IGraphTransformer::TStatus::Error;
5420+
}
5421+
5422+
if (!EnsureAtom(child->Tail(), ctx.Expr)) {
5423+
return IGraphTransformer::TStatus::Error;
5424+
}
5425+
5426+
auto pos = structType->FindItem(child->Tail().Content());
5427+
if (!pos) {
5428+
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(child->Pos()),
5429+
TStringBuilder() << "Missing member: " << child->Tail().Content()));
5430+
return IGraphTransformer::TStatus::Error;
5431+
}
5432+
}
5433+
5434+
input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(NPg::LookupType("record").TypeId));
5435+
return IGraphTransformer::TStatus::Ok;
5436+
}
5437+
52795438
} // namespace NTypeAnnImpl
52805439
}

ydb/library/yql/core/type_ann/type_ann_pg.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ IGraphTransformer::TStatus PgSubLinkWrapper(const TExprNode::TPtr& input, TExprN
5353
IGraphTransformer::TStatus PgGroupRefWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
5454
IGraphTransformer::TStatus PgGroupingWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
5555
IGraphTransformer::TStatus PgGroupingSetWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
56+
IGraphTransformer::TStatus PgToRecordWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
5657

5758
} // namespace NTypeAnnImpl
5859
} // namespace NYql

ydb/library/yql/minikql/mkql_program_builder.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "ydb/library/yql/minikql/mkql_type_builder.h"
1010
#include "ydb/library/yql/core/sql_types/match_recognize.h"
1111
#include "ydb/library/yql/core/sql_types/time_order_recover.h"
12+
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
1213

1314
#include <util/string/cast.h>
1415
#include <util/string/printf.h>
@@ -5470,6 +5471,31 @@ TRuntimeNode TProgramBuilder::PgTableContent(
54705471
return TRuntimeNode(callableBuilder.Build(), false);
54715472
}
54725473

5474+
TRuntimeNode TProgramBuilder::PgToRecord(TRuntimeNode input, const TArrayRef<std::pair<std::string_view, std::string_view>>& members) {
5475+
if constexpr (RuntimeVersion < 48U) {
5476+
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
5477+
}
5478+
5479+
MKQL_ENSURE(input.GetStaticType()->IsStruct(), "Expected struct");
5480+
auto structType = AS_TYPE(TStructType, input.GetStaticType());
5481+
for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
5482+
auto itemType = structType->GetMemberType(i);
5483+
MKQL_ENSURE(itemType->IsNull() || itemType->IsPg(), "Expected null or pg");
5484+
}
5485+
5486+
auto returnType = NewPgType(NYql::NPg::LookupType("record").TypeId);
5487+
TCallableBuilder callableBuilder(Env, __func__, returnType);
5488+
callableBuilder.Add(input);
5489+
TVector<TRuntimeNode> names;
5490+
for (const auto& x : members) {
5491+
names.push_back(NewDataLiteral<NUdf::EDataSlot::String>(x.first));
5492+
names.push_back(NewDataLiteral<NUdf::EDataSlot::String>(x.second));
5493+
}
5494+
5495+
callableBuilder.Add(NewTuple(names));
5496+
return TRuntimeNode(callableBuilder.Build(), false);
5497+
}
5498+
54735499
TRuntimeNode TProgramBuilder::PgCast(TRuntimeNode input, TType* returnType, TRuntimeNode typeMod) {
54745500
if constexpr (RuntimeVersion < 30U) {
54755501
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;

ydb/library/yql/minikql/mkql_program_builder.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,7 @@ class TProgramBuilder : public TTypeBuilder {
672672
const std::string_view& cluster,
673673
const std::string_view& table,
674674
TType* returnType);
675+
TRuntimeNode PgToRecord(TRuntimeNode input, const TArrayRef<std::pair<std::string_view, std::string_view>>& members);
675676

676677
TRuntimeNode ScalarApply(const TArrayRef<const TRuntimeNode>& args, const TArrayLambda& handler);
677678

ydb/library/yql/minikql/mkql_runtime_version.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ namespace NMiniKQL {
2424
// 1. Bump this version every time incompatible runtime nodes are introduced.
2525
// 2. Make sure you provide runtime node generation for previous runtime versions.
2626
#ifndef MKQL_RUNTIME_VERSION
27-
#define MKQL_RUNTIME_VERSION 47U
27+
#define MKQL_RUNTIME_VERSION 48U
2828
#endif
2929

3030
// History:

0 commit comments

Comments
 (0)