Skip to content

Commit 0040ea0

Browse files
authored
Merge 80f6bbd into d5c2c5c
2 parents d5c2c5c + 80f6bbd commit 0040ea0

File tree

9 files changed

+143
-54
lines changed

9 files changed

+143
-54
lines changed

ydb/library/yql/providers/common/pushdown/collection.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,38 @@ bool CompareCanBePushed(const TCoCompare& compare, const TExprNode* lambdaArg, c
420420
return true;
421421
}
422422

423+
bool SqlInCanBePushed(const TCoSqlIn& sqlIn, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) {
424+
const TExprBase& expr = sqlIn.Collection();
425+
const TExprBase& lookup = sqlIn.Lookup();
426+
427+
if (!CheckExpressionNodeForPushdown(lookup, lambdaArg, settings)) {
428+
return false;
429+
}
430+
431+
TExprNode::TPtr collection;
432+
if (expr.Ref().IsList()) {
433+
collection = expr.Ptr();
434+
} else if (auto maybeAsList = expr.Maybe<TCoAsList>()) {
435+
collection = maybeAsList.Cast().Ptr();
436+
} else {
437+
return false;
438+
}
439+
440+
const TTypeAnnotationNode* inputType = lambdaBody.Ptr()->GetTypeAnn();
441+
for (auto& child : collection->Children()) {
442+
if (!CheckExpressionNodeForPushdown(TExprBase(child), lambdaArg, settings)) {
443+
return false;
444+
}
445+
446+
if (!settings.IsEnabled(TSettings::EFeatureFlag::DoNotCheckCompareArgumentsTypes)) {
447+
if (!IsComparableTypes(lookup, TExprBase(child), false, inputType, settings)) {
448+
return false;
449+
}
450+
}
451+
}
452+
return true;
453+
}
454+
423455
bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg, const TSettings& settings) {
424456
/*
425457
* There are three ways of comparison in following format:
@@ -556,6 +588,9 @@ void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree
556588
CollectExpressionPredicate(predicateTree, predicate.Cast<TCoMember>(), lambdaArg);
557589
} else if (settings.IsEnabled(TSettings::EFeatureFlag::JustPassthroughOperators) && (predicate.Maybe<TCoIf>() || predicate.Maybe<TCoJust>())) {
558590
CollectChildrenPredicates(predicate.Ref(), predicateTree, lambdaArg, lambdaBody, settings);
591+
} else if (settings.IsEnabled(TSettings::EFeatureFlag::InOperator) && predicate.Maybe<TCoSqlIn>()) {
592+
auto sqlIn = predicate.Cast<TCoSqlIn>();
593+
predicateTree.CanBePushed = SqlInCanBePushed(sqlIn, lambdaArg, lambdaBody, settings);
559594
} else {
560595
predicateTree.CanBePushed = false;
561596
}

ydb/library/yql/providers/common/pushdown/settings.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ struct TSettings {
2525
UnaryOperators = 1 << 15, // -, Abs, Size
2626
DoNotCheckCompareArgumentsTypes = 1 << 16,
2727
TimestampCtor = 1 << 17,
28-
JustPassthroughOperators = 1 << 18 // if + coalesce + just
28+
JustPassthroughOperators = 1 << 18, // if + coalesce + just
29+
InOperator = 1 << 19 // IN()
2930
};
3031

3132
explicit TSettings(NLog::EComponent logComponent)

ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace NYql {
2121
TString FormatIsNull(const TPredicate_TIsNull& isNull);
2222
TString FormatIsNotNull(const TPredicate_TIsNotNull& isNotNull);
2323
TString FormatPredicate(const TPredicate& predicate, bool topLevel);
24+
TString FormatIn(const TPredicate_TIn& in);
2425

2526
namespace {
2627

@@ -66,7 +67,6 @@ namespace NYql {
6667
if (auto member = expression.Maybe<TCoMember>()) {
6768
return SerializeMember(member.Cast(), proto, arg, err);
6869
}
69-
7070
// data
7171
MATCH_ATOM(Int8, INT8, int32, i8);
7272
MATCH_ATOM(Uint8, UINT8, uint32, ui8);
@@ -139,6 +139,33 @@ namespace NYql {
139139
return SerializeExpression(exists.Optional(), expressionProto, arg, err);
140140
}
141141

142+
bool SerializeSqlIn(const TCoSqlIn& sqlIn, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
143+
auto* dstProto = proto->mutable_in();
144+
const TExprBase& collection1 = sqlIn.Collection();
145+
const TExprBase& lookup = sqlIn.Lookup();
146+
147+
auto* expressionProto = dstProto->mutable_value();
148+
SerializeExpression(lookup, expressionProto, arg, err);
149+
150+
TExprNode::TPtr collection;
151+
if (collection1.Ref().IsList()) {
152+
collection = collection1.Ptr();
153+
} else if (auto maybeAsList = collection1.Maybe<TCoAsList>()) {
154+
collection = maybeAsList.Cast().Ptr();
155+
} else {
156+
err << "unknown operation: " << collection1.Ref().Content();
157+
return false;
158+
}
159+
160+
for (auto& child : collection->Children()) {
161+
if (!SerializeExpression(TExprBase(child), dstProto->add_set(), arg, err)) {
162+
return false;
163+
}
164+
}
165+
return true;
166+
}
167+
168+
142169
bool SerializeAnd(const TCoAnd& andExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
143170
auto* dstProto = proto->mutable_conjunction();
144171
for (const auto& child : andExpr.Ptr()->Children()) {
@@ -194,6 +221,9 @@ namespace NYql {
194221
if (auto exists = predicate.Maybe<TCoExists>()) {
195222
return SerializeExists(exists.Cast(), proto, arg, err);
196223
}
224+
if (auto sqlIn = predicate.Maybe<TCoSqlIn>()) {
225+
return SerializeSqlIn(sqlIn.Cast(), proto, arg, err);
226+
}
197227

198228
err << "unknown predicate: " << predicate.Raw()->Content();
199229
return false;
@@ -363,6 +393,8 @@ namespace NYql {
363393

364394
TString FormatComparison(TPredicate_TComparison comparison) {
365395
TString operation;
396+
auto left = FormatExpression(comparison.left_value());
397+
auto right = FormatExpression(comparison.right_value());
366398

367399
switch (comparison.operation()) {
368400
case TPredicate_TComparison::L:
@@ -387,12 +419,23 @@ namespace NYql {
387419
throw yexception() << "UnimplementedOperation, operation " << static_cast<ui64>(comparison.operation());
388420
}
389421

390-
auto left = FormatExpression(comparison.left_value());
391-
auto right = FormatExpression(comparison.right_value());
392-
393422
return left + operation + right;
394423
}
395424

425+
TString FormatIn(const TPredicate_TIn& in) {
426+
auto value = FormatExpression(in.value());
427+
TString list;
428+
for (const auto& expr : in.set()) {
429+
auto v = FormatExpression(expr);
430+
if (!list.empty()) {
431+
list += ",";
432+
}
433+
list += v;
434+
}
435+
436+
return value + " IN (" + list + ")";
437+
}
438+
396439
TString FormatPredicate(const TPredicate& predicate, bool topLevel ) {
397440
switch (predicate.payload_case()) {
398441
case TPredicate::PAYLOAD_NOT_SET:
@@ -411,6 +454,8 @@ namespace NYql {
411454
return FormatComparison(predicate.comparison());
412455
case TPredicate::kBoolExpression:
413456
return FormatExpression(predicate.bool_expression().value());
457+
case TPredicate::kIn:
458+
return FormatIn(predicate.in());
414459
default:
415460
throw yexception() << "UnimplementedPredicateType, payload_case " << static_cast<ui64>(predicate.payload_case());
416461
}

ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
{"Index": 2, "Name": "Columns", "Type": "TExprBase"},
7373
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"},
7474
{"Index": 4, "Name": "Token", "Type": "TCoSecureParam"},
75-
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoLambda"}
75+
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoAtom"}
7676
]
7777
},
7878
{

ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,6 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
156156
return TStatus::Error;
157157
}
158158

159-
auto rowSchema = topic.RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
160-
161-
const TStatus filterAnnotationStatus = NYql::NPushdown::AnnotateFilterPredicate(input.Ptr(), TDqPqTopicSource::idx_FilterPredicate, rowSchema, ctx);
162-
if (filterAnnotationStatus != TStatus::Ok) {
163-
return filterAnnotationStatus;
164-
}
165159

166160
if (topic.Metadata().Empty()) {
167161
input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
139139
auto row = Build<TCoArgument>(ctx, read->Pos())
140140
.Name("row")
141141
.Done();
142-
auto emptyPredicate = Build<TCoLambda>(ctx, read->Pos())
143-
.Args({row})
144-
.Body<TCoBool>()
145-
.Literal().Build("true")
146-
.Build()
147-
.Done().Ptr();
142+
TString emptyPredicate;
148143

149144
return Build<TDqSourceWrap>(ctx, read->Pos())
150145
.Input<TDqPqTopicSource>()
@@ -155,7 +150,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
155150
.Token<TCoSecureParam>()
156151
.Name().Build(token)
157152
.Build()
158-
.FilterPredicate(emptyPredicate)
153+
.FilterPredicate().Value(emptyPredicate).Build()
159154
.Build()
160155
.RowType(ExpandType(pqReadTopic.Pos(), *rowType, ctx))
161156
.DataSource(pqReadTopic.DataSource().Cast<TCoDataSource>())
@@ -262,12 +257,9 @@ class TPqDqIntegration: public TDqIntegrationBase {
262257
}
263258

264259
NYql::NConnector::NApi::TPredicate predicateProto;
265-
if (auto predicate = topicSource.FilterPredicate(); !NYql::IsEmptyFilterPredicate(predicate)) {
266-
TStringBuilder err;
267-
if (!NYql::SerializeFilterPredicate(predicate, &predicateProto, err)) {
268-
ctx.AddWarning(TIssue(ctx.GetPosition(node.Pos()), "Failed to serialize filter predicate for source: " + err));
269-
predicateProto.Clear();
270-
}
260+
auto serializedProto = topicSource.FilterPredicate().Ref().Content();
261+
if (!predicateProto.ParseFromString(serializedProto)) {
262+
YQL_CLOG(ERROR, ProviderPq) << "ParseFromString failed";
271263
}
272264

273265
sharedReading = sharedReading && (format == "json_each_row" || format == "raw");

ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
#include <ydb/library/yql/providers/common/pushdown/physical_opt.h>
1818
#include <ydb/library/yql/providers/common/pushdown/predicate_node.h>
1919

20+
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
21+
#include <ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h>
22+
2023
namespace NYql {
2124

2225
using namespace NNodes;
@@ -27,7 +30,7 @@ namespace {
2730
: NPushdown::TSettings(NLog::EComponent::ProviderGeneric)
2831
{
2932
using EFlag = NPushdown::TSettings::EFeatureFlag;
30-
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator);
33+
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::InOperator);
3134
}
3235
};
3336

@@ -250,7 +253,7 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
250253
return node;
251254
}
252255
TDqPqTopicSource dqPqTopicSource = maybeDqPqTopicSource.Cast();
253-
if (!IsEmptyFilterPredicate(dqPqTopicSource.FilterPredicate())) {
256+
if (!dqPqTopicSource.FilterPredicate().Ref().Content().empty()) {
254257
YQL_CLOG(TRACE, ProviderPq) << "Push filter. Lambda is already not empty";
255258
return node;
256259
}
@@ -259,6 +262,22 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
259262
if (!newFilterLambda) {
260263
return node;
261264
}
265+
266+
NYql::NConnector::NApi::TPredicate predicateProto;
267+
auto predicate = newFilterLambda.Cast();
268+
269+
if (!NYql::IsEmptyFilterPredicate(predicate)) {
270+
TStringBuilder err;
271+
if (!NYql::SerializeFilterPredicate(predicate, &predicateProto, err)) {
272+
ctx.AddWarning(TIssue(ctx.GetPosition(node.Pos()), "Failed to serialize filter predicate for source: " + err));
273+
predicateProto.Clear();
274+
}
275+
}
276+
277+
TString serializedProto;
278+
if (!predicateProto.SerializeToString(&serializedProto)) {
279+
return node;
280+
}
262281
YQL_CLOG(INFO, ProviderPq) << "Build new TCoFlatMap with predicate";
263282

264283
if (maybeExtractMembers) {
@@ -270,7 +289,7 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
270289
.InitFrom(dqSourceWrap)
271290
.Input<TDqPqTopicSource>()
272291
.InitFrom(dqPqTopicSource)
273-
.FilterPredicate(newFilterLambda.Cast())
292+
.FilterPredicate().Value(serializedProto).Build()
274293
.Build()
275294
.Build()
276295
.Build()
@@ -282,7 +301,7 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
282301
.InitFrom(dqSourceWrap)
283302
.Input<TDqPqTopicSource>()
284303
.InitFrom(dqPqTopicSource)
285-
.FilterPredicate(newFilterLambda.Cast())
304+
.FilterPredicate().Value(serializedProto).Build()
286305
.Build()
287306
.Build()
288307
.Done();

ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,11 +271,15 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
271271
static NPushdown::TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree,
272272
TExprContext& ctx, TPositionHandle pos)
273273
{
274+
YQL_CLOG(TRACE, ProviderS3) << "SplitForPartialPushdown " ;
275+
274276
if (predicateTree.CanBePushed) {
277+
YQL_CLOG(TRACE, ProviderS3) << "SplitForPartialPushdown 1" ;
275278
return predicateTree;
276279
}
277280

278281
if (predicateTree.Op != NPushdown::EBoolOp::And) {
282+
YQL_CLOG(TRACE, ProviderS3) << "SplitForPartialPushdown 2" ;
279283
return NPushdown::TPredicateNode(); // Not valid, => return the same node from optimizer
280284
}
281285

@@ -287,6 +291,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
287291
}
288292
NPushdown::TPredicateNode predicateToPush;
289293
predicateToPush.SetPredicates(pushable, ctx, pos);
294+
YQL_CLOG(TRACE, ProviderS3) << "SplitForPartialPushdown end" ;
290295
return predicateToPush;
291296
}
292297

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,20 @@ def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match
7979
time.sleep(1)
8080
pass
8181

82-
8382
class TestPqRowDispatcher(TestYdsBase):
8483

84+
def run_and_check(self, kikimr, client, sql, input, output, expected_predicate):
85+
query_id = start_yds_query(kikimr, client, sql)
86+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
87+
88+
self.write_stream(input)
89+
assert self.read_stream(len(output), topic_path=self.output_topic) == output
90+
91+
stop_yds_query(client, query_id)
92+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
93+
94+
issues = str(client.describe_query(query_id).result.query.transient_issue)
95+
assert expected_predicate in issues, "Incorrect Issues: " + issues
8596
@yq_v1
8697
def test_read_raw_format_with_row_dispatcher(self, kikimr, client):
8798
client.create_yds_connection(
@@ -254,40 +265,27 @@ def test_nested_types(self, kikimr, client):
254265
assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues
255266

256267
@yq_v1
257-
def test_filter(self, kikimr, client):
268+
def test_filters(self, kikimr, client):
258269
client.create_yds_connection(
259270
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
260271
)
261272
self.init_topics("test_filter")
262-
263273
sql = Rf'''
264274
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
265275
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
266-
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String NOT NULL, event String NOT NULL))
267-
WHERE time > 101UL or event = "event666";'''
268-
269-
query_id = start_yds_query(kikimr, client, sql)
270-
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
271-
276+
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String NOT NULL, event String NOT NULL)) WHERE '''
272277
data = [
273278
'{"time": 101, "data": "hello1", "event": "event1"}',
274-
'{"time": 102, "data": "hello2", "event": "event2"}',
275-
]
276-
277-
self.write_stream(data)
279+
'{"time": 102, "data": "hello2", "event": "event2"}']
280+
filter = "time > 101UL;"
278281
expected = ['102']
279-
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
280-
281-
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
282-
283-
stop_yds_query(client, query_id)
284-
# Assert that all read rules were removed after query stops
285-
read_rules = list_read_rules(self.input_topic)
286-
assert len(read_rules) == 0, read_rules
287-
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
288-
289-
issues = str(client.describe_query(query_id).result.query.transient_issue)
290-
assert "Row dispatcher will use the predicate: WHERE (`time` > 101" in issues, "Incorrect Issues: " + issues
282+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `time` > 101')
283+
filter = 'data = "hello2"'
284+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `data` = \\"hello2\\"')
285+
filter = 'event IN ("event2")'
286+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"event2\\")')
287+
filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'
288+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"1\\"')
291289

292290
@yq_v1
293291
def test_filter_missing_fields(self, kikimr, client):

0 commit comments

Comments
 (0)