Skip to content

Commit 6e5100e

Browse files
authored
YQ-2068 support input transforms in async Compute Actor (#3459)
1 parent 4bcab84 commit 6e5100e

File tree

9 files changed

+52
-19
lines changed

9 files changed

+52
-19
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
465465
Stat->AddCounters2(ev->Get()->Sensors);
466466
}
467467
TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
468+
for (auto& [inputIndex, transform] : this->InputTransformsMap) {
469+
std::tie(transform.Input, transform.Buffer) = ev->Get()->InputTransforms.at(inputIndex);
470+
}
468471
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, nullptr);
469472

470473
{

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
216216
}
217217

218218
for (auto& [inputIndex, transform] : this->InputTransformsMap) {
219-
std::tie(transform.Input, transform.Buffer) = TaskRunner->GetInputTransform(inputIndex);
219+
std::tie(transform.Input, transform.Buffer) = *TaskRunner->GetInputTransform(inputIndex);
220220
}
221221

222222
for (auto& [channelId, channel] : this->OutputChannelsMap) {

ydb/library/yql/dq/actors/task_runner/events.h

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ struct TTaskRunnerEvents {
2828

2929
EvOutputChannelDataRequest,
3030
EvOutputChannelData,
31-
32-
EvInputChannelData,
31+
32+
EvInputChannelData,
3333
EvInputChannelDataAck,
34-
35-
// EvContinueRun -> TaskRunner->Run() -> TEvTaskRunFinished
36-
EvContinueRun,
34+
35+
// EvContinueRun -> TaskRunner->Run() -> TEvTaskRunFinished
36+
EvContinueRun,
3737
EvRunFinished,
3838

3939
EvSourceDataAck,
@@ -171,21 +171,34 @@ struct TEvTaskRunnerCreateFinished
171171
: NActors::TEventLocal<TEvTaskRunnerCreateFinished, TTaskRunnerEvents::EvCreateFinished>
172172
{
173173

174-
TEvTaskRunnerCreateFinished() = default;
175174
TEvTaskRunnerCreateFinished(
176175
const THashMap<TString, TString>& secureParams,
177176
const THashMap<TString, TString>& taskParams,
178177
const TVector<TString>& readRanges,
179178
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
180179
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
181-
const TTaskRunnerActorSensors& sensors = {})
180+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
181+
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>>&& inputTransforms,
182+
const TTaskRunnerActorSensors& sensors = {}
183+
)
182184
: Sensors(sensors)
183185
, SecureParams(secureParams)
184186
, TaskParams(taskParams)
185187
, ReadRanges(readRanges)
186188
, TypeEnv(typeEnv)
187189
, HolderFactory(holderFactory)
188-
{ }
190+
, Alloc(alloc)
191+
, InputTransforms(std::move(inputTransforms))
192+
{
193+
Y_ABORT_UNLESS(inputTransforms.empty() || Alloc);
194+
}
195+
196+
~TEvTaskRunnerCreateFinished() {
197+
if (!InputTransforms.empty()) {
198+
auto guard = Guard(*Alloc);
199+
InputTransforms.clear();
200+
}
201+
}
189202

190203
TTaskRunnerActorSensors Sensors;
191204

@@ -195,6 +208,8 @@ struct TEvTaskRunnerCreateFinished
195208
const TVector<TString>& ReadRanges;
196209
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
197210
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
211+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
212+
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>> InputTransforms; //can'not be const, because we need to explicitly clear it in destructor
198213
};
199214

200215
struct TEvTaskRunFinished

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class TLocalTaskRunnerActor
9595

9696
THashMap<ui32, const IDqAsyncInputBuffer*> inputTransforms;
9797
for (const auto inputTransformId : ev->Get()->InputTransformIds) {
98-
inputTransforms[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId).second.Get();
98+
inputTransforms[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId)->second.Get();
9999
}
100100

101101
ev->Get()->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinks), std::move(inputTransforms));
@@ -231,7 +231,7 @@ class TLocalTaskRunnerActor
231231

232232
THashMap<ui32, const IDqAsyncInputBuffer*> inputTransforms;
233233
for (const auto inputTransformId : st->InputTransformIds) { // TODO
234-
inputTransforms[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId).second.Get();
234+
inputTransforms[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId)->second.Get();
235235
}
236236

237237
st->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinks), std::move(inputTransforms));
@@ -427,6 +427,14 @@ class TLocalTaskRunnerActor
427427
}
428428

429429
TaskRunner->Prepare(settings, ev->Get()->MemoryLimits, *ev->Get()->ExecCtx);
430+
431+
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>> inputTransforms;
432+
for (auto i = 0; i != inputs.size(); ++i) {
433+
if (auto t = TaskRunner->GetInputTransform(i)) {
434+
inputTransforms[i] = *t;
435+
}
436+
}
437+
430438
auto wakeUpCallback = ev->Get()->ExecCtx->GetWakeupCallback();
431439
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(TxId, NActors::TActivationContext::ActorSystem(), wakeUpCallback));
432440

@@ -435,7 +443,10 @@ class TLocalTaskRunnerActor
435443
TaskRunner->GetTaskParams(),
436444
TaskRunner->GetReadRanges(),
437445
TaskRunner->GetTypeEnv(),
438-
TaskRunner->GetHolderFactory());
446+
TaskRunner->GetHolderFactory(),
447+
Alloc,
448+
std::move(inputTransforms)
449+
);
439450

440451
Send(
441452
ParentId,

ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -779,10 +779,12 @@ class TDqTaskRunner : public IDqTaskRunner {
779779
return *ptr;
780780
}
781781

782-
std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr> GetInputTransform(ui64 inputIndex) override {
783-
auto ptr = AllocatedHolder->InputTransforms.FindPtr(inputIndex);
784-
YQL_ENSURE(ptr, "task: " << TaskId << " does not have input index: " << inputIndex << " or such transform");
785-
return {ptr->TransformInput, ptr->TransformOutput};
782+
std::optional<std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>> GetInputTransform(ui64 inputIndex) override {
783+
if (auto ptr = AllocatedHolder->InputTransforms.FindPtr(inputIndex)) {
784+
return {std::pair{ptr->TransformInput, ptr->TransformOutput}};
785+
} else {
786+
return std::nullopt;
787+
}
786788
}
787789

788790
std::pair<IDqAsyncOutputBuffer::TPtr, IDqOutputConsumer::TPtr> GetOutputTransform(ui64 outputIndex) override {

ydb/library/yql/dq/runtime/dq_tasks_runner.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ class IDqTaskRunner : public TSimpleRefCount<IDqTaskRunner>, private TNonCopyabl
373373
virtual IDqAsyncInputBuffer::TPtr GetSource(ui64 inputIndex) = 0;
374374
virtual IDqOutputChannel::TPtr GetOutputChannel(ui64 channelId) = 0;
375375
virtual IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) = 0;
376-
virtual std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr> GetInputTransform(ui64 inputIndex) = 0;
376+
virtual std::optional<std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>> GetInputTransform(ui64 inputIndex) = 0;
377377
virtual std::pair<IDqAsyncOutputBuffer::TPtr, IDqOutputConsumer::TPtr> GetOutputTransform(ui64 outputIndex) = 0;
378378

379379
virtual IRandomProvider* GetRandomProvider() const = 0;

ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1715,7 +1715,7 @@ class TDqTaskRunner: public NDq::IDqTaskRunner {
17151715
return sink;
17161716
}
17171717

1718-
std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr> GetInputTransform(ui64 /*inputIndex*/) override {
1718+
std::optional<std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>> GetInputTransform(ui64 /*inputIndex*/) override {
17191719
return {};
17201720
}
17211721

ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,8 @@ class TTaskRunnerActor
636636
taskRunner->GetReadRanges(),
637637
taskRunner->GetTypeEnv(),
638638
taskRunner->GetHolderFactory(),
639+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc>{},
640+
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>>{},
639641
sensors);
640642

641643
actorSystem->Send(

ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,7 @@ class TYtFileGateway : public IYtGateway {
759759
writer.SetSpecs(spec);
760760

761761
TStringStream err;
762-
auto type = BuildType(*tableInfo.RowSpec->GetType(), typeBuilder, err);//
762+
auto type = BuildType(*tableInfo.RowSpec->GetType(), typeBuilder, err);
763763
TValuePacker packer(true, type);
764764
for (auto& c: content) {
765765
auto val = packer.Unpack(c, holderFactory);

0 commit comments

Comments
 (0)