Skip to content

Commit 778d2d7

Browse files
authored
YQ-3703 pushdown for extract members in PQ provider (#9939)
1 parent bc116d9 commit 778d2d7

File tree

4 files changed

+72
-75
lines changed

4 files changed

+72
-75
lines changed

ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json

+1-2
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@
7171
{"Index": 1, "Name": "Columns", "Type": "TExprBase"},
7272
{"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"},
7373
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"},
74-
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"},
75-
{"Index": 5, "Name": "ColumnTypes", "Type": "TExprBase"}
74+
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
7675
]
7776
},
7877
{

ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
132132
}
133133

134134
TStatus HandleDqTopicSource(TExprBase input, TExprContext& ctx) {
135-
if (!EnsureArgsCount(input.Ref(), 6, ctx)) {
135+
if (!EnsureArgsCount(input.Ref(), 5, ctx)) {
136136
return TStatus::Error;
137137
}
138138

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

+8-20
Original file line numberDiff line numberDiff line change
@@ -124,25 +124,17 @@ class TPqDqIntegration: public TDqIntegrationBase {
124124

125125
const auto token = "cluster:default_" + clusterName;
126126

127-
auto rowSchema = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
128-
TExprNode::TListType colTypes;
129-
const auto& typeItems = rowSchema->GetItems();
130-
colTypes.reserve(typeItems.size());
131-
const auto pos = read->Pos(); // TODO
132-
std::transform(typeItems.cbegin(), typeItems.cend(), std::back_inserter(colTypes),
133-
[&](const TItemExprType* item) {
134-
return ctx.NewAtom(pos, FormatType(item->GetItemType()));
135-
});
136-
auto columnTypes = ctx.NewList(pos, std::move(colTypes));
137-
127+
const auto& typeItems = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>()->GetItems();
128+
const auto pos = read->Pos();
129+
138130
TExprNode::TListType colNames;
139131
colNames.reserve(typeItems.size());
140132
std::transform(typeItems.cbegin(), typeItems.cend(), std::back_inserter(colNames),
141133
[&](const TItemExprType* item) {
142134
return ctx.NewAtom(pos, item->GetName());
143135
});
144136
auto columnNames = ctx.NewList(pos, std::move(colNames));
145-
137+
146138
auto row = Build<TCoArgument>(ctx, read->Pos())
147139
.Name("row")
148140
.Done();
@@ -153,7 +145,6 @@ class TPqDqIntegration: public TDqIntegrationBase {
153145
.Build()
154146
.Done().Ptr();
155147

156-
157148
return Build<TDqSourceWrap>(ctx, read->Pos())
158149
.Input<TDqPqTopicSource>()
159150
.Topic(pqReadTopic.Topic())
@@ -163,7 +154,6 @@ class TPqDqIntegration: public TDqIntegrationBase {
163154
.Name().Build(token)
164155
.Build()
165156
.FilterPredicate(emptyPredicate)
166-
.ColumnTypes(std::move(columnTypes))
167157
.Build()
168158
.RowType(ExpandType(pqReadTopic.Pos(), *rowType, ctx))
169159
.DataSource(pqReadTopic.DataSource().Cast<TCoDataSource>())
@@ -263,14 +253,12 @@ class TPqDqIntegration: public TDqIntegrationBase {
263253
srcDesc.AddMetadataFields(metadata.Value().Maybe<TCoAtom>().Cast().StringValue());
264254
}
265255

266-
for (const auto& column : topicSource.Columns().Cast<TCoAtomList>()) {
267-
srcDesc.AddColumns(column.StringValue());
256+
const auto rowSchema = topic.RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
257+
for (const auto& item : rowSchema->GetItems()) {
258+
srcDesc.AddColumns(TString(item->GetName()));
259+
srcDesc.AddColumnTypes(FormatType(item->GetItemType()));
268260
}
269261

270-
for (const auto& columnTypes : topicSource.ColumnTypes().Cast<TCoAtomList>()) {
271-
srcDesc.AddColumnTypes(columnTypes.StringValue());
272-
}
273-
274262
NYql::NConnector::NApi::TPredicate predicateProto;
275263
if (auto predicate = topicSource.FilterPredicate(); !NYql::IsEmptyFilterPredicate(predicate)) {
276264
TStringBuilder err;

ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp

+62-52
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "yql_pq_provider_impl.h"
22

33
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
4+
#include <ydb/library/yql/core/yql_opt_utils.h>
45
#include <ydb/library/yql/core/yql_type_helpers.h>
56
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
67
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
@@ -30,22 +31,20 @@ namespace {
3031
}
3132
};
3233

33-
std::unordered_set<TString> GetUsedMetadataFields(const TCoExtractMembers& extract) {
34-
std::unordered_set<TString> usedMetadataFields;
35-
for (const auto extractMember : extract.Members()) {
36-
if (FindPqMetaFieldDescriptorBySysColumn(extractMember.StringValue())) {
37-
usedMetadataFields.emplace(extractMember.StringValue());
38-
}
34+
std::unordered_set<TString> GetUsedColumnNames(const TCoExtractMembers& extractMembers) {
35+
std::unordered_set<TString> usedColumnNames;
36+
for (const auto& member : extractMembers.Members()) {
37+
usedColumnNames.emplace(member.StringValue());
3938
}
4039

41-
return usedMetadataFields;
40+
return usedColumnNames;
4241
}
4342

44-
TVector<TCoNameValueTuple> DropUnusedMetadata(const TPqTopic& pqTopic, const std::unordered_set<TString>& usedMetadataFields) {
43+
TVector<TCoNameValueTuple> DropUnusedMetadata(const TPqTopic& pqTopic, const std::unordered_set<TString>& usedColumnNames) {
4544
TVector<TCoNameValueTuple> newSourceMetadata;
4645
for (auto metadataItem : pqTopic.Metadata()) {
4746
auto metadataName = metadataItem.Cast<TCoNameValueTuple>().Value().Maybe<TCoAtom>().Cast().StringValue();
48-
if (usedMetadataFields.contains(metadataName)) {
47+
if (FindPqMetaFieldDescriptorBySysColumn(metadataName) && usedColumnNames.contains(metadataName)) {
4948
newSourceMetadata.push_back(metadataItem);
5049
}
5150
}
@@ -88,18 +87,18 @@ TCoNameValueTupleList DropUnusedMetadataFromDqWrapSettings(
8887
.Done();
8988
}
9089

91-
TExprNode::TPtr DropUnusedMetadataFieldsFromRowType(
90+
TExprNode::TPtr DropUnusedRowItems(
9291
TPositionHandle position,
9392
const TStructExprType* oldRowType,
94-
const std::unordered_set<TString>& usedMetadataFields,
93+
const std::unordered_set<TString>& usedColumnNames,
9594
TExprContext& ctx)
9695
{
9796
TVector<const TItemExprType*> newFields;
9897
newFields.reserve(oldRowType->GetSize());
9998

10099
for (auto itemExprType : oldRowType->GetItems()) {
101100
const auto columnName = TString(itemExprType->GetName());
102-
if (FindPqMetaFieldDescriptorBySysColumn(columnName) && !usedMetadataFields.contains(columnName)) {
101+
if (!usedColumnNames.contains(columnName)) {
103102
continue;
104103
}
105104

@@ -109,14 +108,14 @@ TExprNode::TPtr DropUnusedMetadataFieldsFromRowType(
109108
return ExpandType(position, *ctx.MakeType<TStructExprType>(newFields), ctx);
110109
}
111110

112-
TExprNode::TPtr DropUnusedMetadataFieldsFromColumns(
111+
TExprNode::TPtr DropUnusedColumns(
113112
TExprBase oldColumns,
114-
const std::unordered_set<TString>& usedMetadataFields,
113+
const std::unordered_set<TString>& usedColumnNames,
115114
TExprContext& ctx)
116115
{
117116
TExprNode::TListType res;
118117
for (const auto& column : oldColumns.Cast<TCoAtomList>()) {
119-
if (FindPqMetaFieldDescriptorBySysColumn(column.StringValue()) && !usedMetadataFields.contains(column.StringValue())) {
118+
if (!usedColumnNames.contains(column.StringValue())) {
120119
continue;
121120
}
122121

@@ -160,57 +159,68 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
160159
}*/
161160

162161
TMaybeNode<TExprBase> ExtractMembersOverDqWrap(TExprBase node, TExprContext& ctx) const {
163-
const auto& extract = node.Cast<TCoExtractMembers>();
164-
const auto& input = extract.Input();
165-
const auto dqSourceWrap = input.Maybe<TDqSourceWrap>();
166-
const auto dqPqTopicSource = dqSourceWrap.Input().Maybe<TDqPqTopicSource>();
167-
const auto pqTopic = dqPqTopicSource.Topic().Maybe<TPqTopic>();
168-
if (!pqTopic) {
162+
const auto& extractMembers = node.Cast<TCoExtractMembers>();
163+
const auto& extractMembersInput = extractMembers.Input();
164+
const auto& maybeDqSourceWrap = extractMembersInput.Maybe<TDqSourceWrap>();
165+
if (!maybeDqSourceWrap) {
166+
return node;
167+
}
168+
169+
const auto& dqSourceWrap = maybeDqSourceWrap.Cast();
170+
if (dqSourceWrap.DataSource().Category() != PqProviderName) {
171+
return node;
172+
}
173+
174+
const auto& maybeDqPqTopicSource = dqSourceWrap.Input().Maybe<TDqPqTopicSource>();
175+
if (!maybeDqPqTopicSource) {
169176
return node;
170177
}
171178

172-
const auto usedMetadataFields = GetUsedMetadataFields(extract);
173-
const auto newSourceMetadata = DropUnusedMetadata(pqTopic.Cast(), usedMetadataFields);
174-
if (newSourceMetadata.size() == pqTopic.Metadata().Cast().Size()) {
179+
const auto& dqPqTopicSource = maybeDqPqTopicSource.Cast();
180+
const auto& pqTopic = dqPqTopicSource.Topic();
181+
182+
auto usedColumnNames = GetUsedColumnNames(extractMembers);
183+
const TStructExprType* inputRowType = pqTopic.RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
184+
const TStructExprType* outputRowType = node.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
185+
if (outputRowType->GetSize() == 0 && inputRowType->GetSize() > 0) {
186+
auto item = GetLightColumn(*inputRowType);
187+
YQL_ENSURE(item);
188+
YQL_ENSURE(usedColumnNames.insert(TString(item->GetName())).second);
189+
}
190+
191+
const auto oldRowType = pqTopic.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
192+
if (oldRowType->GetSize() == usedColumnNames.size()) {
175193
return node;
176194
}
177195

178-
const auto oldRowType = pqTopic.Ref().GetTypeAnn()
179-
->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
196+
const auto& newSourceMetadata = DropUnusedMetadata(pqTopic, usedColumnNames);
180197

181-
auto newPqTopicSource = Build<TDqPqTopicSource>(ctx, node.Pos())
182-
.InitFrom(dqPqTopicSource.Cast())
198+
const TExprNode::TPtr newPqTopicSource = Build<TDqPqTopicSource>(ctx, dqPqTopicSource.Pos())
199+
.InitFrom(dqPqTopicSource)
183200
.Topic<TPqTopic>()
184-
.InitFrom(pqTopic.Cast())
201+
.InitFrom(pqTopic)
185202
.Metadata().Add(newSourceMetadata).Build()
186-
.Build();
187-
188-
if (dqPqTopicSource.Columns()) {
189-
auto newColumns = DropUnusedMetadataFieldsFromColumns(
190-
dqPqTopicSource.Columns().Cast(),
191-
usedMetadataFields,
192-
ctx);
193-
newPqTopicSource.Columns(newColumns);
194-
}
203+
.RowSpec(DropUnusedRowItems(pqTopic.RowSpec().Pos(), inputRowType, usedColumnNames, ctx))
204+
.Build()
205+
.Columns(DropUnusedColumns(dqPqTopicSource.Columns(), usedColumnNames, ctx))
206+
.Done()
207+
.Ptr();
195208

196-
const auto newDqSourceWrap = Build<TDqSourceWrap>(ctx, node.Pos())
197-
.InitFrom(dqSourceWrap.Cast())
198-
.Input(newPqTopicSource.Done())
199-
.Settings(DropUnusedMetadataFromDqWrapSettings(
200-
dqSourceWrap.Cast(),
201-
newSourceMetadata,
202-
ctx))
203-
.RowType(DropUnusedMetadataFieldsFromRowType(
204-
node.Pos(),
205-
oldRowType,
206-
usedMetadataFields,
207-
ctx))
209+
const TExprNode::TPtr newDqSourceWrap = Build<TDqSourceWrap>(ctx, dqSourceWrap.Pos())
210+
.InitFrom(dqSourceWrap)
211+
.Input(newPqTopicSource)
212+
.Settings(DropUnusedMetadataFromDqWrapSettings(dqSourceWrap, newSourceMetadata, ctx))
213+
.RowType(DropUnusedRowItems(dqSourceWrap.RowType().Pos(), oldRowType, usedColumnNames, ctx))
208214
.Done()
209215
.Ptr();
210216

217+
if (outputRowType->GetSize() == usedColumnNames.size()) {
218+
return newDqSourceWrap;
219+
}
220+
211221
return Build<TCoExtractMembers>(ctx, node.Pos())
212-
.InitFrom(extract)
213-
.Input(ctx.ReplaceNode(input.Ptr(), dqSourceWrap.Ref(), newDqSourceWrap))
222+
.InitFrom(extractMembers)
223+
.Input(ctx.ReplaceNode(extractMembersInput.Ptr(), dqSourceWrap.Ref(), newDqSourceWrap))
214224
.Done();
215225
}
216226

0 commit comments

Comments
 (0)