Skip to content

Commit c7c8bb3

Browse files
committed
Disabled block engine in aggregation when spilling is enabled
1 parent 5ccb818 commit c7c8bb3

File tree

12 files changed

+54
-40
lines changed

12 files changed

+54
-40
lines changed

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
118118
true, // defaultWatermarksMode
119119
true); // syncActor
120120
} else {
121-
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey());
121+
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
122+
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey(), spillingSettings.IsAggregationSpillingEnabled());
122123
}
123124
if (output) {
124125
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx);

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
252252
}
253253

254254
TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) {
255-
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false);
255+
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
256+
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false, spillingSettings.IsAggregationSpillingEnabled());
256257
DumpAppliedRule("ExpandAggregatePhase", node.Ptr(), output, ctx);
257258
return TExprBase(output);
258259
}

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ ui64 ParseEnableSpillingNodes(const TString &v) {
3434
if (s.empty()) {
3535
throw yexception() << "Empty value item";
3636
}
37-
auto value = FromString<NYql::TDqSettings::EEnabledSpillingNodes>(s);
37+
auto value = FromString<NDq::EEnabledSpillingNodes>(s);
3838
res |= ui64(value);
3939
}
4040
return res;

ydb/library/yql/core/yql_aggregate_expander.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregateWithFullOutput()
3636

3737
HaveDistinct = AnyOf(AggregatedColumns->ChildrenList(),
3838
[](const auto& child) { return child->ChildrenSize() == 3; });
39-
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.IsBlockEngineEnabled()) || ForceCompact || HasSetting(*settings, "compact");
39+
EffectiveCompact = (HaveDistinct && CompactForDistinct && !UseBlocks) || ForceCompact || HasSetting(*settings, "compact");
4040
for (const auto& trait : Traits) {
4141
auto mergeLambda = trait->Child(5);
4242
if (mergeLambda->Tail().IsCallable("Void")) {
@@ -67,7 +67,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregateWithFullOutput()
6767
return GeneratePhases();
6868
}
6969

70-
if (TypesCtx.IsBlockEngineEnabled()) {
70+
if (UseBlocks) {
7171
if (Suffix == "Combine") {
7272
auto ret = TryGenerateBlockCombine();
7373
if (ret) {
@@ -2785,7 +2785,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
27852785
streams.push_back(SerializeIdxSet(indicies));
27862786
}
27872787

2788-
if (TypesCtx.IsBlockEngineEnabled()) {
2788+
if (UseBlocks) {
27892789
for (ui32 i = 0; i < unionAllInputs.size(); ++i) {
27902790
unionAllInputs[i] = Ctx.Builder(Node->Pos())
27912791
.Callable("Map")
@@ -2806,7 +2806,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
28062806
}
28072807

28082808
auto settings = cleanOutputSettings;
2809-
if (TypesCtx.IsBlockEngineEnabled()) {
2809+
if (UseBlocks) {
28102810
settings = AddSetting(*settings, Node->Pos(), "many_streams", Ctx.NewList(Node->Pos(), std::move(streams)), Ctx);
28112811
}
28122812

@@ -2839,7 +2839,7 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombine() {
28392839
}
28402840

28412841
TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalize() {
2842-
if (UsePartitionsByKeys || !TypesCtx.IsBlockEngineEnabled()) {
2842+
if (UsePartitionsByKeys || !UseBlocks) {
28432843
return nullptr;
28442844
}
28452845

@@ -2934,7 +2934,7 @@ TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContex
29342934
return ret;
29352935
}
29362936
}
2937-
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.IsBlockEngineEnabled());
2937+
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.IsBlockEngineEnabled(), false);
29382938
}
29392939

29402940
} // namespace NYql

ydb/library/yql/core/yql_aggregate_expander.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace NYql {
99
class TAggregateExpander {
1010
public:
1111
TAggregateExpander(bool usePartitionsByKeys, const bool useFinalizeByKeys, const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
12-
bool forceCompact = false, bool compactForDistinct = false, bool usePhases = false)
12+
bool forceCompact = false, bool compactForDistinct = false, bool usePhases = false, bool allowSpilling = false)
1313
: Node(node)
1414
, Ctx(ctx)
1515
, TypesCtx(typesCtx)
@@ -25,6 +25,7 @@ class TAggregateExpander {
2525
, HaveSessionSetting(false)
2626
, OriginalRowType(nullptr)
2727
, RowType(nullptr)
28+
, UseBlocks(typesCtx.IsBlockEngineEnabled() && !allowSpilling)
2829
{
2930
PreMap = Ctx.Builder(node->Pos())
3031
.Lambda()
@@ -115,6 +116,7 @@ class TAggregateExpander {
115116
const TStructExprType* RowType;
116117
TVector<const TItemExprType*> RowItems;
117118
TExprNode::TPtr PreMap;
119+
bool UseBlocks;
118120

119121
TExprNode::TListType InitialColumnNames;
120122
TExprNode::TListType FinalColumnNames;
@@ -130,8 +132,10 @@ class TAggregateExpander {
130132
std::unordered_map<std::string_view, TExprNode::TPtr> UdfWasChanged;
131133
};
132134

133-
inline TExprNode::TPtr ExpandAggregatePeepholeImpl(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, const bool useFinalizeByKey, const bool useBlocks) {
134-
TAggregateExpander aggExpander(!useFinalizeByKey && !useBlocks, useFinalizeByKey, node, ctx, typesCtx, true);
135+
inline TExprNode::TPtr ExpandAggregatePeepholeImpl(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
136+
const bool useFinalizeByKey, const bool useBlocks, const bool allowSpilling) {
137+
TAggregateExpander aggExpander(!useFinalizeByKey && !useBlocks, useFinalizeByKey, node, ctx, typesCtx,
138+
true, false, false, allowSpilling);
135139
return aggExpander.ExpandAggregate();
136140
}
137141

ydb/library/yql/dq/common/dq_common.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,34 @@ enum class EHashJoinMode {
9292
GraceAndSelf /* "graceandself" */,
9393
};
9494

95+
enum class EEnabledSpillingNodes : ui64 {
96+
None = 0ULL /* None */,
97+
GraceJoin = 1ULL /* "GraceJoin" */,
98+
Aggregation = 2ULL /* "Aggregation" */,
99+
All = ~0ULL /* "All" */,
100+
};
101+
102+
class TSpillingSettings {
103+
public:
104+
TSpillingSettings() = default;
105+
explicit TSpillingSettings(ui64 mask) : Mask(mask) {};
106+
107+
operator bool() const {
108+
return Mask;
109+
}
110+
111+
bool IsGraceJoinSpillingEnabled() const {
112+
return Mask & ui64(EEnabledSpillingNodes::GraceJoin);
113+
}
114+
115+
bool IsAggregationSpillingEnabled() const {
116+
return Mask & ui64(EEnabledSpillingNodes::Aggregation);
117+
}
118+
119+
private:
120+
const ui64 Mask = 0;
121+
};
122+
95123
} // namespace NYql::NDq
96124

97125
IOutputStream& operator<<(IOutputStream& stream, const NYql::NDq::TTxId& txId);

ydb/library/yql/dq/opt/dq_opt_log.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ using namespace NYql::NNodes;
1717
namespace NYql::NDq {
1818

1919
TExprBase DqRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool compactForDistinct,
20-
bool usePhases, const bool useFinalizeByKey)
20+
bool usePhases, const bool useFinalizeByKey, const bool allowSpilling)
2121
{
2222
if (!node.Maybe<TCoAggregateBase>()) {
2323
return node;
2424
}
25-
TAggregateExpander aggExpander(!typesCtx.IsBlockEngineEnabled() && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
25+
TAggregateExpander aggExpander(!typesCtx.IsBlockEngineEnabled() && !useFinalizeByKey,
26+
useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases, allowSpilling);
2627
auto result = aggExpander.ExpandAggregate();
2728
YQL_ENSURE(result);
2829

ydb/library/yql/dq/opt/dq_opt_log.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ namespace NYql {
1919

2020
namespace NYql::NDq {
2121

22-
NNodes::TExprBase DqRewriteAggregate(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool compactForDistinct, bool usePhases, const bool useFinalizeByKey);
22+
NNodes::TExprBase DqRewriteAggregate(NNodes::TExprBase node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
23+
bool compactForDistinct, bool usePhases, const bool useFinalizeByKey, const bool allowSpilling);
2324

2425
NNodes::TExprBase DqRewriteTakeSortToTopSort(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents);
2526

ydb/library/yql/dq/tasks/dq_task_program.h

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,6 @@
1111

1212
namespace NYql::NDq {
1313

14-
class TSpillingSettings {
15-
public:
16-
TSpillingSettings() = default;
17-
explicit TSpillingSettings(ui64 mask) : Mask(mask) {};
18-
19-
operator bool() const {
20-
return Mask;
21-
}
22-
23-
bool IsGraceJoinSpillingEnabled() const {
24-
return Mask & ui64(TDqConfiguration::EEnabledSpillingNodes::GraceJoin);
25-
}
26-
27-
private:
28-
const ui64 Mask = 0;
29-
};
30-
3114
const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext& ctx);
3215

3316
TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsType,

ydb/library/yql/providers/dq/common/yql_dq_settings.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ TDqConfiguration::TDqConfiguration() {
110110
if (s.empty()) {
111111
throw yexception() << "Empty value item";
112112
}
113-
auto value = FromString<EEnabledSpillingNodes>(s);
113+
auto value = FromString<NDq::EEnabledSpillingNodes>(s);
114114
res |= ui64(value);
115115
}
116116
return res;

ydb/library/yql/providers/dq/common/yql_dq_settings.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,6 @@ struct TDqSettings {
2828
File /* "file" */,
2929
};
3030

31-
enum class EEnabledSpillingNodes : ui64 {
32-
None = 0ULL /* None */,
33-
GraceJoin = 1ULL /* "GraceJoin" */,
34-
All = ~0ULL /* "All" */,
35-
};
36-
3731
struct TDefault {
3832
static constexpr ui32 MaxTasksPerStage = 20U;
3933
static constexpr ui32 MaxTasksPerOperation = 70U;

ydb/library/yql/providers/dq/opt/logical_optimize.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
137137
bool syncActor = Config->ComputeActorType.Get() != "async";
138138
return NHopping::RewriteAsHoppingWindow(node, ctx, input.Cast(), analyticsHopping, lateArrivalDelay, defaultWatermarksMode, syncActor);
139139
} else {
140-
return DqRewriteAggregate(node, ctx, TypesCtx, true, Config->UseAggPhases.Get().GetOrElse(false), Config->UseFinalizeByKey.Get().GetOrElse(false));
140+
NDq::TSpillingSettings spillingSettings(Config->GetEnabledSpillingNodes());
141+
return DqRewriteAggregate(node, ctx, TypesCtx, true, Config->UseAggPhases.Get().GetOrElse(false), Config->UseFinalizeByKey.Get().GetOrElse(false), spillingSettings.IsAggregationSpillingEnabled());
141142
}
142143
}
143144
return node;

0 commit comments

Comments
 (0)