Skip to content

Commit ecda791

Browse files
Merge 323864b into b82247b
2 parents b82247b + 323864b commit ecda791

File tree

116 files changed

+1878
-1532
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

116 files changed

+1878
-1532
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
609609
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
610610
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
611611
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
612+
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
612613

613614
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
614615
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));

ydb/core/kqp/compile_service/kqp_compile_service.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
536536
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
537537
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();
538538

539+
TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();
540+
539541
TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
540542
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
541543

@@ -562,6 +564,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
562564
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
563565
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
564566
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
567+
TableServiceConfig.GetEnableSpillingNodes() != enableSpillingNodes ||
565568
TableServiceConfig.GetEnableQueryServiceSpilling() != enableQueryServiceSpilling ||
566569
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
567570
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,10 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
172172
runtimeSettings.UseSpilling = args.WithSpilling;
173173
runtimeSettings.StatsMode = args.StatsMode;
174174

175+
if (runtimeSettings.UseSpilling) {
176+
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);
177+
}
178+
175179
if (args.Deadline) {
176180
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
177181
}

ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ using namespace NYql::NDq;
1414

1515
class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
1616
public:
17-
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
18-
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
17+
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
18+
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback))
1919
, WithSpilling_(withSpilling)
2020
{
2121
}

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,10 @@ void TKqpComputeActor::DoBootstrap() {
7272
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
7373
SetTaskRunner(taskRunner);
7474

75-
auto wakeup = [this]{ ContinueExecute(); };
75+
auto wakeupCallback = [this]{ ContinueExecute(); };
76+
auto errorCallback = [this](const TString& error){ SendError(error); };
7677
try {
77-
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
78+
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback)));
7879
} catch (const NMiniKQL::TKqpEnsureFail& e) {
7980
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
8081
return;

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ void TKqpScanComputeActor::DoBootstrap() {
243243
TBase::SetTaskRunner(taskRunner);
244244

245245
auto wakeup = [this] { ContinueExecute(); };
246-
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
246+
auto errorCallback = [this](const TString& error){ SendError(error); };
247+
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback)));
247248

248249
ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
249250
ScanData = &ComputeCtx.GetTableScan(0);

ydb/core/kqp/executer_actor/kqp_planner.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
204204
request.SetStartAllOrFail(true);
205205
if (UseDataQueryPool) {
206206
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
207+
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
207208
} else {
208209
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
209210
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
141141
true, // defaultWatermarksMode
142142
true); // syncActor
143143
} else {
144-
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey());
144+
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
145+
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey(), spillingSettings.IsAggregationSpillingEnabled());
145146
}
146147
if (output) {
147148
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx);

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
252252
}
253253

254254
TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) {
255-
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false);
255+
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
256+
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false, spillingSettings.IsAggregationSpillingEnabled());
256257
DumpAppliedRule("ExpandAggregatePhase", node.Ptr(), output, ctx);
257258
return TExprBase(output);
258259
}

ydb/core/kqp/provider/yql_kikimr_exec.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -815,10 +815,11 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
815815

816816
TVector<TExprBase> fakeReads;
817817
auto paramsType = NDq::CollectParameters(programLambda, ctx);
818+
NDq::TSpillingSettings spillingSettings{SessionCtx->Config().GetEnabledSpillingNodes()};
818819
lambda = NDq::BuildProgram(
819820
programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
820821
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(),
821-
ctx, fakeReads);
822+
ctx, fakeReads, spillingSettings);
822823

823824
NKikimr::NMiniKQL::TProgramBuilder programBuilder(SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
824825
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry());

ydb/core/kqp/provider/yql_kikimr_settings.cpp

+28-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include <ydb/core/protos/config.pb.h>
44
#include <ydb/core/protos/table_service_config.pb.h>
55
#include <util/generic/size_literals.h>
6+
#include <util/string/split.h>
7+
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
68

79
namespace NYql {
810

@@ -23,6 +25,21 @@ EOptionalFlag GetOptionalFlagValue(const TMaybe<TType>& flag) {
2325
return EOptionalFlag::Disabled;
2426
}
2527

28+
29+
ui64 ParseEnableSpillingNodes(const TString &v) {
30+
ui64 res = 0;
31+
TVector<TString> vec;
32+
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
33+
for (auto& s: vec) {
34+
if (s.empty()) {
35+
throw yexception() << "Empty value item";
36+
}
37+
auto value = FromString<NDq::EEnabledSpillingNodes>(s);
38+
res |= ui64(value);
39+
}
40+
return res;
41+
}
42+
2643
static inline bool GetFlagValue(const TMaybe<bool>& flag) {
2744
return flag ? flag.GetRef() : false;
2845
}
@@ -73,6 +90,8 @@ TKikimrConfiguration::TKikimrConfiguration() {
7390

7491
REGISTER_SETTING(*this, OptUseFinalizeByKey);
7592
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
93+
REGISTER_SETTING(*this, EnableSpillingNodes)
94+
.Parser([](const TString& v) { return ParseEnableSpillingNodes(v); });
7695

7796
REGISTER_SETTING(*this, MaxDPccpDPTableSize);
7897

@@ -126,10 +145,9 @@ bool TKikimrSettings::HasOptEnableOlapProvideComputeSharding() const {
126145
}
127146

128147
bool TKikimrSettings::HasOptUseFinalizeByKey() const {
129-
return GetOptionalFlagValue(OptUseFinalizeByKey.Get()) != EOptionalFlag::Disabled;
148+
return GetFlagValue(OptUseFinalizeByKey.Get().GetOrElse(true)) != EOptionalFlag::Disabled;
130149
}
131150

132-
133151
EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
134152
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
135153
}
@@ -151,4 +169,12 @@ TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const {
151169
return std::make_shared<const TKikimrSettings>(*this);
152170
}
153171

172+
void TKikimrConfiguration::SetDefaultEnabledSpillingNodes(const TString& node) {
173+
DefaultEnableSpillingNodes = ParseEnableSpillingNodes(node);
174+
}
175+
176+
ui64 TKikimrConfiguration::GetEnabledSpillingNodes() const {
177+
return EnableSpillingNodes.Get().GetOrElse(DefaultEnableSpillingNodes);
178+
}
179+
154180
}

ydb/core/kqp/provider/yql_kikimr_settings.h

+5
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ struct TKikimrSettings {
5959
NCommon::TConfSetting<TString, false> OptJoinAlgoHints;
6060
NCommon::TConfSetting<TString, false> OptJoinOrderHints;
6161

62+
6263
/* Disable optimizer rules */
6364
NCommon::TConfSetting<bool, false> OptDisableTopSort;
6465
NCommon::TConfSetting<bool, false> OptDisableSqlInToJoin;
@@ -175,6 +176,10 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
175176
bool EnableSpillingGenericQuery = false;
176177
ui32 DefaultCostBasedOptimizationLevel = 4;
177178
bool EnableConstantFolding = true;
179+
ui64 DefaultEnableSpillingNodes = 0;
180+
181+
void SetDefaultEnabledSpillingNodes(const TString& node);
182+
ui64 GetEnabledSpillingNodes() const;
178183
};
179184

180185
}

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

+8-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include <library/cpp/monlib/service/pages/templates.h>
4747
#include <library/cpp/resource/resource.h>
4848

49+
#include <util/folder/dirut.h>
4950

5051
namespace NKikimr::NKqp {
5152

@@ -236,9 +237,15 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
236237
ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext());
237238

238239
if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) {
240+
TString spillingRoot = cfg.GetRoot();
241+
if (spillingRoot.empty()) {
242+
spillingRoot = NYql::NDq::GetTmpSpillingRootForCurrentUser();
243+
MakeDirIfNotExist(spillingRoot);
244+
}
245+
239246
SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(NYql::NDq::CreateDqLocalFileSpillingService(
240247
NYql::NDq::TFileSpillingServiceConfig{
241-
.Root = cfg.GetRoot(),
248+
.Root = spillingRoot,
242249
.MaxTotalSize = cfg.GetMaxTotalSize(),
243250
.MaxFileSize = cfg.GetMaxFileSize(),
244251
.MaxFilePartSize = cfg.GetMaxFilePartSize(),

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -777,8 +777,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
777777
stageProto.SetIsEffectsStage(hasEffects || hasTxTableSink);
778778

779779
auto paramsType = CollectParameters(stage, ctx);
780+
NDq::TSpillingSettings spillingSettings{Config->GetEnabledSpillingNodes()};
780781
auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry,
781-
ctx, {});
782+
ctx, {}, spillingSettings);
782783

783784
auto& programProto = *stageProto.MutableProgram();
784785
programProto.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);

ydb/core/kqp/ut/olap/aggregations_ut.cpp

+43
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,49 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
171171
}
172172
}
173173

174+
Y_UNIT_TEST_TWIN(DisableBlockEngineInAggregationWithSpilling, AllowSpilling) {
175+
auto settings = TKikimrSettings()
176+
.SetWithSampleTables(false);
177+
settings.AppConfig.MutableTableServiceConfig()->SetBlockChannelsMode(NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_FORCE);
178+
if (AllowSpilling) {
179+
settings.AppConfig.MutableTableServiceConfig()->SetEnableSpillingNodes("Aggregation");
180+
} else {
181+
settings.AppConfig.MutableTableServiceConfig()->SetEnableSpillingNodes("None");
182+
}
183+
TKikimrRunner kikimr(settings);
184+
185+
TLocalHelper(kikimr).CreateTestOlapTable();
186+
auto client = kikimr.GetQueryClient();
187+
188+
{
189+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
190+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000);
191+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000);
192+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000);
193+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000);
194+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
195+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
196+
}
197+
198+
{
199+
TString query = R"(
200+
--!syntax_v1
201+
SELECT
202+
COUNT(*)
203+
FROM `/Root/olapStore/olapTable`
204+
GROUP BY level
205+
)";
206+
207+
auto res = StreamExplainQuery(query, client);
208+
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
209+
210+
auto plan = CollectStreamResult(res);
211+
212+
bool hasWideCombiner = plan.QueryStats->Getquery_ast().Contains("WideCombiner");
213+
UNIT_ASSERT_C(hasWideCombiner == AllowSpilling, plan.QueryStats->Getquery_ast());
214+
}
215+
}
216+
174217
Y_UNIT_TEST_TWIN(CountAllPushdown, UseLlvm) {
175218
auto settings = TKikimrSettings()
176219
.SetWithSampleTables(false);

ydb/core/kqp/ut/olap/clickbench_ut.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,10 @@ Y_UNIT_TEST_SUITE(KqpOlapClickbench) {
166166
GROUP BY RegionID
167167
ORDER BY c DESC
168168
LIMIT 10
169-
)");
169+
)")
170170
//.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]")
171-
// .SetExpectedReadNodeType("TableFullScan");
172-
// .SetExpectedReadNodeType("Aggregate-TableFullScan");
171+
.SetExpectedReadNodeType("TableFullScan");
172+
173173
q9.FillExpectedAggregationGroupByPlanOptions();
174174

175175
TAggregationTestCase q12;

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -2491,6 +2491,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
24912491
NKikimrConfig::TAppConfig appConfig;
24922492
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
24932493
appConfig.MutableTableServiceConfig()->SetBlockChannelsMode(blockChannelsMode);
2494+
appConfig.MutableTableServiceConfig()->SetEnableSpillingNodes("None");
24942495
auto settings = TKikimrSettings()
24952496
.SetAppConfig(appConfig)
24962497
.SetWithSampleTables(true);

0 commit comments

Comments
 (0)