@@ -49,7 +49,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
4949 auto sourcesState = static_cast <TDerived*>(this )->GetSourcesState ();
5050
5151 TBase::PollAsyncInput ();
52- ERunStatus status = this -> TaskRunner ->Run ();
52+ ERunStatus status = TaskRunner->Run ();
5353
5454 CA_LOG_T (" Resume execution, run status: " << status);
5555
@@ -65,13 +65,13 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
6565 }
6666
6767 void DoTerminateImpl () override {
68- this -> TaskRunner .Reset ();
68+ TaskRunner.Reset ();
6969 }
7070
7171 void InvalidateMeminfo () override {
72- if (this -> TaskRunner ) {
73- this -> TaskRunner ->GetAllocator ().InvalidateMemInfo ();
74- this -> TaskRunner ->GetAllocator ().DisableStrictAllocationCheck ();
72+ if (TaskRunner) {
73+ TaskRunner->GetAllocator ().InvalidateMemInfo ();
74+ TaskRunner->GetAllocator ().DisableStrictAllocationCheck ();
7575 }
7676 }
7777
@@ -81,7 +81,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
8181 mkqlProgramState.SetRuntimeVersion (NDqProto::RUNTIME_VERSION_YQL_1_0);
8282 NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData ()->MutableStateData ();
8383 data.SetVersion (TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
84- data.SetBlob (this -> TaskRunner ->Save ());
84+ data.SetBlob (TaskRunner->Save ());
8585
8686 for (auto & [inputIndex, source] : this ->SourcesMap ) {
8787 YQL_ENSURE (source.AsyncInput , " Source[" << inputIndex << " ] is not created" );
@@ -94,19 +94,19 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
9494 void DoLoadRunnerState (TString&& blob) override {
9595 TMaybe<TString> error = Nothing ();
9696 try {
97- this -> TaskRunner ->Load (blob);
97+ TaskRunner->Load (blob);
9898 } catch (const std::exception& e) {
9999 error = e.what ();
100100 }
101101 this ->Checkpoints ->AfterStateLoading (error);
102102 }
103103
104104 void SetTaskRunner (const TIntrusivePtr<IDqTaskRunner>& taskRunner) {
105- this -> TaskRunner = taskRunner;
105+ TaskRunner = taskRunner;
106106 }
107107
108108 void PrepareTaskRunner (const IDqTaskRunnerExecutionContext& execCtx) {
109- YQL_ENSURE (this -> TaskRunner );
109+ YQL_ENSURE (TaskRunner);
110110
111111 auto guard = TBase::BindAllocator ();
112112 auto * alloc = guard.GetMutex ();
@@ -118,49 +118,49 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
118118 limits.ChannelBufferSize = this ->MemoryLimits .ChannelBufferSize ;
119119 limits.OutputChunkMaxSize = GetDqExecutionSettings ().FlowControl .MaxOutputChunkSize ;
120120
121- this -> TaskRunner ->Prepare (this ->Task , limits, execCtx);
121+ TaskRunner->Prepare (this ->Task , limits, execCtx);
122122
123123 for (auto & [channelId, channel] : this ->InputChannelsMap ) {
124- channel.Channel = this -> TaskRunner ->GetInputChannel (channelId);
124+ channel.Channel = TaskRunner->GetInputChannel (channelId);
125125 }
126126
127127 for (auto & [inputIndex, source] : this ->SourcesMap ) {
128- source.Buffer = this -> TaskRunner ->GetSource (inputIndex);
128+ source.Buffer = TaskRunner->GetSource (inputIndex);
129129 Y_ABORT_UNLESS (source.Buffer );
130130 }
131131
132132 for (auto & [inputIndex, transform] : this ->InputTransformsMap ) {
133- std::tie (transform.InputBuffer , transform.Buffer ) = this -> TaskRunner ->GetInputTransform (inputIndex);
133+ std::tie (transform.InputBuffer , transform.Buffer ) = TaskRunner->GetInputTransform (inputIndex);
134134 }
135135
136136 for (auto & [channelId, channel] : this ->OutputChannelsMap ) {
137- channel.Channel = this -> TaskRunner ->GetOutputChannel (channelId);
137+ channel.Channel = TaskRunner->GetOutputChannel (channelId);
138138 }
139139
140140 for (auto & [outputIndex, transform] : this ->OutputTransformsMap ) {
141- std::tie (transform.Buffer , transform.OutputBuffer ) = this -> TaskRunner ->GetOutputTransform (outputIndex);
141+ std::tie (transform.Buffer , transform.OutputBuffer ) = TaskRunner->GetOutputTransform (outputIndex);
142142 }
143143
144144 for (auto & [outputIndex, sink] : this ->SinksMap ) {
145- sink.Buffer = this -> TaskRunner ->GetSink (outputIndex);
145+ sink.Buffer = TaskRunner->GetSink (outputIndex);
146146 }
147147
148148 TBase::FillIoMaps (
149- this -> TaskRunner ->GetHolderFactory (),
150- this -> TaskRunner ->GetTypeEnv (),
151- this -> TaskRunner ->GetSecureParams (),
152- this -> TaskRunner ->GetTaskParams (),
153- this -> TaskRunner ->GetReadRanges (),
154- this -> TaskRunner ->GetRandomProvider ()
149+ TaskRunner->GetHolderFactory (),
150+ TaskRunner->GetTypeEnv (),
151+ TaskRunner->GetSecureParams (),
152+ TaskRunner->GetTaskParams (),
153+ TaskRunner->GetReadRanges (),
154+ TaskRunner->GetRandomProvider ()
155155 );
156156 }
157157
158158 const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats () override {
159- return this -> TaskRunner ? this -> TaskRunner ->GetStats () : nullptr ;
159+ return TaskRunner ? TaskRunner->GetStats () : nullptr ;
160160 }
161161
162162 const NYql::NDq::TDqMeteringStats* GetMeteringStats () override {
163- return this -> TaskRunner ? this -> TaskRunner ->GetMeteringStats () : nullptr ;
163+ return TaskRunner ? TaskRunner->GetMeteringStats () : nullptr ;
164164 }
165165
166166protected:
@@ -171,6 +171,8 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
171171 void PollSources (void * /* state */ ) {
172172 }
173173
174+ TIntrusivePtr<IDqTaskRunner> TaskRunner;
175+
174176};
175177
176178} // namespace NYql::NDq
0 commit comments