Skip to content

Commit 645cbdf

Browse files
committed
add tests to pragma and feature flag
1 parent a786180 commit 645cbdf

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
166166
runtimeSettings.UseSpilling = args.WithSpilling;
167167
runtimeSettings.StatsMode = args.StatsMode;
168168

169+
if (runtimeSettings.UseSpilling) {
170+
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);
171+
}
172+
169173
if (args.Deadline) {
170174
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
171175
}

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
194194
request.SetStartAllOrFail(true);
195195
if (UseDataQueryPool) {
196196
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
197+
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
197198
} else {
198199
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
199200
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);

ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,70 @@ NKikimrConfig::TAppConfig AppCfg() {
3232
return appCfg;
3333
}
3434

35+
NKikimrConfig::TAppConfig AppCfgLowComputeLimits(ui64 reasonableTreshold) {
36+
NKikimrConfig::TAppConfig appCfg;
37+
38+
auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager();
39+
rm->SetMkqlLightProgramMemoryLimit(100);
40+
rm->SetMkqlHeavyProgramMemoryLimit(300);
41+
rm->SetReasonableSpillingTreshold(reasonableTreshold);
42+
appCfg.MutableTableServiceConfig()->SetEnableQueryServiceSpilling(true);
43+
44+
auto* spilling = appCfg.MutableTableServiceConfig()->MutableSpillingServiceConfig()->MutableLocalFileConfig();
45+
46+
spilling->SetEnable(true);
47+
spilling->SetRoot("./spilling/");
48+
49+
return appCfg;
50+
}
51+
52+
3553
} // anonymous namespace
3654

3755
Y_UNIT_TEST_SUITE(KqpScanSpilling) {
3856

57+
Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
58+
ui64 reasonableTreshold = EnabledSpilling ? 100 : 200_MB;
59+
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
60+
TKikimrRunner kikimr(AppCfgLowComputeLimits(reasonableTreshold));
61+
62+
auto db = kikimr.GetQueryClient();
63+
64+
for (ui32 i = 0; i < 300; ++i) {
65+
auto result = db.ExecuteQuery(Sprintf(R"(
66+
--!syntax_v1
67+
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
68+
)", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
69+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
70+
}
71+
72+
auto query = R"(
73+
--!syntax_v1
74+
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
75+
select t1.Key, t1.Value, t2.Key, t2.Value
76+
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
77+
order by t1.Value
78+
)";
79+
80+
auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
81+
auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
82+
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
83+
84+
Cerr << planres.GetStats()->GetAst() << Endl;
85+
86+
auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
87+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
88+
89+
TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
90+
if (EnabledSpilling) {
91+
UNIT_ASSERT(counters.SpillingWriteBlobs->Val() > 0);
92+
UNIT_ASSERT(counters.SpillingReadBlobs->Val() > 0);
93+
} else {
94+
UNIT_ASSERT(counters.SpillingWriteBlobs->Val() == 0);
95+
UNIT_ASSERT(counters.SpillingReadBlobs->Val() == 0);
96+
}
97+
}
98+
3999
Y_UNIT_TEST(SelfJoinQueryService) {
40100
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
41101

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
5555
}
5656
}
5757

58-
bool DoHandleChannelsAfterFinishImpl() override final{
58+
bool DoHandleChannelsAfterFinishImpl() override final{
5959
Y_ABORT_UNLESS(this->Checkpoints);
6060

6161
if (this->Checkpoints->HasPendingCheckpoint() && !this->Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {

0 commit comments

Comments
 (0)