Skip to content

Commit 96317f8

Browse files
committed
YQL-17542 move TaskRunner dependent Execute to TDqSyncComputeActorBase
1 parent 4fef0c2 commit 96317f8

File tree

2 files changed

+37
-23
lines changed

2 files changed

+37
-23
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -304,30 +304,14 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
304304
ReportStats(TInstant::Now(), ESendStats::IfPossible);
305305
}
306306
if (Terminated) {
307-
TaskRunner.Reset();
307+
DoTerminateImpl();
308308
MemoryQuota.Reset();
309309
MemoryLimits.MemoryQuotaManager.reset();
310310
}
311311
}
312312

313-
virtual void DoExecuteImpl() {
314-
auto sourcesState = GetSourcesState();
315-
316-
PollAsyncInput();
317-
ERunStatus status = TaskRunner->Run();
318-
319-
CA_LOG_T("Resume execution, run status: " << status);
320-
321-
if (status != ERunStatus::Finished) {
322-
PollSources(std::move(sourcesState));
323-
}
324-
325-
if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
326-
Checkpoints->DoCheckpoint();
327-
}
328-
329-
ProcessOutputsImpl(status);
330-
}
313+
virtual void DoExecuteImpl() = 0;
314+
virtual void DoTerminateImpl() {}
331315

332316
virtual bool DoHandleChannelsAfterFinishImpl() {
333317
Y_ABORT_UNLESS(Checkpoints);
@@ -606,12 +590,11 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
606590
InternalError(statusCode, TIssues({std::move(issue)}));
607591
}
608592

593+
virtual void InvalidateMeminfo() {}
594+
609595
void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) {
610596
CA_LOG_E(InternalErrorLogString(statusCode, issues));
611-
if (TaskRunner) {
612-
TaskRunner->GetAllocator().InvalidateMemInfo();
613-
TaskRunner->GetAllocator().DisableStrictAllocationCheck();
614-
}
597+
InvalidateMeminfo();
615598
State = NDqProto::COMPUTE_STATE_FAILURE;
616599
ReportStateAndMaybeDie(statusCode, issues);
617600
}

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,37 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
4444
return inputTransformInfo.Buffer.Get();
4545
}
4646
protected:
47+
48+
void DoExecuteImpl() override{
49+
auto sourcesState = TBase::GetSourcesState();
50+
51+
TBase::PollAsyncInput();
52+
ERunStatus status = this->TaskRunner->Run();
53+
54+
CA_LOG_T("Resume execution, run status: " << status);
55+
56+
if (status != ERunStatus::Finished) {
57+
TBase::PollSources(std::move(sourcesState));
58+
}
59+
60+
if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && this->Checkpoints && this->Checkpoints->HasPendingCheckpoint() && !this->Checkpoints->ComputeActorStateSaved() && TBase::ReadyToCheckpoint()) {
61+
this->Checkpoints->DoCheckpoint();
62+
}
63+
64+
TBase::ProcessOutputsImpl(status);
65+
}
66+
67+
void DoTerminateImpl() override {
68+
this->TaskRunner.Reset();
69+
}
70+
71+
void InvalidateMeminfo() override {
72+
if (this->TaskRunner) {
73+
this->TaskRunner->GetAllocator().InvalidateMemInfo();
74+
this->TaskRunner->GetAllocator().DisableStrictAllocationCheck();
75+
}
76+
}
77+
4778
void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
4879
CA_LOG_D("Save state");
4980
NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();

0 commit comments

Comments
 (0)