Skip to content

Commit ea7f387

Browse files
authored
Merge d613e11 into 002f1f3
2 parents 002f1f3 + d613e11 commit ea7f387

File tree

2 files changed

+50
-52
lines changed

2 files changed

+50
-52
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,33 +1458,32 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
14581458
this->RegisterWithSameMailbox(source.Actor);
14591459
}
14601460
for (auto& [inputIndex, transform] : InputTransformsMap) {
1461-
Y_ABORT_UNLESS(TaskRunner);
1462-
transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry);
1463-
Y_ABORT_UNLESS(AsyncIoFactory);
1464-
const auto& inputDesc = Task.GetInputs(inputIndex);
1465-
CA_LOG_D("Create transform for input " << inputIndex << " " << inputDesc.ShortDebugString());
1466-
try {
1467-
std::tie(transform.AsyncInput, transform.Actor) = AsyncIoFactory->CreateDqInputTransform(
1468-
IDqAsyncIoFactory::TInputTransformArguments {
1469-
.InputDesc = inputDesc,
1470-
.InputIndex = inputIndex,
1471-
.StatsLevel = collectStatsLevel,
1472-
.TxId = TxId,
1473-
.TaskId = Task.GetId(),
1474-
.TransformInput = transform.InputBuffer,
1475-
.SecureParams = secureParams,
1476-
.TaskParams = taskParams,
1477-
.ComputeActorId = this->SelfId(),
1478-
.TypeEnv = typeEnv,
1479-
.HolderFactory = holderFactory,
1480-
.ProgramBuilder = *transform.ProgramBuilder,
1481-
.Alloc = Alloc,
1482-
.TraceId = ComputeActorSpan.GetTraceId()
1483-
});
1484-
} catch (const std::exception& ex) {
1485-
throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what();
1486-
}
1487-
this->RegisterWithSameMailbox(transform.Actor);
1461+
transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry);
1462+
Y_ABORT_UNLESS(AsyncIoFactory);
1463+
const auto& inputDesc = Task.GetInputs(inputIndex);
1464+
CA_LOG_D("Create transform for input " << inputIndex << " " << inputDesc.ShortDebugString());
1465+
try {
1466+
std::tie(transform.AsyncInput, transform.Actor) = AsyncIoFactory->CreateDqInputTransform(
1467+
IDqAsyncIoFactory::TInputTransformArguments {
1468+
.InputDesc = inputDesc,
1469+
.InputIndex = inputIndex,
1470+
.StatsLevel = collectStatsLevel,
1471+
.TxId = TxId,
1472+
.TaskId = Task.GetId(),
1473+
.TransformInput = transform.InputBuffer,
1474+
.SecureParams = secureParams,
1475+
.TaskParams = taskParams,
1476+
.ComputeActorId = this->SelfId(),
1477+
.TypeEnv = typeEnv,
1478+
.HolderFactory = holderFactory,
1479+
.ProgramBuilder = *transform.ProgramBuilder,
1480+
.Alloc = Alloc,
1481+
.TraceId = ComputeActorSpan.GetTraceId()
1482+
});
1483+
} catch (const std::exception& ex) {
1484+
throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what();
1485+
}
1486+
this->RegisterWithSameMailbox(transform.Actor);
14881487
}
14891488
for (auto& [outputIndex, transform] : OutputTransformsMap) {
14901489
transform.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry);
@@ -2031,7 +2030,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
20312030
const IDqAsyncIoFactory::TPtr AsyncIoFactory;
20322031
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
20332032
const NDqProto::ECheckpointingMode CheckpointingMode;
2034-
TIntrusivePtr<IDqTaskRunner> TaskRunner;
20352033
TDqComputeActorChannels* Channels = nullptr;
20362034
TDqComputeActorCheckpoints* Checkpoints = nullptr;
20372035
THashMap<ui64, TInputChannelInfo> InputChannelsMap; // Channel id -> Channel info

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
4949
auto sourcesState = static_cast<TDerived*>(this)->GetSourcesState();
5050

5151
TBase::PollAsyncInput();
52-
ERunStatus status = this->TaskRunner->Run();
52+
ERunStatus status = TaskRunner->Run();
5353

5454
CA_LOG_T("Resume execution, run status: " << status);
5555

@@ -65,13 +65,13 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
6565
}
6666

6767
void DoTerminateImpl() override {
68-
this->TaskRunner.Reset();
68+
TaskRunner.Reset();
6969
}
7070

7171
void InvalidateMeminfo() override {
72-
if (this->TaskRunner) {
73-
this->TaskRunner->GetAllocator().InvalidateMemInfo();
74-
this->TaskRunner->GetAllocator().DisableStrictAllocationCheck();
72+
if (TaskRunner) {
73+
TaskRunner->GetAllocator().InvalidateMemInfo();
74+
TaskRunner->GetAllocator().DisableStrictAllocationCheck();
7575
}
7676
}
7777

@@ -81,7 +81,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
8181
mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
8282
NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData();
8383
data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
84-
data.SetBlob(this->TaskRunner->Save());
84+
data.SetBlob(TaskRunner->Save());
8585

8686
for (auto& [inputIndex, source] : this->SourcesMap) {
8787
YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
@@ -94,19 +94,19 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
9494
void DoLoadRunnerState(TString&& blob) override {
9595
TMaybe<TString> error = Nothing();
9696
try {
97-
this->TaskRunner->Load(blob);
97+
TaskRunner->Load(blob);
9898
} catch (const std::exception& e) {
9999
error = e.what();
100100
}
101101
this->Checkpoints->AfterStateLoading(error);
102102
}
103103

104104
void SetTaskRunner(const TIntrusivePtr<IDqTaskRunner>& taskRunner) {
105-
this->TaskRunner = taskRunner;
105+
TaskRunner = taskRunner;
106106
}
107107

108108
void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx) {
109-
YQL_ENSURE(this->TaskRunner);
109+
YQL_ENSURE(TaskRunner);
110110

111111
auto guard = TBase::BindAllocator();
112112
auto* alloc = guard.GetMutex();
@@ -118,49 +118,49 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
118118
limits.ChannelBufferSize = this->MemoryLimits.ChannelBufferSize;
119119
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
120120

121-
this->TaskRunner->Prepare(this->Task, limits, execCtx);
121+
TaskRunner->Prepare(this->Task, limits, execCtx);
122122

123123
for (auto& [channelId, channel] : this->InputChannelsMap) {
124-
channel.Channel = this->TaskRunner->GetInputChannel(channelId);
124+
channel.Channel = TaskRunner->GetInputChannel(channelId);
125125
}
126126

127127
for (auto& [inputIndex, source] : this->SourcesMap) {
128-
source.Buffer = this->TaskRunner->GetSource(inputIndex);
128+
source.Buffer = TaskRunner->GetSource(inputIndex);
129129
Y_ABORT_UNLESS(source.Buffer);
130130
}
131131

132132
for (auto& [inputIndex, transform] : this->InputTransformsMap) {
133-
std::tie(transform.InputBuffer, transform.Buffer) = this->TaskRunner->GetInputTransform(inputIndex);
133+
std::tie(transform.InputBuffer, transform.Buffer) = TaskRunner->GetInputTransform(inputIndex);
134134
}
135135

136136
for (auto& [channelId, channel] : this->OutputChannelsMap) {
137-
channel.Channel = this->TaskRunner->GetOutputChannel(channelId);
137+
channel.Channel = TaskRunner->GetOutputChannel(channelId);
138138
}
139139

140140
for (auto& [outputIndex, transform] : this->OutputTransformsMap) {
141-
std::tie(transform.Buffer, transform.OutputBuffer) = this->TaskRunner->GetOutputTransform(outputIndex);
141+
std::tie(transform.Buffer, transform.OutputBuffer) = TaskRunner->GetOutputTransform(outputIndex);
142142
}
143143

144144
for (auto& [outputIndex, sink] : this->SinksMap) {
145-
sink.Buffer = this->TaskRunner->GetSink(outputIndex);
145+
sink.Buffer = TaskRunner->GetSink(outputIndex);
146146
}
147147

148148
TBase::FillIoMaps(
149-
this->TaskRunner->GetHolderFactory(),
150-
this->TaskRunner->GetTypeEnv(),
151-
this->TaskRunner->GetSecureParams(),
152-
this->TaskRunner->GetTaskParams(),
153-
this->TaskRunner->GetReadRanges(),
154-
this->TaskRunner->GetRandomProvider()
149+
TaskRunner->GetHolderFactory(),
150+
TaskRunner->GetTypeEnv(),
151+
TaskRunner->GetSecureParams(),
152+
TaskRunner->GetTaskParams(),
153+
TaskRunner->GetReadRanges(),
154+
TaskRunner->GetRandomProvider()
155155
);
156156
}
157157

158158
const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() override {
159-
return this->TaskRunner ? this->TaskRunner->GetStats() : nullptr;
159+
return TaskRunner ? TaskRunner->GetStats() : nullptr;
160160
}
161161

162162
const NYql::NDq::TDqMeteringStats* GetMeteringStats() override {
163-
return this->TaskRunner ? this->TaskRunner->GetMeteringStats() : nullptr;
163+
return TaskRunner ? TaskRunner->GetMeteringStats() : nullptr;
164164
}
165165

166166
protected:

0 commit comments

Comments
 (0)