@@ -49,7 +49,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
49
49
auto sourcesState = static_cast <TDerived*>(this )->GetSourcesState ();
50
50
51
51
TBase::PollAsyncInput ();
52
- ERunStatus status = this -> TaskRunner ->Run ();
52
+ ERunStatus status = TaskRunner->Run ();
53
53
54
54
CA_LOG_T (" Resume execution, run status: " << status);
55
55
@@ -65,13 +65,13 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
65
65
}
66
66
67
67
void DoTerminateImpl () override {
68
- this -> TaskRunner .Reset ();
68
+ TaskRunner.Reset ();
69
69
}
70
70
71
71
void InvalidateMeminfo () override {
72
- if (this -> TaskRunner ) {
73
- this -> TaskRunner ->GetAllocator ().InvalidateMemInfo ();
74
- this -> TaskRunner ->GetAllocator ().DisableStrictAllocationCheck ();
72
+ if (TaskRunner) {
73
+ TaskRunner->GetAllocator ().InvalidateMemInfo ();
74
+ TaskRunner->GetAllocator ().DisableStrictAllocationCheck ();
75
75
}
76
76
}
77
77
@@ -81,7 +81,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
81
81
mkqlProgramState.SetRuntimeVersion (NDqProto::RUNTIME_VERSION_YQL_1_0);
82
82
NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData ()->MutableStateData ();
83
83
data.SetVersion (TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
84
- data.SetBlob (this -> TaskRunner ->Save ());
84
+ data.SetBlob (TaskRunner->Save ());
85
85
86
86
for (auto & [inputIndex, source] : this ->SourcesMap ) {
87
87
YQL_ENSURE (source.AsyncInput , " Source[" << inputIndex << " ] is not created" );
@@ -94,19 +94,19 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
94
94
void DoLoadRunnerState (TString&& blob) override {
95
95
TMaybe<TString> error = Nothing ();
96
96
try {
97
- this -> TaskRunner ->Load (blob);
97
+ TaskRunner->Load (blob);
98
98
} catch (const std::exception& e) {
99
99
error = e.what ();
100
100
}
101
101
this ->Checkpoints ->AfterStateLoading (error);
102
102
}
103
103
104
104
void SetTaskRunner (const TIntrusivePtr<IDqTaskRunner>& taskRunner) {
105
- this -> TaskRunner = taskRunner;
105
+ TaskRunner = taskRunner;
106
106
}
107
107
108
108
void PrepareTaskRunner (const IDqTaskRunnerExecutionContext& execCtx) {
109
- YQL_ENSURE (this -> TaskRunner );
109
+ YQL_ENSURE (TaskRunner);
110
110
111
111
auto guard = TBase::BindAllocator ();
112
112
auto * alloc = guard.GetMutex ();
@@ -118,49 +118,49 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
118
118
limits.ChannelBufferSize = this ->MemoryLimits .ChannelBufferSize ;
119
119
limits.OutputChunkMaxSize = GetDqExecutionSettings ().FlowControl .MaxOutputChunkSize ;
120
120
121
- this -> TaskRunner ->Prepare (this ->Task , limits, execCtx);
121
+ TaskRunner->Prepare (this ->Task , limits, execCtx);
122
122
123
123
for (auto & [channelId, channel] : this ->InputChannelsMap ) {
124
- channel.Channel = this -> TaskRunner ->GetInputChannel (channelId);
124
+ channel.Channel = TaskRunner->GetInputChannel (channelId);
125
125
}
126
126
127
127
for (auto & [inputIndex, source] : this ->SourcesMap ) {
128
- source.Buffer = this -> TaskRunner ->GetSource (inputIndex);
128
+ source.Buffer = TaskRunner->GetSource (inputIndex);
129
129
Y_ABORT_UNLESS (source.Buffer );
130
130
}
131
131
132
132
for (auto & [inputIndex, transform] : this ->InputTransformsMap ) {
133
- std::tie (transform.InputBuffer , transform.Buffer ) = this -> TaskRunner ->GetInputTransform (inputIndex);
133
+ std::tie (transform.InputBuffer , transform.Buffer ) = TaskRunner->GetInputTransform (inputIndex);
134
134
}
135
135
136
136
for (auto & [channelId, channel] : this ->OutputChannelsMap ) {
137
- channel.Channel = this -> TaskRunner ->GetOutputChannel (channelId);
137
+ channel.Channel = TaskRunner->GetOutputChannel (channelId);
138
138
}
139
139
140
140
for (auto & [outputIndex, transform] : this ->OutputTransformsMap ) {
141
- std::tie (transform.Buffer , transform.OutputBuffer ) = this -> TaskRunner ->GetOutputTransform (outputIndex);
141
+ std::tie (transform.Buffer , transform.OutputBuffer ) = TaskRunner->GetOutputTransform (outputIndex);
142
142
}
143
143
144
144
for (auto & [outputIndex, sink] : this ->SinksMap ) {
145
- sink.Buffer = this -> TaskRunner ->GetSink (outputIndex);
145
+ sink.Buffer = TaskRunner->GetSink (outputIndex);
146
146
}
147
147
148
148
TBase::FillIoMaps (
149
- this -> TaskRunner ->GetHolderFactory (),
150
- this -> TaskRunner ->GetTypeEnv (),
151
- this -> TaskRunner ->GetSecureParams (),
152
- this -> TaskRunner ->GetTaskParams (),
153
- this -> TaskRunner ->GetReadRanges (),
154
- this -> TaskRunner ->GetRandomProvider ()
149
+ TaskRunner->GetHolderFactory (),
150
+ TaskRunner->GetTypeEnv (),
151
+ TaskRunner->GetSecureParams (),
152
+ TaskRunner->GetTaskParams (),
153
+ TaskRunner->GetReadRanges (),
154
+ TaskRunner->GetRandomProvider ()
155
155
);
156
156
}
157
157
158
158
const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats () override {
159
- return this -> TaskRunner ? this -> TaskRunner ->GetStats () : nullptr ;
159
+ return TaskRunner ? TaskRunner->GetStats () : nullptr ;
160
160
}
161
161
162
162
const NYql::NDq::TDqMeteringStats* GetMeteringStats () override {
163
- return this -> TaskRunner ? this -> TaskRunner ->GetMeteringStats () : nullptr ;
163
+ return TaskRunner ? TaskRunner->GetMeteringStats () : nullptr ;
164
164
}
165
165
166
166
protected:
@@ -171,6 +171,8 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
171
171
void PollSources (void * /* state */ ) {
172
172
}
173
173
174
+ TIntrusivePtr<IDqTaskRunner> TaskRunner;
175
+
174
176
};
175
177
176
178
} // namespace NYql::NDq
0 commit comments