Skip to content

Commit d80571e

Browse files
committed
YQL-18314 use shared alloc in all task runners
1 parent 22f57bd commit d80571e

25 files changed

+50
-53
lines changed

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ void TKqpComputeActor::DoBootstrap() {
6969
settings.ReadRanges.push_back(readRange);
7070
}
7171

72-
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocator(), execCtx, settings, logger);
72+
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
7373
SetTaskRunner(taskRunner);
7474

7575
auto wakeup = [this]{ ContinueExecute(); };

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ void TKqpScanComputeActor::DoBootstrap() {
216216
};
217217
}
218218

219-
auto taskRunner = MakeDqTaskRunner(GetAllocator(), execCtx, settings, logger);
219+
auto taskRunner = MakeDqTaskRunner(GetAllocatorPtr(), execCtx, settings, logger);
220220
TBase::SetTaskRunner(taskRunner);
221221

222222
auto wakeup = [this] { ContinueExecute(); };

ydb/core/kqp/executer_actor/kqp_executer_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch
4949

5050
TEvKqpExecuter::TEvTxResponse::~TEvTxResponse() {
5151
if (!TxResults.empty() && Y_LIKELY(AllocState)) {
52-
with_lock(AllocState->Alloc) {
52+
with_lock(*AllocState->Alloc) {
5353
TxResults.crop(0);
5454
}
5555
}

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
166166
TString BuildMemoryLimitExceptionMessage() const {
167167
if (Request.TxAlloc) {
168168
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName()
169-
<< ", current limit is " << Request.TxAlloc->Alloc.GetLimit() << " bytes.";
169+
<< ", current limit is " << Request.TxAlloc->Alloc->GetLimit() << " bytes.";
170170
}
171171
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName();
172172
}

ydb/core/kqp/executer_actor/kqp_literal_executer.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class TKqpLiteralExecuter {
166166
UpdateCounters();
167167
}
168168

169-
void RunTask(NMiniKQL::TScopedAlloc& alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
169+
void RunTask(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
170170
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
171171
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
172172

@@ -211,7 +211,7 @@ class TKqpLiteralExecuter {
211211
auto status = taskRunner->Run();
212212
YQL_ENSURE(status == ERunStatus::Finished);
213213

214-
with_lock (alloc) { // allocator is used only by outputChannel->PopAll()
214+
with_lock (*alloc) { // allocator is used only by outputChannel->PopAll()
215215
for (auto& taskOutput : task.Outputs) {
216216
for (ui64 outputChannelId : taskOutput.Channels) {
217217
auto outputChannel = taskRunner->GetOutputChannel(outputChannelId);
@@ -276,7 +276,7 @@ class TKqpLiteralExecuter {
276276

277277
private:
278278
void CleanupCtx() {
279-
with_lock(Request.TxAlloc->Alloc) {
279+
with_lock(*Request.TxAlloc->Alloc) {
280280
TaskRunners.erase(TaskRunners.begin(), TaskRunners.end());
281281
Request.Transactions.erase(Request.Transactions.begin(), Request.Transactions.end());
282282
ComputeCtx.reset();

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
577577
return peepHoleStatus;
578578
}
579579

580-
auto guard = Guard(SessionCtx->Query().QueryData->GetAllocState()->Alloc);
580+
auto guard = Guard(*SessionCtx->Query().QueryData->GetAllocState()->Alloc);
581581

582582
auto input = Build<TDqPhyStage>(ctx, pos)
583583
.Inputs()

ydb/core/kqp/query_data/kqp_query_data.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,19 +117,17 @@ void TKqpExecuterTxResult::FillYdb(Ydb::ResultSet* ydbResult, TMaybe<ui64> rowsL
117117

118118
TTxAllocatorState::TTxAllocatorState(const IFunctionRegistry* functionRegistry,
119119
TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider)
120-
: Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators())
121-
, TypeEnv(Alloc)
120+
: Alloc(std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators(), false))
121+
, TypeEnv(*Alloc)
122122
, MemInfo("TQueryData")
123-
, HolderFactory(Alloc.Ref(), MemInfo, functionRegistry)
123+
, HolderFactory(Alloc->Ref(), MemInfo, functionRegistry)
124124
{
125-
Alloc.Release();
126125
TimeProvider = timeProvider;
127126
RandomProvider = randomProvider;
128127
}
129128

130129
TTxAllocatorState::~TTxAllocatorState()
131130
{
132-
Alloc.Acquire();
133131
}
134132

135133
std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> TTxAllocatorState::GetInternalBindingValue(

ydb/core/kqp/query_data/kqp_query_data.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ struct TTimeAndRandomProvider {
166166

167167
class TTxAllocatorState: public TTimeAndRandomProvider {
168168
public:
169-
NKikimr::NMiniKQL::TScopedAlloc Alloc;
169+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
170170
NKikimr::NMiniKQL::TTypeEnvironment TypeEnv;
171171
NKikimr::NMiniKQL::TMemoryUsageInfo MemInfo;
172172
NKikimr::NMiniKQL::THolderFactory HolderFactory;

ydb/core/kqp/runtime/kqp_tasks_runner.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp
7070

7171

7272
TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
73-
NKikimr::NMiniKQL::TScopedAlloc& alloc,
73+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
7474
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
7575
: LogFunc(logFunc)
7676
, Alloc(alloc)
@@ -230,13 +230,13 @@ const NYql::NDq::TDqTaskSettings& TKqpTasksRunner::GetTask(ui64 taskId) const {
230230

231231
TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memoryLimit) {
232232
if (memoryLimit) {
233-
Alloc.SetLimit(*memoryLimit);
233+
Alloc->SetLimit(*memoryLimit);
234234
}
235-
return TGuard(Alloc);
235+
return TGuard(*Alloc);
236236
}
237237

238238
TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
239-
NKikimr::NMiniKQL::TScopedAlloc& alloc,
239+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
240240
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
241241
{
242242
return new TKqpTasksRunner(std::move(tasks), alloc, execCtx, settings, logFunc);

ydb/core/kqp/runtime/kqp_tasks_runner.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto::
1616
class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable {
1717
public:
1818
TKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
19-
NKikimr::NMiniKQL::TScopedAlloc& alloc,
19+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
2020
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
2121
const NYql::NDq::TLogFunc& logFunc);
2222

@@ -51,15 +51,15 @@ class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCop
5151
// otherwise use particular memory limit
5252
TGuard<NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = Nothing());
5353

54-
ui64 GetAllocatedMemory() const { return Alloc.GetAllocated(); }
54+
ui64 GetAllocatedMemory() const { return Alloc->GetAllocated(); }
5555

5656
const TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> GetTasksStats() const { return Stats; }
5757
private:
5858
TMap<ui64, TIntrusivePtr<NYql::NDq::IDqTaskRunner>> TaskRunners;
5959
TMap<ui64, NYql::NDq::TDqTaskSettings> Tasks;
6060
TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> Stats;
6161
NYql::NDq::TLogFunc LogFunc;
62-
NMiniKQL::TScopedAlloc& Alloc;
62+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
6363
NMiniKQL::TKqpComputeContextBase* ComputeCtx;
6464
NMiniKQL::TKqpDatashardApplyContext* ApplyCtx;
6565

0 commit comments

Comments
 (0)