Skip to content

Commit 7b8bfbd

Browse files
committed
[YQL-17474] Expand PartitionByKeys with constant keys via YtMap. Do not fuse ops with JobCount setting
1 parent 404ef88 commit 7b8bfbd

File tree

10 files changed

+290
-17
lines changed

10 files changed

+290
-17
lines changed

ydb/library/yql/cfg/tests/gateways-experimental.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,6 @@ Dq {
4040
YqlCore {
4141
Flags {
4242
Name: "OptimizerFlags"
43-
Args: ["FieldSubsetEnableMultiusage"]
43+
Args: ["FieldSubsetEnableMultiusage","PartitionByConstantKeysViaMap"]
4444
}
4545
}

ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2314,6 +2314,8 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
23142314
if ((outerMap.World().Ref().IsWorld() || outerMap.World().Raw() == op.World().Raw())
23152315
&& outerMap.Input().Size() == 1 && outerMap.DataSink().Cluster().Value() == op.DataSink().Cluster().Value()
23162316
&& NYql::HasSetting(op.Settings().Ref(), EYtSettingType::Flow) == NYql::HasSetting(outerMap.Settings().Ref(), EYtSettingType::Flow)
2317+
&& !NYql::HasSetting(op.Settings().Ref(), EYtSettingType::JobCount)
2318+
&& !NYql::HasSetting(outerMap.Settings().Ref(), EYtSettingType::JobCount)
23172319
&& !HasYtRowNumber(outerMap.Mapper().Body().Ref())
23182320
&& IsYieldTransparent(outerMap.Mapper().Ptr(), *State_->Types)
23192321
&& (!op.Maybe<TYtMapReduce>() || AllOf(outerMap.Output(), [](const auto& out) { return !TYtTableBaseInfo::GetRowSpec(out)->IsSorted(); }))) {

ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp

Lines changed: 84 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1874,12 +1874,24 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
18741874
}
18751875
}
18761876

1877+
const bool canUseMapInsteadOfReduce = keySelectorLambda.Body().Ref().IsComplete() &&
1878+
partByKey.SortDirections().Maybe<TCoVoid>() &&
1879+
State_->Types->OptimizerFlags.contains(to_lower(ToString("PartitionByConstantKeysViaMap")));
1880+
1881+
if (canUseMapInsteadOfReduce) {
1882+
YQL_ENSURE(!canUseReduce);
1883+
YQL_ENSURE(sortByColumns.empty());
1884+
useSystemColumns = false;
1885+
}
1886+
18771887
auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, node.Pos());
1878-
settingsBuilder
1879-
.Add()
1880-
.Name().Value(ToString(EYtSettingType::ReduceBy)).Build()
1881-
.Value(TExprBase(ToColumnPairList(reduceByColumns, node.Pos(), ctx)))
1882-
.Build();
1888+
if (!canUseMapInsteadOfReduce) {
1889+
settingsBuilder
1890+
.Add()
1891+
.Name().Value(ToString(EYtSettingType::ReduceBy)).Build()
1892+
.Value(TExprBase(ToColumnPairList(reduceByColumns, node.Pos(), ctx)))
1893+
.Build();
1894+
}
18831895

18841896
if (!sortByColumns.empty()) {
18851897
settingsBuilder
@@ -2266,7 +2278,14 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
22662278
TExprNode::TPtr keyExtractor;
22672279
TExprNode::TPtr handler;
22682280

2269-
if (useSystemColumns) {
2281+
if (canUseMapInsteadOfReduce) {
2282+
groupSwitch = Build<TCoLambda>(ctx, handlerLambda.Pos())
2283+
.Args({"key", "item"})
2284+
.Body<TCoBool>()
2285+
.Literal().Build("false")
2286+
.Build()
2287+
.Done().Ptr();
2288+
} else if (useSystemColumns) {
22702289
groupSwitch = Build<TCoLambda>(ctx, handlerLambda.Pos())
22712290
.Args({"key", "item"})
22722291
.Body<TCoSqlExtractKey>()
@@ -2325,17 +2344,21 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
23252344
.Build()
23262345
.Done().Ptr();
23272346

2328-
handler = Build<TCoLambda>(ctx, handlerLambda.Pos())
2329-
.Args({"item"})
2330-
.Body<TCoRemovePrefixMembers>()
2331-
.Input("item")
2332-
.Prefixes()
2333-
.Add()
2334-
.Value(YqlSysColumnKeySwitch)
2347+
if (canUseMapInsteadOfReduce) {
2348+
handler = BuildIdentityLambda(handlerLambda.Pos(), ctx).Ptr();
2349+
} else {
2350+
handler = Build<TCoLambda>(ctx, handlerLambda.Pos())
2351+
.Args({"item"})
2352+
.Body<TCoRemovePrefixMembers>()
2353+
.Input("item")
2354+
.Prefixes()
2355+
.Add()
2356+
.Value(YqlSysColumnKeySwitch)
2357+
.Build()
23352358
.Build()
23362359
.Build()
2337-
.Build()
2338-
.Done().Ptr();
2360+
.Done().Ptr();
2361+
}
23392362
}
23402363

23412364
handlerLambda = Build<TCoLambda>(ctx, handlerLambda.Pos())
@@ -2429,14 +2452,33 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
24292452
return WrapOp(reduce, ctx);
24302453
}
24312454

2432-
if (needMap && (builder.NeedMap() || multiInput)) {
2455+
if (needMap && (builder.NeedMap() || multiInput) && !canUseMapInsteadOfReduce) {
24332456
settingsBuilder
24342457
.Add()
24352458
.Name().Value(ToString(EYtSettingType::ReduceFilterBy)).Build()
24362459
.Value(TExprBase(ToAtomList(filterColumns, node.Pos(), ctx)))
24372460
.Build();
24382461
}
24392462

2463+
if (canUseMapInsteadOfReduce && !filterColumns.empty()) {
2464+
reducer = Build<TCoLambda>(ctx, reducer.Pos())
2465+
.Args({"input"})
2466+
.Body<TExprApplier>()
2467+
.Apply(reducer)
2468+
.With<TCoMap>(0)
2469+
.Input("input")
2470+
.Lambda()
2471+
.Args({"item"})
2472+
.Body<TCoSelectMembers>()
2473+
.Input("item")
2474+
.Members(ToAtomList(filterColumns, node.Pos(), ctx))
2475+
.Build()
2476+
.Build()
2477+
.Build()
2478+
.Build()
2479+
.Done();
2480+
}
2481+
24402482
bool unordered = ctx.IsConstraintEnabled<TSortedConstraintNode>();
24412483
TExprBase world = GetWorld(input, {}, ctx);
24422484
if (hasInputSampling) {
@@ -2519,6 +2561,26 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
25192561
}
25202562
}
25212563

2564+
if (canUseMapInsteadOfReduce) {
2565+
settingsBuilder
2566+
.Add()
2567+
.Name().Value(ToString(EYtSettingType::JobCount)).Build()
2568+
.Value(TExprBase(ctx.NewAtom(node.Pos(), "1", TNodeFlags::Default)))
2569+
.Build();
2570+
2571+
auto result = Build<TYtMap>(ctx, node.Pos())
2572+
.World(ApplySyncListToWorld(world.Ptr(), syncList, ctx))
2573+
.DataSink(GetDataSink(input, ctx))
2574+
.Input(ConvertInputTable(input, ctx, TConvertInputOpts().MakeUnordered(unordered)))
2575+
.Output()
2576+
.Add(ConvertOutTables(node.Pos(), outItemType, ctx, State_, &partByKey.Ref().GetConstraintSet()))
2577+
.Build()
2578+
.Settings(settingsBuilder.Done())
2579+
.Mapper(reducer)
2580+
.Done();
2581+
return WrapOp(result, ctx);
2582+
}
2583+
25222584
if (forceMapper && mapper.Maybe<TCoVoid>()) {
25232585
mapper = MakeJobLambda<false>(Build<TCoLambda>(ctx, node.Pos()).Args({"stream"}).Body("stream").Done(), useMapFlow, ctx);
25242586
}
@@ -4555,6 +4617,9 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
45554617
if (NYql::HasSetting(innerMap.Settings().Ref(), EYtSettingType::Flow) != NYql::HasSetting(outerMap.Settings().Ref(), EYtSettingType::Flow)) {
45564618
return node;
45574619
}
4620+
if (NYql::HasAnySetting(outerMap.Settings().Ref(), EYtSettingType::JobCount)) {
4621+
return node;
4622+
}
45584623
if (!path.Ranges().Maybe<TCoVoid>()) {
45594624
return node;
45604625
}
@@ -4690,6 +4755,9 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
46904755
if (NYql::HasAnySetting(inner.Settings().Ref(), EYtSettingType::Limit | EYtSettingType::SortLimitBy | EYtSettingType::JobCount)) {
46914756
return node;
46924757
}
4758+
if (NYql::HasSetting(outerMap.Settings().Ref(), EYtSettingType::JobCount)) {
4759+
return node;
4760+
}
46934761
if (outerMap.Input().Item(0).Settings().Size() != 0) {
46944762
return node;
46954763
}

ydb/library/yql/tests/sql/dq_file/part0/canondata/result.json

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,28 @@
316316
}
317317
],
318318
"test.test[aggregate-aggregate_with_lambda--Results]": [],
319+
"test.test[aggregate-aggrs_no_grouping_via_map_compact-default.txt-Analyze]": [
320+
{
321+
"checksum": "6897d5892e31430b3cfcda4973d48d02",
322+
"size": 5487,
323+
"uri": "https://{canondata_backend}/1814674/4ada1e29e9771da01d2a22d71dc425700ffa6a5f/resource.tar.gz#test.test_aggregate-aggrs_no_grouping_via_map_compact-default.txt-Analyze_/plan.txt"
324+
}
325+
],
326+
"test.test[aggregate-aggrs_no_grouping_via_map_compact-default.txt-Debug]": [
327+
{
328+
"checksum": "90af41f20b34db49b041bdc637803192",
329+
"size": 15008,
330+
"uri": "https://{canondata_backend}/1814674/4ada1e29e9771da01d2a22d71dc425700ffa6a5f/resource.tar.gz#test.test_aggregate-aggrs_no_grouping_via_map_compact-default.txt-Debug_/opt.yql_patched"
331+
}
332+
],
333+
"test.test[aggregate-aggrs_no_grouping_via_map_compact-default.txt-Plan]": [
334+
{
335+
"checksum": "6897d5892e31430b3cfcda4973d48d02",
336+
"size": 5487,
337+
"uri": "https://{canondata_backend}/1814674/4ada1e29e9771da01d2a22d71dc425700ffa6a5f/resource.tar.gz#test.test_aggregate-aggrs_no_grouping_via_map_compact-default.txt-Plan_/plan.txt"
338+
}
339+
],
340+
"test.test[aggregate-aggrs_no_grouping_via_map_compact-default.txt-Results]": [],
319341
"test.test[aggregate-group_by_expr_order_by_expr--Analyze]": [
320342
{
321343
"checksum": "68758cc21a759dfeca1624654de64bdd",

ydb/library/yql/tests/sql/dq_file/part14/canondata/result.json

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,28 @@
253253
}
254254
],
255255
"test.test[aggregate-agg_phases_table3-default.txt-Results]": [],
256+
"test.test[aggregate-aggrs_no_grouping_via_map-default.txt-Analyze]": [
257+
{
258+
"checksum": "25d401612e96668b6aad91e774a1aa02",
259+
"size": 5485,
260+
"uri": "https://{canondata_backend}/1925842/196fcb8368788ce91476288ce484bf98cef33817/resource.tar.gz#test.test_aggregate-aggrs_no_grouping_via_map-default.txt-Analyze_/plan.txt"
261+
}
262+
],
263+
"test.test[aggregate-aggrs_no_grouping_via_map-default.txt-Debug]": [
264+
{
265+
"checksum": "ca5825872a1ed2d8680a159fd3a3600b",
266+
"size": 22039,
267+
"uri": "https://{canondata_backend}/1925842/196fcb8368788ce91476288ce484bf98cef33817/resource.tar.gz#test.test_aggregate-aggrs_no_grouping_via_map-default.txt-Debug_/opt.yql_patched"
268+
}
269+
],
270+
"test.test[aggregate-aggrs_no_grouping_via_map-default.txt-Plan]": [
271+
{
272+
"checksum": "25d401612e96668b6aad91e774a1aa02",
273+
"size": 5485,
274+
"uri": "https://{canondata_backend}/1925842/196fcb8368788ce91476288ce484bf98cef33817/resource.tar.gz#test.test_aggregate-aggrs_no_grouping_via_map-default.txt-Plan_/plan.txt"
275+
}
276+
],
277+
"test.test[aggregate-aggrs_no_grouping_via_map-default.txt-Results]": [],
256278
"test.test[aggregate-group_by_cube_expr_trio--Analyze]": [
257279
{
258280
"checksum": "23f6ef9781306d246a286e3afc242460",

ydb/library/yql/tests/sql/sql2yql/canondata/result.json

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1721,6 +1721,20 @@
17211721
"uri": "https://{canondata_backend}/1871182/6b10ad6d9884e5faf3a77187ffb9b38b59b46458/resource.tar.gz#test_sql2yql.test_aggregate-aggrs_no_grouping_/sql.yql"
17221722
}
17231723
],
1724+
"test_sql2yql.test[aggregate-aggrs_no_grouping_via_map]": [
1725+
{
1726+
"checksum": "134146a9e0a1584923e86944ecdbc6a5",
1727+
"size": 13578,
1728+
"uri": "https://{canondata_backend}/1925842/a6c648de784005475debfb02e98abf5e2f60bc0c/resource.tar.gz#test_sql2yql.test_aggregate-aggrs_no_grouping_via_map_/sql.yql"
1729+
}
1730+
],
1731+
"test_sql2yql.test[aggregate-aggrs_no_grouping_via_map_compact]": [
1732+
{
1733+
"checksum": "a152c0af8697ec762979c2c9b218a0fd",
1734+
"size": 15568,
1735+
"uri": "https://{canondata_backend}/1925842/a6c648de784005475debfb02e98abf5e2f60bc0c/resource.tar.gz#test_sql2yql.test_aggregate-aggrs_no_grouping_via_map_compact_/sql.yql"
1736+
}
1737+
],
17241738
"test_sql2yql.test[aggregate-avg_and_sum]": [
17251739
{
17261740
"checksum": "79739198f6ebccd1e0310d5b7422d4e6",
@@ -19067,6 +19081,20 @@
1906719081
"uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_aggregate-aggrs_no_grouping_/formatted.sql"
1906819082
}
1906919083
],
19084+
"test_sql_format.test[aggregate-aggrs_no_grouping_via_map]": [
19085+
{
19086+
"checksum": "2b33658e05326c2054c44b12bba4ba4b",
19087+
"size": 1546,
19088+
"uri": "https://{canondata_backend}/1925842/a6c648de784005475debfb02e98abf5e2f60bc0c/resource.tar.gz#test_sql_format.test_aggregate-aggrs_no_grouping_via_map_/formatted.sql"
19089+
}
19090+
],
19091+
"test_sql_format.test[aggregate-aggrs_no_grouping_via_map_compact]": [
19092+
{
19093+
"checksum": "26f3b35e1a5b16dbf8e5e0b3434cc926",
19094+
"size": 1570,
19095+
"uri": "https://{canondata_backend}/1925842/a6c648de784005475debfb02e98abf5e2f60bc0c/resource.tar.gz#test_sql_format.test_aggregate-aggrs_no_grouping_via_map_compact_/formatted.sql"
19096+
}
19097+
],
1907019098
"test_sql_format.test[aggregate-avg_and_sum]": [
1907119099
{
1907219100
"checksum": "067d46d5653bcb904d02b77caff9927a",
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/* syntax version 1 */
2+
/* postgres can not */
3+
pragma config.flags("OptimizerFlags", "PartitionByConstantKeysViaMap");
4+
select
5+
count(key) as keyCount,
6+
count(sub) as subCount,
7+
count(val) as valCount,
8+
countIf(sub % 2 == 0) as evenCount,
9+
countIf(sub % 2 == 1) as oddCount,
10+
every(sub % 2 == 0) as every,
11+
boolOr(sub % 2 == 0) as boolOr,
12+
avg(key) as keyAvg,
13+
avg(sub) as subAvg,
14+
min(key) as keyMin,
15+
min(sub) as subMin,
16+
min(val) as valMin,
17+
max(key) as keyMax,
18+
max(sub) as subMax,
19+
max(val) as valMax,
20+
some(key) as keySome,
21+
some(sub) as subSome,
22+
some(val) as valSome,
23+
bitAnd(cast(key AS Uint64)) as keyBitAnd,
24+
bitOr(cast(key AS Uint64)) as keyBitOr,
25+
bitXor(cast(key AS Uint64)) as keyBitXor,
26+
bitAnd(cast(sub AS Uint64)) as subBitAnd,
27+
bitOr(cast(sub AS Uint64)) as subBitOr,
28+
bitXor(cast(sub AS Uint64)) as subBitXor,
29+
median(key) as keyMedian,
30+
median(sub) as subMedian,
31+
stdDev(key) as keyStdDev,
32+
stdDev(sub) as subStdDev,
33+
stdDev(empty) as emptyStdDev,
34+
variance(key) as keyVariance,
35+
variance(sub) as subVariance,
36+
stdDevPop(key) as keyPopStdDev,
37+
stdDevPop(sub) as subPopStdDev,
38+
varPop(key) as keyPopVariance,
39+
varPop(sub) as subPopVariance,
40+
correlation(key, sub) AS corr,
41+
covariance(key, sub) AS covar,
42+
covarpop(key, sub) AS covarpop
43+
from
44+
(select cast(key as int) as key, Unwrap(cast(subkey as int)) as sub, value as val, cast(value AS int) AS empty from plato.Input);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/* syntax version 1 */
2+
/* postgres can not */
3+
pragma config.flags("OptimizerFlags", "PartitionByConstantKeysViaMap");
4+
select
5+
count(key) as keyCount,
6+
count(sub) as subCount,
7+
count(val) as valCount,
8+
countIf(sub % 2 == 0) as evenCount,
9+
countIf(sub % 2 == 1) as oddCount,
10+
every(sub % 2 == 0) as every,
11+
boolOr(sub % 2 == 0) as boolOr,
12+
avg(key) as keyAvg,
13+
avg(sub) as subAvg,
14+
min(key) as keyMin,
15+
min(sub) as subMin,
16+
min(val) as valMin,
17+
max(key) as keyMax,
18+
max(sub) as subMax,
19+
max(val) as valMax,
20+
some(key) as keySome,
21+
some(sub) as subSome,
22+
some(val) as valSome,
23+
bitAnd(cast(key AS Uint64)) as keyBitAnd,
24+
bitOr(cast(key AS Uint64)) as keyBitOr,
25+
bitXor(cast(key AS Uint64)) as keyBitXor,
26+
bitAnd(cast(sub AS Uint64)) as subBitAnd,
27+
bitOr(cast(sub AS Uint64)) as subBitOr,
28+
bitXor(cast(sub AS Uint64)) as subBitXor,
29+
median(key) as keyMedian,
30+
median(sub) as subMedian,
31+
stdDev(key) as keyStdDev,
32+
stdDev(sub) as subStdDev,
33+
stdDev(empty) as emptyStdDev,
34+
variance(key) as keyVariance,
35+
variance(sub) as subVariance,
36+
stdDevPop(key) as keyPopStdDev,
37+
stdDevPop(sub) as subPopStdDev,
38+
varPop(key) as keyPopVariance,
39+
varPop(sub) as subPopVariance,
40+
correlation(key, sub) AS corr,
41+
covariance(key, sub) AS covar,
42+
covarpop(key, sub) AS covarpop
43+
from
44+
(select cast(key as int) as key, Unwrap(cast(subkey as int)) as sub, value as val, cast(value AS int) AS empty from plato.Input)
45+
group compact by ();

ydb/library/yql/tests/sql/yt_native_file/part0/canondata/result.json

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,27 @@
293293
"uri": "https://{canondata_backend}/1031349/a2d4ff3f821af2abc956289d3861b20fa2f7f1bd/resource.tar.gz#test.test_aggregate-aggregate_with_lambda--Results_/results.txt"
294294
}
295295
],
296+
"test.test[aggregate-aggrs_no_grouping_via_map_compact-default.txt-Debug]": [
297+
{
298+
"checksum": "2dc18c3d208e806a5c0a883bc4c75123",
299+
"size": 16115,
300+
"uri": "https://{canondata_backend}/1946324/fd57527b924653ec8cc0da2bc20971d488d42ab2/resource.tar.gz#test.test_aggregate-aggrs_no_grouping_via_map_compact-default.txt-Debug_/opt.yql"
301+
}
302+
],
303+
"test.test[aggregate-aggrs_no_grouping_via_map_compact-default.txt-Plan]": [
304+
{
305+
"checksum": "ffe00677e97f277dfe654ad3a5233e5c",
306+
"size": 5043,
307+
"uri": "https://{canondata_backend}/1946324/fd57527b924653ec8cc0da2bc20971d488d42ab2/resource.tar.gz#test.test_aggregate-aggrs_no_grouping_via_map_compact-default.txt-Plan_/plan.txt"
308+
}
309+
],
310+
"test.test[aggregate-aggrs_no_grouping_via_map_compact-default.txt-Results]": [
311+
{
312+
"checksum": "d67659cf5731706e3bd23ad4d3eec160",
313+
"size": 18030,
314+
"uri": "https://{canondata_backend}/1946324/fd57527b924653ec8cc0da2bc20971d488d42ab2/resource.tar.gz#test.test_aggregate-aggrs_no_grouping_via_map_compact-default.txt-Results_/results.txt"
315+
}
316+
],
296317
"test.test[aggregate-group_by_expr_order_by_expr--Debug]": [
297318
{
298319
"checksum": "b7e248e6aaaac07b2ad87353d17a6010",

0 commit comments

Comments
 (0)