Skip to content

Commit c984a1c

Browse files
committed
YQL-18343: Propagate EnableSpilling flag to compute nodes.
1 parent e597d2e commit c984a1c

File tree

7 files changed

+18
-10
lines changed

7 files changed

+18
-10
lines changed

ydb/core/kqp/executer_actor/kqp_literal_executer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ class TKqpLiteralExecuter {
173173
NDqProto::TDqTask protoTask;
174174
protoTask.SetId(task.Id);
175175
protoTask.SetStageId(task.StageId.StageId);
176+
protoTask.SetEnableSpilling(false); // TODO: enable spilling
176177
protoTask.MutableProgram()->CopyFrom(stage.GetProgram()); // it's not good...
177178

178179
TaskId2StageId[task.Id] = task.StageId.StageId;

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,6 +1141,7 @@ void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, N
11411141
result->SetId(task.Id);
11421142
result->SetStageId(stageInfo.Id.StageId);
11431143
result->SetUseLlvm(task.GetUseLlvm());
1144+
result->SetEnableSpilling(false); // TODO: enable spilling
11441145
if (task.HasMetaId()) {
11451146
result->SetMetaId(task.GetMetaIdUnsafe());
11461147
}

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class TLocalTaskRunnerActor
4949
, TaskId(taskId)
5050
, InputChannelsWithDisabledCheckpoints(std::move(inputChannelsWithDisabledCheckpoints))
5151
, MemoryQuota(std::move(memoryQuota))
52-
{
52+
{
5353
}
5454

5555
~TLocalTaskRunnerActor()
@@ -427,16 +427,18 @@ class TLocalTaskRunnerActor
427427
}
428428

429429
TaskRunner->Prepare(settings, ev->Get()->MemoryLimits, *ev->Get()->ExecCtx);
430-
430+
431431
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>> inputTransforms;
432432
for (auto i = 0; i != inputs.size(); ++i) {
433433
if (auto t = TaskRunner->GetInputTransform(i)) {
434434
inputTransforms[i] = *t;
435435
}
436436
}
437437

438-
auto wakeUpCallback = ev->Get()->ExecCtx->GetWakeupCallback();
439-
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(TxId, NActors::TActivationContext::ActorSystem(), wakeUpCallback));
438+
if (settings.GetEnableSpilling()) {
439+
auto wakeUpCallback = ev->Get()->ExecCtx->GetWakeupCallback();
440+
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(TxId, NActors::TActivationContext::ActorSystem(), wakeUpCallback));
441+
}
440442

441443
auto event = MakeHolder<TEvTaskRunnerCreateFinished>(
442444
TaskRunner->GetSecureParams(),

ydb/library/yql/dq/proto/dq_tasks.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,4 +202,5 @@ message TDqTask {
202202
optional bool UseLlvm = 16;
203203
repeated bytes ReadRanges = 17;
204204
map<string, string> RequestContext = 18;
205+
optional bool EnableSpilling = 19;
205206
}

ydb/library/yql/dq/runtime/dq_tasks_runner.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,10 @@ class TDqTaskSettings {
347347
return Task_->GetRequestContext();
348348
}
349349

350+
bool GetEnableSpilling() const {
351+
return Task_->HasEnableSpilling() && Task_->GetEnableSpilling();
352+
}
353+
350354
private:
351355

352356
// external callback to retrieve parameter value.
@@ -405,8 +409,8 @@ class IDqTaskRunner : public TSimpleRefCount<IDqTaskRunner>, private TNonCopyabl
405409
};
406410

407411
TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(
408-
NKikimr::NMiniKQL::TScopedAlloc& alloc,
409-
const TDqTaskRunnerContext& ctx,
412+
NKikimr::NMiniKQL::TScopedAlloc& alloc,
413+
const TDqTaskRunnerContext& ctx,
410414
const TDqTaskRunnerSettings& settings,
411415
const TLogFunc& logFunc
412416
);

ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ using TBase = TComputationValue<TSpillingSupportState<Sort, HasCount>>;
443443
switch (InputStatus = Flow->FetchValues(ctx, GetFields())) {
444444
case EFetchResult::One:
445445
if (Put()) {
446-
if (!HasMemoryForProcessing()) {
446+
if (ctx.SpillerFactory && !HasMemoryForProcessing()) {
447447
SwitchMode(EOperatingMode::Spilling, ctx);
448448
return EFetchResult::Yield;
449449
}
@@ -562,9 +562,7 @@ using TBase = TComputationValue<TSpillingSupportState<Sort, HasCount>>;
562562
EOperatingMode GetMode() const { return Mode; }
563563

564564
bool HasMemoryForProcessing() const {
565-
// TODO: Change to enable spilling
566-
// return !TlsAllocState->IsMemoryYellowZoneEnabled();
567-
return true;
565+
return !TlsAllocState->IsMemoryYellowZoneEnabled();
568566
}
569567

570568
bool IsReadFromChannelFinished() const {

ydb/library/yql/providers/dq/planner/execution_planner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ namespace NYql::NDqs {
449449
taskMeta.SetStageId(publicId);
450450
taskDesc.MutableMeta()->PackFrom(taskMeta);
451451
taskDesc.SetStageId(stageId);
452+
taskDesc.SetEnableSpilling(Settings->IsSpillingEnabled());
452453

453454
if (Settings->DisableLLVMForBlockStages.Get().GetOrElse(true)) {
454455
auto& stage = TasksGraph.GetStageInfo(task.StageId).Meta.Stage;

0 commit comments

Comments
 (0)