Skip to content

Commit 3e379b3

Browse files
authored
Merge 0ba4810 into fb4d6bb
2 parents fb4d6bb + 0ba4810 commit 3e379b3

24 files changed

+240
-47
lines changed

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -721,14 +721,15 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
721721
const auto& readRanges = ev->Get()->ReadRanges;
722722
const auto& typeEnv = ev->Get()->TypeEnv;
723723
const auto& holderFactory = ev->Get()->HolderFactory;
724+
const auto& memoryLimits = ev->Get()->MemoryLimits;
724725
if (Stat) {
725726
Stat->AddCounters2(ev->Get()->Sensors);
726727
}
727728
TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
728729
for (auto& [inputIndex, transform] : this->InputTransformsMap) {
729730
std::tie(transform.Input, transform.Buffer) = ev->Get()->InputTransforms.at(inputIndex);
730731
}
731-
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, nullptr);
732+
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, memoryLimits, nullptr);
732733

733734
{
734735
// say "Hello" to executer

ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class TTaskOutput;
2222
namespace NYql::NDq {
2323
struct TSourceState;
2424
struct TSinkState;
25+
struct TDqTaskRunnerMemoryLimits;
2526
} // namespace NYql::NDq
2627

2728
namespace NActors {
@@ -262,6 +263,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
262263
const google::protobuf::Message* SourceSettings = nullptr; // used only in case if we execute compute actor locally
263264
TIntrusivePtr<NActors::TProtoArenaHolder> Arena; // Arena for SourceSettings
264265
NWilson::TTraceId TraceId;
266+
const TDqTaskRunnerMemoryLimits& MemoryLimits;
265267
};
266268

267269
struct TLookupSourceArguments {

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1338,6 +1338,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
13381338
const THashMap<TString, TString>& secureParams,
13391339
const THashMap<TString, TString>& taskParams,
13401340
const TVector<TString>& readRanges,
1341+
const TDqTaskRunnerMemoryLimits& memoryLimits,
13411342
IRandomProvider* randomProvider
13421343
)
13431344
{
@@ -1371,7 +1372,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
13711372
.MemoryQuotaManager = MemoryLimits.MemoryQuotaManager,
13721373
.SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr),
13731374
.Arena = Task.GetArena(),
1374-
.TraceId = ComputeActorSpan.GetTraceId()
1375+
.TraceId = ComputeActorSpan.GetTraceId(),
1376+
.MemoryLimits = memoryLimits
13751377
});
13761378
} catch (const std::exception& ex) {
13771379
throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what();

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
249249
TaskRunner->GetSecureParams(),
250250
TaskRunner->GetTaskParams(),
251251
TaskRunner->GetReadRanges(),
252+
TaskRunner->GetMemoryLimits(),
252253
TaskRunner->GetRandomProvider()
253254
);
254255
}

ydb/library/yql/dq/actors/task_runner/events.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ struct TEvTaskRunnerCreateFinished
179179
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
180180
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
181181
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>>&& inputTransforms,
182+
const TDqTaskRunnerMemoryLimits& memoryLimits,
182183
const TTaskRunnerActorSensors& sensors = {}
183184
)
184185
: Sensors(sensors)
@@ -189,6 +190,7 @@ struct TEvTaskRunnerCreateFinished
189190
, HolderFactory(holderFactory)
190191
, Alloc(alloc)
191192
, InputTransforms(std::move(inputTransforms))
193+
, MemoryLimits(memoryLimits)
192194
{
193195
Y_ABORT_UNLESS(inputTransforms.empty() || Alloc);
194196
}
@@ -210,6 +212,7 @@ struct TEvTaskRunnerCreateFinished
210212
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
211213
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
212214
THashMap<ui64, std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr>> InputTransforms; //can'not be const, because we need to explicitly clear it in destructor
215+
const TDqTaskRunnerMemoryLimits MemoryLimits;
213216
};
214217

215218
struct TEvTaskRunFinished

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,8 @@ class TLocalTaskRunnerActor
467467
TaskRunner->GetTypeEnv(),
468468
TaskRunner->GetHolderFactory(),
469469
Alloc,
470-
std::move(inputTransforms)
470+
std::move(inputTransforms),
471+
TaskRunner->GetMemoryLimits()
471472
);
472473

473474
Send(

ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,7 @@ class TDqTaskRunner : public IDqTaskRunner {
539539
void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
540540
const IDqTaskRunnerExecutionContext& execCtx) override
541541
{
542+
MemoryLimits = memoryLimits;
542543
TaskId = task.GetId();
543544
auto entry = BuildTask(task);
544545

@@ -901,6 +902,10 @@ class TDqTaskRunner : public IDqTaskRunner {
901902
return Stats.get();
902903
}
903904

905+
const TDqTaskRunnerMemoryLimits& GetMemoryLimits() const override {
906+
return MemoryLimits;
907+
}
908+
904909
TString Save() const override {
905910
return AllocatedHolder->ProgramParsed.CompGraph->SaveGraphState();
906911
}
@@ -998,6 +1003,7 @@ class TDqTaskRunner : public IDqTaskRunner {
9981003
ui64 TaskId = 0;
9991004
TDqTaskRunnerContext Context;
10001005
TDqTaskRunnerSettings Settings;
1006+
TDqTaskRunnerMemoryLimits MemoryLimits;
10011007
TLogFunc LogFunc;
10021008
std::unique_ptr<NUdf::ISecureParamsProvider> SecureParamsProvider;
10031009
TDqTaskCountersProvider CountersProvider;

ydb/library/yql/dq/runtime/dq_tasks_runner.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ class IDqTaskRunner : public TSimpleRefCount<IDqTaskRunner>, private TNonCopyabl
437437
virtual const NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnv() const = 0;
438438
virtual const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const = 0;
439439
virtual NKikimr::NMiniKQL::TScopedAlloc& GetAllocator() const = 0;
440+
virtual const TDqTaskRunnerMemoryLimits& GetMemoryLimits() const = 0;
440441

441442
virtual const THashMap<TString, TString>& GetSecureParams() const = 0;
442443
virtual const THashMap<TString, TString>& GetTaskParams() const = 0;

ydb/library/yql/providers/dq/actors/worker_actor.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ class TDqWorker: public TRichActor<TDqWorker>
300300
const auto& readRanges = ev->Get()->ReadRanges;
301301
const auto& typeEnv = ev->Get()->TypeEnv;
302302
const auto& holderFactory = ev->Get()->HolderFactory;
303+
const auto& memoryLimits = ev->Get()->MemoryLimits;
303304

304305
Stat.Measure<void>("PrepareChannels", [&](){
305306
auto& inputs = Task.GetInputs();
@@ -323,7 +324,8 @@ class TDqWorker: public TRichActor<TDqWorker>
323324
.TypeEnv = typeEnv,
324325
.HolderFactory = holderFactory,
325326
.ProgramBuilder = *source.ProgramBuilder,
326-
.MemoryQuotaManager = MemoryQuotaManager
327+
.MemoryQuotaManager = MemoryQuotaManager,
328+
.MemoryLimits = memoryLimits
327329
});
328330
RegisterLocalChild(source.Actor);
329331
} else {

ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ class TLocalTaskRunner: public ITaskRunner {
181181
return {0, ""};
182182
}
183183

184+
const NDq::TDqTaskRunnerMemoryLimits& GetMemoryLimits() const override {
185+
return Runner->GetMemoryLimits();
186+
}
187+
184188
private:
185189
void UpdateStats() {
186190
QueryStat.AddTaskRunnerStats(*Runner->GetStats(), Task.GetId(), Task.GetStageId());

0 commit comments

Comments
 (0)