Skip to content

Commit 6e162f1

Browse files
committed
YQL-17542 move more methods to TDqSyncComputeActorBase
1 parent 83b8570 commit 6e162f1

File tree

2 files changed

+52
-46
lines changed

2 files changed

+52
-46
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -310,24 +310,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
310310
}
311311
}
312312

313-
virtual void DoExecuteImpl() {
314-
auto sourcesState = GetSourcesState();
315-
316-
PollAsyncInput();
317-
ERunStatus status = TaskRunner->Run();
318-
319-
CA_LOG_T("Resume execution, run status: " << status);
320-
321-
if (status != ERunStatus::Finished) {
322-
PollSources(std::move(sourcesState));
323-
}
324-
325-
if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
326-
Checkpoints->DoCheckpoint();
327-
}
328-
329-
ProcessOutputsImpl(status);
330-
}
313+
virtual void DoExecuteImpl() = 0;
331314

332315
virtual bool DoHandleChannelsAfterFinishImpl() {
333316
Y_ABORT_UNLESS(Checkpoints);
@@ -741,22 +724,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
741724
return true;
742725
}
743726

744-
void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
745-
CA_LOG_D("Save state");
746-
NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();
747-
mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
748-
NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData();
749-
data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
750-
data.SetBlob(TaskRunner->Save());
751-
752-
for (auto& [inputIndex, source] : SourcesMap) {
753-
YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
754-
NDqProto::TSourceState& sourceState = *state.AddSources();
755-
source.AsyncInput->SaveState(checkpoint, sourceState);
756-
sourceState.SetInputIndex(inputIndex);
757-
}
758-
}
759-
760727
void CommitState(const NDqProto::TCheckpoint& checkpoint) override {
761728
CA_LOG_D("Commit state");
762729
for (auto& [inputIndex, source] : SourcesMap) {
@@ -810,15 +777,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
810777
}
811778
}
812779

813-
virtual void DoLoadRunnerState(TString&& blob) {
814-
TMaybe<TString> error = Nothing();
815-
try {
816-
TaskRunner->Load(blob);
817-
} catch (const std::exception& e) {
818-
error = e.what();
819-
}
820-
Checkpoints->AfterStateLoading(error);
821-
}
780+
virtual void DoLoadRunnerState(TString&& blob) = 0;
822781

823782
void LoadState(NDqProto::TComputeActorState&& state) override {
824783
CA_LOG_D("Load state");
@@ -1804,9 +1763,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
18041763
}
18051764
}
18061765

1807-
virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() {
1808-
return TaskRunner ? TaskRunner->GetStats() : nullptr;
1809-
}
1766+
virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() = 0;
18101767

18111768
virtual const IDqAsyncOutputBuffer* GetSink(ui64, const TAsyncOutputInfoBase& sinkInfo) const {
18121769
return sinkInfo.Buffer.Get();

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,51 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
4444
return inputTransformInfo.Buffer.Get();
4545
}
4646
protected:
47+
void DoExecuteImpl() override {
48+
auto sourcesState = TBase::GetSourcesState();
49+
50+
TBase::PollAsyncInput();
51+
ERunStatus status = this->TaskRunner->Run();
52+
53+
CA_LOG_T("Resume execution, run status: " << status);
54+
55+
if (status != ERunStatus::Finished) {
56+
TBase::PollSources(std::move(sourcesState));
57+
}
58+
59+
if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && this->Checkpoints && this->Checkpoints->HasPendingCheckpoint() && !this->Checkpoints->ComputeActorStateSaved() && TBase::ReadyToCheckpoint()) {
60+
this->Checkpoints->DoCheckpoint();
61+
}
62+
63+
this->ProcessOutputsImpl(status);
64+
}
65+
66+
void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
67+
CA_LOG_D("Save state");
68+
NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();
69+
mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
70+
NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData();
71+
data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
72+
data.SetBlob(this->TaskRunner->Save());
73+
74+
for (auto& [inputIndex, source] : this->SourcesMap) {
75+
YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
76+
NDqProto::TSourceState& sourceState = *state.AddSources();
77+
source.AsyncInput->SaveState(checkpoint, sourceState);
78+
sourceState.SetInputIndex(inputIndex);
79+
}
80+
}
81+
82+
void DoLoadRunnerState(TString&& blob) override {
83+
TMaybe<TString> error = Nothing();
84+
try {
85+
this->TaskRunner->Load(blob);
86+
} catch (const std::exception& e) {
87+
error = e.what();
88+
}
89+
this->Checkpoints->AfterStateLoading(error);
90+
}
91+
4792
void SetTaskRunner(const TIntrusivePtr<IDqTaskRunner>& taskRunner) {
4893
this->TaskRunner = taskRunner;
4994
}
@@ -69,6 +114,10 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
69114
this->TaskRunner->GetTaskParams(),
70115
this->TaskRunner->GetReadRanges());
71116
}
117+
118+
const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() override {
119+
return this->TaskRunner ? this->TaskRunner->GetStats() : nullptr;
120+
}
72121
};
73122

74123
} //namespace NYql::NDq

0 commit comments

Comments
 (0)