Skip to content

Commit 900a489

Browse files
authored
Merge 5520542 into d010ed3
2 parents d010ed3 + 5520542 commit 900a489

File tree

3 files changed

+46
-35
lines changed

3 files changed

+46
-35
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
505505

506506
MkqlMemoryLimit = ev->Get()->MkqlMemoryLimit;
507507
ProfileStats = std::move(ev->Get()->ProfileStats);
508-
auto sourcesState = GetSourcesState();
509508
auto status = ev->Get()->RunStatus;
510509

511510
CA_LOG_T("Resume execution, run status: " << status << " checkpoint: " << (bool) ev->Get()->ProgramState
@@ -524,10 +523,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
524523
}
525524
}
526525

527-
if (status != ERunStatus::Finished) {
528-
PollSources(std::move(sourcesState));
529-
}
530-
531526
if (ev->Get()->WatermarkInjectedToOutputs && !WatermarksTracker.HasOutputChannels()) {
532527
ResumeInputsByWatermark(*WatermarksTracker.GetPendingWatermark());
533528
WatermarksTracker.PopPendingWatermark();

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -304,30 +304,14 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
304304
ReportStats(TInstant::Now(), ESendStats::IfPossible);
305305
}
306306
if (Terminated) {
307-
TaskRunner.Reset();
307+
DoTerminateImpl();
308308
MemoryQuota.Reset();
309309
MemoryLimits.MemoryQuotaManager.reset();
310310
}
311311
}
312312

313-
virtual void DoExecuteImpl() {
314-
auto sourcesState = static_cast<TDerived*>(this)->GetSourcesState();
315-
316-
PollAsyncInput();
317-
ERunStatus status = TaskRunner->Run();
318-
319-
CA_LOG_T("Resume execution, run status: " << status);
320-
321-
if (status != ERunStatus::Finished) {
322-
static_cast<TDerived*>(this)->PollSources(std::move(sourcesState));
323-
}
324-
325-
if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
326-
Checkpoints->DoCheckpoint();
327-
}
328-
329-
ProcessOutputsImpl(status);
330-
}
313+
virtual void DoExecuteImpl() = 0;
314+
virtual void DoTerminateImpl() {}
331315

332316
virtual bool DoHandleChannelsAfterFinishImpl() {
333317
Y_ABORT_UNLESS(Checkpoints);
@@ -606,12 +590,11 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
606590
InternalError(statusCode, TIssues({std::move(issue)}));
607591
}
608592

593+
virtual void InvalidateMeminfo() {}
594+
609595
void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) {
610596
CA_LOG_E(InternalErrorLogString(statusCode, issues));
611-
if (TaskRunner) {
612-
TaskRunner->GetAllocator().InvalidateMemInfo();
613-
TaskRunner->GetAllocator().DisableStrictAllocationCheck();
614-
}
597+
InvalidateMeminfo();
615598
State = NDqProto::COMPUTE_STATE_FAILURE;
616599
ReportStateAndMaybeDie(statusCode, issues);
617600
}
@@ -1064,13 +1047,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
10641047
return true;
10651048
}
10661049

1067-
protected:
1068-
// methods that are called via static_cast<TDerived*>(this) and may be overriden by a dervied class
1069-
void* GetSourcesState() const {
1070-
return nullptr;
1071-
}
1072-
void PollSources(void* /* state */) {
1073-
}
10741050

10751051

10761052
protected:

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,37 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
4444
return inputTransformInfo.Buffer.Get();
4545
}
4646
protected:
47+
48+
void DoExecuteImpl() override{
49+
auto sourcesState = static_cast<TDerived*>(this)->GetSourcesState();
50+
51+
TBase::PollAsyncInput();
52+
ERunStatus status = this->TaskRunner->Run();
53+
54+
CA_LOG_T("Resume execution, run status: " << status);
55+
56+
if (status != ERunStatus::Finished) {
57+
static_cast<TDerived*>(this)->PollSources(std::move(sourcesState));
58+
}
59+
60+
if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && this->Checkpoints && this->Checkpoints->HasPendingCheckpoint() && !this->Checkpoints->ComputeActorStateSaved() && TBase::ReadyToCheckpoint()) {
61+
this->Checkpoints->DoCheckpoint();
62+
}
63+
64+
TBase::ProcessOutputsImpl(status);
65+
}
66+
67+
void DoTerminateImpl() override {
68+
this->TaskRunner.Reset();
69+
}
70+
71+
void InvalidateMeminfo() override {
72+
if (this->TaskRunner) {
73+
this->TaskRunner->GetAllocator().InvalidateMemInfo();
74+
this->TaskRunner->GetAllocator().DisableStrictAllocationCheck();
75+
}
76+
}
77+
4778
void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
4879
CA_LOG_D("Save state");
4980
NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();
@@ -131,6 +162,15 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
131162
const NYql::NDq::TDqMeteringStats* GetMeteringStats() override {
132163
return this->TaskRunner ? this->TaskRunner->GetMeteringStats() : nullptr;
133164
}
165+
166+
protected:
167+
// methods that are called via static_cast<TDerived*>(this) and may be overriden by a dervied class
168+
void* GetSourcesState() const {
169+
return nullptr;
170+
}
171+
void PollSources(void* /* state */) {
172+
}
173+
134174
};
135175

136176
} //namespace NYql::NDq

0 commit comments

Comments
 (0)