Skip to content

Commit 8b7eb04

Browse files
committed
initial
1 parent adc4da7 commit 8b7eb04

File tree

18 files changed

+89
-31
lines changed

18 files changed

+89
-31
lines changed

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1558,6 +1558,7 @@ class TKqpHost : public IKqpHost {
15581558
|| settingName == "DisableOrderedColumns"
15591559
|| settingName == "Warning"
15601560
|| settingName == "UseBlocks"
1561+
|| settingName == "BlockEngine"
15611562
;
15621563
};
15631564
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);

ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeA
5858
} else if (auto maybeRead = node.Maybe<TKqpReadOlapTableRanges>()) {
5959
auto read = maybeRead.Cast();
6060

61-
if (typesCtx.UseBlocks) {
61+
if (typesCtx.IsBlockEngineEnabled()) {
6262
wideRead = Build<TCoWideFromBlocks>(ctx, node.Pos())
6363
.Input<TKqpBlockReadOlapTableRanges>()
6464
.Table(read.Table())

ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7774,7 +7774,7 @@ THolder<IGraphTransformer> CreatePeepHoleFinalStageTransformer(TTypeAnnotationCo
77747774
pipeline.Add(
77757775
CreateFunctorTransformer(
77767776
[&types](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
7777-
if (types.UseBlocks) {
7777+
if (types.IsBlockEngineEnabled()) {
77787778
const auto& extStageRules = TPeepHoleRules::Instance().BlockStageExtRules;
77797779
return PeepHoleBlockStage(input, output, ctx, types, extStageRules);
77807780
} else {
@@ -7789,7 +7789,7 @@ THolder<IGraphTransformer> CreatePeepHoleFinalStageTransformer(TTypeAnnotationCo
77897789
pipeline.Add(
77907790
CreateFunctorTransformer(
77917791
[&types](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
7792-
if (types.UseBlocks) {
7792+
if (types.IsBlockEngineEnabled()) {
77937793
const auto& extStageRules = TPeepHoleRules::Instance().BlockStageExtFinalRules;
77947794
return PeepHoleBlockStage(input, output, ctx, types, extStageRules);
77957795
} else {

ydb/library/yql/core/type_ann/type_ann_list.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4689,7 +4689,7 @@ namespace {
46894689
return IGraphTransformer::TStatus::Repeat;
46904690
}
46914691

4692-
if (isMany && ctx.Types.UseBlocks) {
4692+
if (isMany && ctx.Types.IsBlockEngineEnabled()) {
46934693
auto streamIndex = inputStructType->FindItem("_yql_group_stream_index");
46944694
if (streamIndex) {
46954695
const TTypeAnnotationNode* streamIndexType = inputStructType->GetItems()[*streamIndex]->GetItemType();

ydb/library/yql/core/yql_aggregate_expander.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregate()
2525

2626
HaveDistinct = AnyOf(AggregatedColumns->ChildrenList(),
2727
[](const auto& child) { return child->ChildrenSize() == 3; });
28-
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.UseBlocks) || ForceCompact || HasSetting(*settings, "compact");
28+
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.IsBlockEngineEnabled()) || ForceCompact || HasSetting(*settings, "compact");
2929
for (const auto& trait : Traits) {
3030
auto mergeLambda = trait->Child(5);
3131
if (mergeLambda->Tail().IsCallable("Void")) {
@@ -56,7 +56,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregate()
5656
return GeneratePhases();
5757
}
5858

59-
if (TypesCtx.UseBlocks) {
59+
if (TypesCtx.IsBlockEngineEnabled()) {
6060
if (Suffix == "Combine") {
6161
auto ret = TryGenerateBlockCombine();
6262
if (ret) {
@@ -2776,7 +2776,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
27762776
streams.push_back(SerializeIdxSet(indicies));
27772777
}
27782778

2779-
if (TypesCtx.UseBlocks) {
2779+
if (TypesCtx.IsBlockEngineEnabled()) {
27802780
for (ui32 i = 0; i < unionAllInputs.size(); ++i) {
27812781
unionAllInputs[i] = Ctx.Builder(Node->Pos())
27822782
.Callable("Map")
@@ -2797,7 +2797,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
27972797
}
27982798

27992799
auto settings = Node->ChildPtr(3);
2800-
if (TypesCtx.UseBlocks) {
2800+
if (TypesCtx.IsBlockEngineEnabled()) {
28012801
settings = AddSetting(*settings, Node->Pos(), "many_streams", Ctx.NewList(Node->Pos(), std::move(streams)), Ctx);
28022802
}
28032803

@@ -2830,7 +2830,7 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombine() {
28302830
}
28312831

28322832
TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalize() {
2833-
if (UsePartitionsByKeys || !TypesCtx.UseBlocks) {
2833+
if (UsePartitionsByKeys || !TypesCtx.IsBlockEngineEnabled()) {
28342834
return nullptr;
28352835
}
28362836

@@ -2919,13 +2919,13 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
29192919
TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
29202920
if (NNodes::TCoAggregate::Match(node.Get())) {
29212921
NNodes::TCoAggregate self(node);
2922-
auto ret = TAggregateExpander::CountAggregateRewrite(self, ctx, typesCtx.UseBlocks);
2922+
auto ret = TAggregateExpander::CountAggregateRewrite(self, ctx, typesCtx.IsBlockEngineEnabled());
29232923
if (ret != node) {
29242924
YQL_CLOG(DEBUG, Core) << "CountAggregateRewrite on peephole";
29252925
return ret;
29262926
}
29272927
}
2928-
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.UseBlocks);
2928+
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.IsBlockEngineEnabled());
29292929
}
29302930

29312931
} // namespace NYql

ydb/library/yql/core/yql_type_annotation.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,12 @@ enum class EMatchRecognizeStreamingMode {
193193
Force,
194194
};
195195

196+
enum class EBlockEngineMode {
197+
Disable /* "disable" */,
198+
Auto /* "auto" */,
199+
Force /* "force" */,
200+
};
201+
196202
struct TUdfCachedInfo {
197203
const TTypeAnnotationNode* FunctionType = nullptr;
198204
const TTypeAnnotationNode* RunConfigType = nullptr;
@@ -251,6 +257,7 @@ struct TTypeAnnotationContext: public TThrRefBase {
251257
bool YsonCastToString = true;
252258
ui32 FolderSubDirsLimit = 1000;
253259
bool UseBlocks = false;
260+
EBlockEngineMode BlockEngineMode = EBlockEngineMode::Disable;
254261
bool PgEmitAggApply = false;
255262
IArrowResolver::TPtr ArrowResolver;
256263
ECostBasedOptimizerType CostBasedOptimizer = ECostBasedOptimizerType::Disable;
@@ -350,7 +357,10 @@ struct TTypeAnnotationContext: public TThrRefBase {
350357
void SetStats(const TExprNode* input, std::shared_ptr<TOptimizerStatistics> stats) {
351358
StatisticsMap[input] = stats;
352359
}
353-
360+
361+
bool IsBlockEngineEnabled() const {
362+
return BlockEngineMode != EBlockEngineMode::Disable || UseBlocks;
363+
}
354364
};
355365

356366
template <> inline

ydb/library/yql/dq/opt/dq_opt_log.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ TExprBase DqRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnotationC
2121
if (!node.Maybe<TCoAggregateBase>()) {
2222
return node;
2323
}
24-
TAggregateExpander aggExpander(true, !typesCtx.UseBlocks && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
24+
TAggregateExpander aggExpander(true, !typesCtx.IsBlockEngineEnabled() && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
2525
auto result = aggExpander.ExpandAggregate();
2626
YQL_ENSURE(result);
2727

ydb/library/yql/dq/opt/dq_opt_peephole.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -681,7 +681,7 @@ NNodes::TExprBase DqPeepholeRewriteLength(const NNodes::TExprBase& node, TExprCo
681681
}
682682

683683
auto dqPhyLength = node.Cast<TDqPhyLength>();
684-
if (typesCtx.UseBlocks) {
684+
if (typesCtx.IsBlockEngineEnabled()) {
685685
return NNodes::TExprBase(ctx.Builder(node.Pos())
686686
.Callable("NarrowMap")
687687
.Callable(0, "BlockCombineAll")

ydb/library/yql/providers/config/yql_config_provider.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,18 @@ namespace {
888888
return false;
889889
}
890890
}
891+
else if (name == "BlockEngine") {
892+
if (args.size() != 1) {
893+
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected at most 1 argument, but got " << args.size()));
894+
return false;
895+
}
896+
897+
auto arg = TString{args[0]};
898+
if (!TryFromString(arg, Types.BlockEngineMode)) {
899+
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected `disable|auto|force', but got: " << args[0]));
900+
return false;
901+
}
902+
}
891903
else {
892904
ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name));
893905
return false;

ydb/library/yql/providers/dq/opt/logical_optimize.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
324324
auto input = aggregate.Input().Maybe<TDqConnection>();
325325

326326
if (input) {
327-
auto newNode = TAggregateExpander::CountAggregateRewrite(aggregate, ctx, TypesCtx.UseBlocks);
327+
auto newNode = TAggregateExpander::CountAggregateRewrite(aggregate, ctx, TypesCtx.IsBlockEngineEnabled());
328328
if (node.Ptr() != newNode) {
329329
return TExprBase(newNode);
330330
}

0 commit comments

Comments
 (0)