Skip to content

Commit 4460f7e

Browse files
authored
Merge 93f0384 into 8a77b3d
2 parents 8a77b3d + 93f0384 commit 4460f7e

File tree

4 files changed

+78
-16
lines changed

4 files changed

+78
-16
lines changed

ydb/library/yql/providers/common/pushdown/collection.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,25 @@ bool SqlInCanBePushed(const TCoSqlIn& sqlIn, const TExprNode* lambdaArg, const T
456456
return true;
457457
}
458458

459+
bool IsDistinctCanBePushed(const TExprBase& predicate, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) {
460+
if (predicate.Ref().ChildrenSize() != 2 ) {
461+
return false;
462+
}
463+
auto expr1 = predicate.Ref().Child(0);
464+
auto expr2 = predicate.Ref().Child(1);
465+
if (!CheckExpressionNodeForPushdown(TExprBase(expr1), lambdaArg, settings)
466+
|| !CheckExpressionNodeForPushdown(TExprBase(expr2), lambdaArg, settings)) {
467+
return false;
468+
}
469+
const TTypeAnnotationNode* inputType = lambdaBody.Ptr()->GetTypeAnn();
470+
if (!settings.IsEnabled(TSettings::EFeatureFlag::DoNotCheckCompareArgumentsTypes)) {
471+
if (!IsComparableTypes(TExprBase(expr1), TExprBase(expr2), false, inputType, settings)) {
472+
return false;
473+
}
474+
}
475+
return true;
476+
}
477+
459478
bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg, const TSettings& settings) {
460479
/*
461480
* There are three ways of comparison in following format:
@@ -595,9 +614,13 @@ void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree
595614
} else if (settings.IsEnabled(TSettings::EFeatureFlag::InOperator) && predicate.Maybe<TCoSqlIn>()) {
596615
auto sqlIn = predicate.Cast<TCoSqlIn>();
597616
predicateTree.CanBePushed = SqlInCanBePushed(sqlIn, lambdaArg, lambdaBody, settings);
617+
} else if (settings.IsEnabled(TSettings::EFeatureFlag::IsDistinctOperator) &&
618+
(predicate.Ref().IsCallable("IsNotDistinctFrom") || predicate.Ref().IsCallable("IsDistinctFrom"))) {
619+
predicateTree.CanBePushed = IsDistinctCanBePushed(predicate, lambdaArg, lambdaBody, settings);;
598620
} else {
599621
predicateTree.CanBePushed = false;
600622
}
623+
601624
}
602625

603626
} // namespace NYql::NPushdown

ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,17 @@ namespace NYql {
168168
return true;
169169
}
170170

171+
bool SerializeIsNotDistinctFrom(const TExprBase& predicate, TPredicate* predicateProto, const TCoArgument& arg, TStringBuilder& err, bool invert) {
172+
if (predicate.Ref().ChildrenSize() != 2 ) {
173+
err << "unknown predicate, children size " << predicate.Ref().ChildrenSize();
174+
return false;
175+
}
176+
TPredicate::TComparison* proto = predicateProto->mutable_comparison();
177+
proto->set_operation(!invert ? TPredicate::TComparison::IND : TPredicate::TComparison::ID);
178+
return SerializeExpression(TExprBase(predicate.Ref().Child(0)), proto->mutable_left_value(), arg, err)
179+
&& SerializeExpression(TExprBase(predicate.Ref().Child(1)), proto->mutable_right_value(), arg, err);
180+
}
181+
171182
bool SerializeAnd(const TCoAnd& andExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
172183
auto* dstProto = proto->mutable_conjunction();
173184
for (const auto& child : andExpr.Ptr()->Children()) {
@@ -204,27 +215,24 @@ namespace NYql {
204215
bool SerializePredicate(const TExprBase& predicate, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
205216
if (auto compare = predicate.Maybe<TCoCompare>()) {
206217
return SerializeCompare(compare.Cast(), proto, arg, err);
207-
}
208-
if (auto coalesce = predicate.Maybe<TCoCoalesce>()) {
218+
} else if (auto coalesce = predicate.Maybe<TCoCoalesce>()) {
209219
return SerializeCoalesce(coalesce.Cast(), proto, arg, err);
210-
}
211-
if (auto andExpr = predicate.Maybe<TCoAnd>()) {
220+
} else if (auto andExpr = predicate.Maybe<TCoAnd>()) {
212221
return SerializeAnd(andExpr.Cast(), proto, arg, err);
213-
}
214-
if (auto orExpr = predicate.Maybe<TCoOr>()) {
222+
} else if (auto orExpr = predicate.Maybe<TCoOr>()) {
215223
return SerializeOr(orExpr.Cast(), proto, arg, err);
216-
}
217-
if (auto notExpr = predicate.Maybe<TCoNot>()) {
224+
} else if (auto notExpr = predicate.Maybe<TCoNot>()) {
218225
return SerializeNot(notExpr.Cast(), proto, arg, err);
219-
}
220-
if (auto member = predicate.Maybe<TCoMember>()) {
226+
} else if (auto member = predicate.Maybe<TCoMember>()) {
221227
return SerializeMember(member.Cast(), proto, arg, err);
222-
}
223-
if (auto exists = predicate.Maybe<TCoExists>()) {
228+
} else if (auto exists = predicate.Maybe<TCoExists>()) {
224229
return SerializeExists(exists.Cast(), proto, arg, err);
225-
}
226-
if (auto sqlIn = predicate.Maybe<TCoSqlIn>()) {
230+
} else if (auto sqlIn = predicate.Maybe<TCoSqlIn>()) {
227231
return SerializeSqlIn(sqlIn.Cast(), proto, arg, err);
232+
} else if (predicate.Ref().IsCallable("IsNotDistinctFrom")) {
233+
return SerializeIsNotDistinctFrom(predicate, proto, arg, err, false);
234+
} else if (predicate.Ref().IsCallable("IsDistinctFrom")) {
235+
return SerializeIsNotDistinctFrom(predicate, proto, arg, err, true);
228236
}
229237

230238
err << "unknown predicate: " << predicate.Raw()->Content();

ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace {
3030
: NPushdown::TSettings(NLog::EComponent::ProviderGeneric)
3131
{
3232
using EFlag = NPushdown::TSettings::EFeatureFlag;
33-
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator);
33+
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator | EFlag::IsDistinctOperator);
3434
}
3535
};
3636

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ def test_nested_types_without_predicate(self, kikimr, client):
299299
stop_yds_query(client, query_id)
300300

301301
@yq_v1
302-
def test_filters(self, kikimr, client):
302+
def test_filters_non_optional_field(self, kikimr, client):
303303
client.create_yds_connection(
304304
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
305305
)
@@ -321,11 +321,42 @@ def test_filters(self, kikimr, client):
321321
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS NOT DISTINCT FROM \\"event2\\"')
322322
filter = ' event IS DISTINCT FROM "event1"'
323323
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS DISTINCT FROM \\"event1\\"')
324+
filter = ' event IS DISTINCT FROM "event1"'
325+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS DISTINCT FROM \\"event1\\"')
324326
filter = 'event IN ("event2")'
325327
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"event2\\")')
326328
filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'
327329
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"1\\"')
328330

331+
@yq_v1
def test_filters_optional_field(self, kikimr, client):
    """Check WHERE filters on the `data` and `event` columns, which the schema declares without NOT NULL.

    Every case runs the same INSERT ... SELECT with a different WHERE clause
    against two input rows and expects only the row with time 102 in the
    output. The last argument of run_and_check is the predicate text passed
    along with each case (presumably matched against logs -- see run_and_check).
    """
    client.create_yds_connection(
        YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
    )
    self.init_topics("test_filter")

    sql = Rf'''
        INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
        SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
            WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String)) WHERE '''
    data = [
        '{"time": 101, "data": "hello1", "event": "event1"}',
        '{"time": 102, "data": "hello2", "event": "event2"}']
    expected = ['102']

    # (WHERE clause, expected predicate text) pairs. Each distinct case is
    # listed once: the original ran the IS DISTINCT FROM case twice with
    # identical arguments, which added run time but no coverage.
    cases = [
        ('data = "hello2"', 'predicate: WHERE `data` = \\"hello2\\"'),
        (' event IS NOT DISTINCT FROM "event2"', 'predicate: WHERE `event` IS NOT DISTINCT FROM \\"event2\\"'),
        (' event IS DISTINCT FROM "event1"', 'predicate: WHERE `event` IS DISTINCT FROM \\"event1\\"'),
        ('event IN ("event2")', 'predicate: WHERE `event` IN (\\"event2\\")'),
    ]
    for condition, predicate in cases:
        self.run_and_check(kikimr, client, sql + condition, data, expected, predicate)

    # TODO
    # filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'
    # self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"1\\"')
329360
@yq_v1
330361
def test_filter_missing_fields(self, kikimr, client):
331362
client.create_yds_connection(

0 commit comments

Comments
 (0)