Skip to content

Commit 4b739da

Browse files
authored
Merge 5f0833b into 497f6e4
2 parents 497f6e4 + 5f0833b commit 4b739da

File tree

8 files changed

+133
-51
lines changed

8 files changed

+133
-51
lines changed

ydb/library/yql/providers/common/pushdown/collection.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,38 @@ bool CompareCanBePushed(const TCoCompare& compare, const TExprNode* lambdaArg, c
420420
return true;
421421
}
422422

423+
bool SqlInCanBePushed(const TCoSqlIn& sqlIn, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) {
424+
const TExprBase& expr = sqlIn.Collection();
425+
const TExprBase& lookup = sqlIn.Lookup();
426+
427+
if (!CheckExpressionNodeForPushdown(lookup, lambdaArg, settings)) {
428+
return false;
429+
}
430+
431+
TExprNode::TPtr collection;
432+
if (expr.Ref().IsList()) {
433+
collection = expr.Ptr();
434+
} else if (auto maybeAsList = expr.Maybe<TCoAsList>()) {
435+
collection = maybeAsList.Cast().Ptr();
436+
} else {
437+
return false;
438+
}
439+
440+
const TTypeAnnotationNode* inputType = lambdaBody.Ptr()->GetTypeAnn();
441+
for (auto& child : collection->Children()) {
442+
if (!CheckExpressionNodeForPushdown(TExprBase(child), lambdaArg, settings)) {
443+
return false;
444+
}
445+
446+
if (!settings.IsEnabled(TSettings::EFeatureFlag::DoNotCheckCompareArgumentsTypes)) {
447+
if (!IsComparableTypes(lookup, TExprBase(child), false, inputType, settings)) {
448+
return false;
449+
}
450+
}
451+
}
452+
return true;
453+
}
454+
423455
bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg, const TSettings& settings) {
424456
/*
425457
* There are three ways of comparison in following format:
@@ -556,6 +588,9 @@ void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree
556588
CollectExpressionPredicate(predicateTree, predicate.Cast<TCoMember>(), lambdaArg);
557589
} else if (settings.IsEnabled(TSettings::EFeatureFlag::JustPassthroughOperators) && (predicate.Maybe<TCoIf>() || predicate.Maybe<TCoJust>())) {
558590
CollectChildrenPredicates(predicate.Ref(), predicateTree, lambdaArg, lambdaBody, settings);
591+
} else if (settings.IsEnabled(TSettings::EFeatureFlag::InOperator) && predicate.Maybe<TCoSqlIn>()) {
592+
auto sqlIn = predicate.Cast<TCoSqlIn>();
593+
predicateTree.CanBePushed = SqlInCanBePushed(sqlIn, lambdaArg, lambdaBody, settings);
559594
} else {
560595
predicateTree.CanBePushed = false;
561596
}

ydb/library/yql/providers/common/pushdown/settings.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ struct TSettings {
2525
UnaryOperators = 1 << 15, // -, Abs, Size
2626
DoNotCheckCompareArgumentsTypes = 1 << 16,
2727
TimestampCtor = 1 << 17,
28-
JustPassthroughOperators = 1 << 18 // if + coalesce + just
28+
JustPassthroughOperators = 1 << 18, // if + coalesce + just
29+
InOperator = 1 << 19 // IN()
2930
};
3031

3132
explicit TSettings(NLog::EComponent logComponent)

ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace NYql {
2121
TString FormatIsNull(const TPredicate_TIsNull& isNull);
2222
TString FormatIsNotNull(const TPredicate_TIsNotNull& isNotNull);
2323
TString FormatPredicate(const TPredicate& predicate, bool topLevel);
24+
TString FormatIn(const TPredicate_TIn& in);
2425

2526
namespace {
2627

@@ -139,6 +140,32 @@ namespace NYql {
139140
return SerializeExpression(exists.Optional(), expressionProto, arg, err);
140141
}
141142

143+
bool SerializeSqlIn(const TCoSqlIn& sqlIn, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
144+
auto* dstProto = proto->mutable_in();
145+
const TExprBase& expr = sqlIn.Collection();
146+
const TExprBase& lookup = sqlIn.Lookup();
147+
148+
auto* expressionProto = dstProto->mutable_value();
149+
SerializeExpression(lookup, expressionProto, arg, err);
150+
151+
TExprNode::TPtr collection;
152+
if (expr.Ref().IsList()) {
153+
collection = expr.Ptr();
154+
} else if (auto maybeAsList = expr.Maybe<TCoAsList>()) {
155+
collection = maybeAsList.Cast().Ptr();
156+
} else {
157+
err << "unknown operation: " << expr.Ref().Content();
158+
return false;
159+
}
160+
161+
for (auto& child : collection->Children()) {
162+
if (!SerializeExpression(TExprBase(child), dstProto->add_set(), arg, err)) {
163+
return false;
164+
}
165+
}
166+
return true;
167+
}
168+
142169
bool SerializeAnd(const TCoAnd& andExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
143170
auto* dstProto = proto->mutable_conjunction();
144171
for (const auto& child : andExpr.Ptr()->Children()) {
@@ -194,6 +221,9 @@ namespace NYql {
194221
if (auto exists = predicate.Maybe<TCoExists>()) {
195222
return SerializeExists(exists.Cast(), proto, arg, err);
196223
}
224+
if (auto sqlIn = predicate.Maybe<TCoSqlIn>()) {
225+
return SerializeSqlIn(sqlIn.Cast(), proto, arg, err);
226+
}
197227

198228
err << "unknown predicate: " << predicate.Raw()->Content();
199229
return false;
@@ -393,6 +423,18 @@ namespace NYql {
393423
return left + operation + right;
394424
}
395425

426+
TString FormatIn(const TPredicate_TIn& in) {
427+
auto value = FormatExpression(in.value());
428+
TString list;
429+
for (const auto& expr : in.set()) {
430+
if (!list.empty()) {
431+
list += ",";
432+
}
433+
list += FormatExpression(expr);
434+
}
435+
return value + " IN (" + list + ")";
436+
}
437+
396438
TString FormatPredicate(const TPredicate& predicate, bool topLevel ) {
397439
switch (predicate.payload_case()) {
398440
case TPredicate::PAYLOAD_NOT_SET:
@@ -411,6 +453,8 @@ namespace NYql {
411453
return FormatComparison(predicate.comparison());
412454
case TPredicate::kBoolExpression:
413455
return FormatExpression(predicate.bool_expression().value());
456+
case TPredicate::kIn:
457+
return FormatIn(predicate.in());
414458
default:
415459
throw yexception() << "UnimplementedPredicateType, payload_case " << static_cast<ui64>(predicate.payload_case());
416460
}

ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
{"Index": 2, "Name": "Columns", "Type": "TExprBase"},
7373
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"},
7474
{"Index": 4, "Name": "Token", "Type": "TCoSecureParam"},
75-
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoLambda"}
75+
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoAtom"}
7676
]
7777
},
7878
{

ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,6 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
156156
return TStatus::Error;
157157
}
158158

159-
auto rowSchema = topic.RowSpec().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
160-
161-
const TStatus filterAnnotationStatus = NYql::NPushdown::AnnotateFilterPredicate(input.Ptr(), TDqPqTopicSource::idx_FilterPredicate, rowSchema, ctx);
162-
if (filterAnnotationStatus != TStatus::Ok) {
163-
return filterAnnotationStatus;
164-
}
165159

166160
if (topic.Metadata().Empty()) {
167161
input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
139139
auto row = Build<TCoArgument>(ctx, read->Pos())
140140
.Name("row")
141141
.Done();
142-
auto emptyPredicate = Build<TCoLambda>(ctx, read->Pos())
143-
.Args({row})
144-
.Body<TCoBool>()
145-
.Literal().Build("true")
146-
.Build()
147-
.Done().Ptr();
142+
TString emptyPredicate;
148143

149144
return Build<TDqSourceWrap>(ctx, read->Pos())
150145
.Input<TDqPqTopicSource>()
@@ -155,7 +150,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
155150
.Token<TCoSecureParam>()
156151
.Name().Build(token)
157152
.Build()
158-
.FilterPredicate(emptyPredicate)
153+
.FilterPredicate().Value(emptyPredicate).Build()
159154
.Build()
160155
.RowType(ExpandType(pqReadTopic.Pos(), *rowType, ctx))
161156
.DataSource(pqReadTopic.DataSource().Cast<TCoDataSource>())
@@ -264,13 +259,8 @@ class TPqDqIntegration: public TDqIntegrationBase {
264259
}
265260

266261
NYql::NConnector::NApi::TPredicate predicateProto;
267-
if (auto predicate = topicSource.FilterPredicate(); !NYql::IsEmptyFilterPredicate(predicate)) {
268-
TStringBuilder err;
269-
if (!NYql::SerializeFilterPredicate(predicate, &predicateProto, err)) {
270-
ctx.AddWarning(TIssue(ctx.GetPosition(node.Pos()), "Failed to serialize filter predicate for source: " + err));
271-
predicateProto.Clear();
272-
}
273-
}
262+
auto serializedProto = topicSource.FilterPredicate().Ref().Content();
263+
YQL_ENSURE (predicateProto.ParseFromString(serializedProto));
274264

275265
sharedReading = sharedReading && (format == "json_each_row" || format == "raw");
276266
TString predicateSql = NYql::FormatWhere(predicateProto);

ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
#include <ydb/library/yql/providers/common/pushdown/physical_opt.h>
1818
#include <ydb/library/yql/providers/common/pushdown/predicate_node.h>
1919

20+
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
21+
#include <ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h>
22+
2023
namespace NYql {
2124

2225
using namespace NNodes;
@@ -27,7 +30,7 @@ namespace {
2730
: NPushdown::TSettings(NLog::EComponent::ProviderGeneric)
2831
{
2932
using EFlag = NPushdown::TSettings::EFeatureFlag;
30-
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator);
33+
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::InOperator);
3134
}
3235
};
3336

@@ -250,15 +253,30 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
250253
return node;
251254
}
252255
TDqPqTopicSource dqPqTopicSource = maybeDqPqTopicSource.Cast();
253-
if (!IsEmptyFilterPredicate(dqPqTopicSource.FilterPredicate())) {
256+
if (!dqPqTopicSource.FilterPredicate().Ref().Content().empty()) {
254257
YQL_CLOG(TRACE, ProviderPq) << "Push filter. Lambda is already not empty";
255258
return node;
256259
}
257-
260+
258261
auto newFilterLambda = MakePushdownPredicate(flatmap.Lambda(), ctx, node.Pos(), TPushdownSettings());
259262
if (!newFilterLambda) {
260263
return node;
261264
}
265+
266+
auto predicate = newFilterLambda.Cast();
267+
if (NYql::IsEmptyFilterPredicate(predicate)) {
268+
return node;
269+
}
270+
271+
TStringBuilder err;
272+
NYql::NConnector::NApi::TPredicate predicateProto;
273+
if (!NYql::SerializeFilterPredicate(predicate, &predicateProto, err)) {
274+
ctx.AddWarning(TIssue(ctx.GetPosition(node.Pos()), "Failed to serialize filter predicate for source: " + err));
275+
return node;
276+
}
277+
278+
TString serializedProto;
279+
YQL_ENSURE(predicateProto.SerializeToString(&serializedProto));
262280
YQL_CLOG(INFO, ProviderPq) << "Build new TCoFlatMap with predicate";
263281

264282
if (maybeExtractMembers) {
@@ -270,7 +288,7 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
270288
.InitFrom(dqSourceWrap)
271289
.Input<TDqPqTopicSource>()
272290
.InitFrom(dqPqTopicSource)
273-
.FilterPredicate(newFilterLambda.Cast())
291+
.FilterPredicate().Value(serializedProto).Build()
274292
.Build()
275293
.Build()
276294
.Build()
@@ -282,7 +300,7 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
282300
.InitFrom(dqSourceWrap)
283301
.Input<TDqPqTopicSource>()
284302
.InitFrom(dqPqTopicSource)
285-
.FilterPredicate(newFilterLambda.Cast())
303+
.FilterPredicate().Value(serializedProto).Build()
286304
.Build()
287305
.Build()
288306
.Done();

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,19 @@ def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match
8282

8383
class TestPqRowDispatcher(TestYdsBase):
8484

85+
def run_and_check(self, kikimr, client, sql, input, output, expected_predicate):
86+
query_id = start_yds_query(kikimr, client, sql)
87+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
88+
89+
self.write_stream(input)
90+
assert self.read_stream(len(output), topic_path=self.output_topic) == output
91+
92+
stop_yds_query(client, query_id)
93+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
94+
95+
issues = str(client.describe_query(query_id).result.query.transient_issue)
96+
assert expected_predicate in issues, "Incorrect Issues: " + issues
97+
8598
@yq_v1
8699
def test_read_raw_format_with_row_dispatcher(self, kikimr, client):
87100
client.create_yds_connection(
@@ -284,40 +297,27 @@ def test_nested_types_without_predicate(self, kikimr, client):
284297
stop_yds_query(client, query_id)
285298

286299
@yq_v1
287-
def test_filter(self, kikimr, client):
300+
def test_filters(self, kikimr, client):
288301
client.create_yds_connection(
289302
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
290303
)
291304
self.init_topics("test_filter")
292-
293305
sql = Rf'''
294306
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
295307
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
296-
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String NOT NULL, event String NOT NULL))
297-
WHERE time > 101UL or event = "event666";'''
298-
299-
query_id = start_yds_query(kikimr, client, sql)
300-
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
301-
308+
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String NOT NULL, event String NOT NULL)) WHERE '''
302309
data = [
303310
'{"time": 101, "data": "hello1", "event": "event1"}',
304-
'{"time": 102, "data": "hello2", "event": "event2"}',
305-
]
306-
307-
self.write_stream(data)
311+
'{"time": 102, "data": "hello2", "event": "event2"}']
312+
filter = "time > 101UL;"
308313
expected = ['102']
309-
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
310-
311-
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
312-
313-
stop_yds_query(client, query_id)
314-
# Assert that all read rules were removed after query stops
315-
read_rules = list_read_rules(self.input_topic)
316-
assert len(read_rules) == 0, read_rules
317-
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
318-
319-
issues = str(client.describe_query(query_id).result.query.transient_issue)
320-
assert "Row dispatcher will use the predicate: WHERE (`time` > 101" in issues, "Incorrect Issues: " + issues
314+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `time` > 101')
315+
filter = 'data = "hello2"'
316+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `data` = \\"hello2\\"')
317+
filter = 'event IN ("event2")'
318+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"event2\\")')
319+
filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'
320+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"1\\"')
321321

322322
@yq_v1
323323
def test_filter_missing_fields(self, kikimr, client):

0 commit comments

Comments
 (0)