Skip to content

Commit cbc23c8

Browse files
authored
Merge b6be2cb into e1268db
2 parents e1268db + b6be2cb commit cbc23c8

26 files changed

+53
-56
lines changed

ydb/core/fq/libs/init/init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ void Init(
262262
lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit();
263263
lwmOptions.MkqlMinAllocSize = mkqlAllocSize;
264264
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
265-
[=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
265+
[=](std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
266266
return lwmOptions.Factory->Get(alloc, task, statsMode);
267267
});
268268
if (protoConfig.GetRateLimiter().GetDataPlaneEnabled()) {

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ void TKqpComputeActor::DoBootstrap() {
6969
settings.ReadRanges.push_back(readRange);
7070
}
7171

72-
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocator(), execCtx, settings, logger);
72+
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
7373
SetTaskRunner(taskRunner);
7474

7575
auto wakeup = [this]{ ContinueExecute(); };

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ void TKqpScanComputeActor::DoBootstrap() {
216216
};
217217
}
218218

219-
auto taskRunner = MakeDqTaskRunner(GetAllocator(), execCtx, settings, logger);
219+
auto taskRunner = MakeDqTaskRunner(GetAllocatorPtr(), execCtx, settings, logger);
220220
TBase::SetTaskRunner(taskRunner);
221221

222222
auto wakeup = [this] { ContinueExecute(); };

ydb/core/kqp/executer_actor/kqp_executer_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch
4949

5050
TEvKqpExecuter::TEvTxResponse::~TEvTxResponse() {
5151
if (!TxResults.empty() && Y_LIKELY(AllocState)) {
52-
with_lock(AllocState->Alloc) {
52+
with_lock(*AllocState->Alloc) {
5353
TxResults.crop(0);
5454
}
5555
}

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
166166
TString BuildMemoryLimitExceptionMessage() const {
167167
if (Request.TxAlloc) {
168168
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName()
169-
<< ", current limit is " << Request.TxAlloc->Alloc.GetLimit() << " bytes.";
169+
<< ", current limit is " << Request.TxAlloc->Alloc->GetLimit() << " bytes.";
170170
}
171171
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName();
172172
}

ydb/core/kqp/executer_actor/kqp_literal_executer.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class TKqpLiteralExecuter {
166166
UpdateCounters();
167167
}
168168

169-
void RunTask(NMiniKQL::TScopedAlloc& alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
169+
void RunTask(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
170170
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
171171
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
172172

@@ -211,7 +211,7 @@ class TKqpLiteralExecuter {
211211
auto status = taskRunner->Run();
212212
YQL_ENSURE(status == ERunStatus::Finished);
213213

214-
with_lock (alloc) { // allocator is used only by outputChannel->PopAll()
214+
with_lock (*alloc) { // allocator is used only by outputChannel->PopAll()
215215
for (auto& taskOutput : task.Outputs) {
216216
for (ui64 outputChannelId : taskOutput.Channels) {
217217
auto outputChannel = taskRunner->GetOutputChannel(outputChannelId);
@@ -276,7 +276,7 @@ class TKqpLiteralExecuter {
276276

277277
private:
278278
void CleanupCtx() {
279-
with_lock(Request.TxAlloc->Alloc) {
279+
with_lock(*Request.TxAlloc->Alloc) {
280280
TaskRunners.erase(TaskRunners.begin(), TaskRunners.end());
281281
Request.Transactions.erase(Request.Transactions.begin(), Request.Transactions.end());
282282
ComputeCtx.reset();

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
577577
return peepHoleStatus;
578578
}
579579

580-
auto guard = Guard(SessionCtx->Query().QueryData->GetAllocState()->Alloc);
580+
auto guard = Guard(*SessionCtx->Query().QueryData->GetAllocState()->Alloc);
581581

582582
auto input = Build<TDqPhyStage>(ctx, pos)
583583
.Inputs()

ydb/core/kqp/query_data/kqp_query_data.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,19 +117,17 @@ void TKqpExecuterTxResult::FillYdb(Ydb::ResultSet* ydbResult, TMaybe<ui64> rowsL
117117

118118
TTxAllocatorState::TTxAllocatorState(const IFunctionRegistry* functionRegistry,
119119
TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider)
120-
: Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators())
121-
, TypeEnv(Alloc)
120+
: Alloc(std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators(), false))
121+
, TypeEnv(*Alloc)
122122
, MemInfo("TQueryData")
123-
, HolderFactory(Alloc.Ref(), MemInfo, functionRegistry)
123+
, HolderFactory(Alloc->Ref(), MemInfo, functionRegistry)
124124
{
125-
Alloc.Release();
126125
TimeProvider = timeProvider;
127126
RandomProvider = randomProvider;
128127
}
129128

130129
TTxAllocatorState::~TTxAllocatorState()
131130
{
132-
Alloc.Acquire();
133131
}
134132

135133
std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> TTxAllocatorState::GetInternalBindingValue(
@@ -366,7 +364,7 @@ const NKikimrMiniKQL::TParams* TQueryData::GetParameterMiniKqlValue(const TStrin
366364

367365
auto it = Params.find(name);
368366
if (it == Params.end()) {
369-
with_lock(AllocState->Alloc) {
367+
with_lock(*AllocState->Alloc) {
370368
const auto& [type, uv] = GetParameterUnboxedValue(name);
371369
NKikimrMiniKQL::TParams param;
372370
ExportTypeToProto(type, *param.MutableType());
@@ -388,7 +386,7 @@ const Ydb::TypedValue* TQueryData::GetParameterTypedValue(const TString& name) {
388386

389387
auto it = ParamsProtobuf.find(name);
390388
if (it == ParamsProtobuf.end()) {
391-
with_lock(AllocState->Alloc) {
389+
with_lock(*AllocState->Alloc) {
392390
const auto& [type, uv] = GetParameterUnboxedValue(name);
393391

394392
auto& tv = ParamsProtobuf[name];

ydb/core/kqp/query_data/kqp_query_data.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ struct TTimeAndRandomProvider {
166166

167167
class TTxAllocatorState: public TTimeAndRandomProvider {
168168
public:
169-
NKikimr::NMiniKQL::TScopedAlloc Alloc;
169+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
170170
NKikimr::NMiniKQL::TTypeEnvironment TypeEnv;
171171
NKikimr::NMiniKQL::TMemoryUsageInfo MemInfo;
172172
NKikimr::NMiniKQL::THolderFactory HolderFactory;

ydb/core/kqp/runtime/kqp_tasks_runner.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp
7070

7171

7272
TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
73-
NKikimr::NMiniKQL::TScopedAlloc& alloc,
73+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
7474
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
7575
: LogFunc(logFunc)
7676
, Alloc(alloc)
@@ -230,13 +230,13 @@ const NYql::NDq::TDqTaskSettings& TKqpTasksRunner::GetTask(ui64 taskId) const {
230230

231231
TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memoryLimit) {
232232
if (memoryLimit) {
233-
Alloc.SetLimit(*memoryLimit);
233+
Alloc->SetLimit(*memoryLimit);
234234
}
235-
return TGuard(Alloc);
235+
return TGuard(*Alloc);
236236
}
237237

238238
TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
239-
NKikimr::NMiniKQL::TScopedAlloc& alloc,
239+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
240240
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
241241
{
242242
return new TKqpTasksRunner(std::move(tasks), alloc, execCtx, settings, logFunc);

0 commit comments

Comments
 (0)