Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ void Init(
lwmOptions.MkqlProgramHardMemoryLimit = protoConfig.GetResourceManager().GetMkqlTaskHardMemoryLimit();
lwmOptions.MkqlMinAllocSize = mkqlAllocSize;
lwmOptions.TaskRunnerActorFactory = NYql::NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
[=](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
[=](std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const NYql::NDq::TDqTaskSettings& task, NYql::NDqProto::EDqStatsMode statsMode, const NYql::NDq::TLogFunc&) {
return lwmOptions.Factory->Get(alloc, task, statsMode);
});
if (protoConfig.GetRateLimiter().GetDataPlaneEnabled()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void TKqpComputeActor::DoBootstrap() {
settings.ReadRanges.push_back(readRange);
}

auto taskRunner = MakeDqTaskRunner(TBase::GetAllocator(), execCtx, settings, logger);
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
SetTaskRunner(taskRunner);

auto wakeup = [this]{ ContinueExecute(); };
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void TKqpScanComputeActor::DoBootstrap() {
};
}

auto taskRunner = MakeDqTaskRunner(GetAllocator(), execCtx, settings, logger);
auto taskRunner = MakeDqTaskRunner(GetAllocatorPtr(), execCtx, settings, logger);
TBase::SetTaskRunner(taskRunner);

auto wakeup = [this] { ContinueExecute(); };
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch

TEvKqpExecuter::TEvTxResponse::~TEvTxResponse() {
if (!TxResults.empty() && Y_LIKELY(AllocState)) {
with_lock(AllocState->Alloc) {
with_lock(*AllocState->Alloc) {
TxResults.crop(0);
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
TString BuildMemoryLimitExceptionMessage() const {
if (Request.TxAlloc) {
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName()
<< ", current limit is " << Request.TxAlloc->Alloc.GetLimit() << " bytes.";
<< ", current limit is " << Request.TxAlloc->Alloc->GetLimit() << " bytes.";
}
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName();
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class TKqpLiteralExecuter {
UpdateCounters();
}

void RunTask(NMiniKQL::TScopedAlloc& alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
void RunTask(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

Expand Down Expand Up @@ -212,7 +212,7 @@ class TKqpLiteralExecuter {
auto status = taskRunner->Run();
YQL_ENSURE(status == ERunStatus::Finished);

with_lock (alloc) { // allocator is used only by outputChannel->PopAll()
with_lock (*alloc) { // allocator is used only by outputChannel->PopAll()
for (auto& taskOutput : task.Outputs) {
for (ui64 outputChannelId : taskOutput.Channels) {
auto outputChannel = taskRunner->GetOutputChannel(outputChannelId);
Expand Down Expand Up @@ -277,7 +277,7 @@ class TKqpLiteralExecuter {

private:
void CleanupCtx() {
with_lock(Request.TxAlloc->Alloc) {
with_lock(*Request.TxAlloc->Alloc) {
TaskRunners.erase(TaskRunners.begin(), TaskRunners.end());
Request.Transactions.erase(Request.Transactions.begin(), Request.Transactions.end());
ComputeCtx.reset();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
return peepHoleStatus;
}

auto guard = Guard(SessionCtx->Query().QueryData->GetAllocState()->Alloc);
auto guard = Guard(*SessionCtx->Query().QueryData->GetAllocState()->Alloc);

auto input = Build<TDqPhyStage>(ctx, pos)
.Inputs()
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/kqp/query_data/kqp_query_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,19 @@ void TKqpExecuterTxResult::FillYdb(Ydb::ResultSet* ydbResult, TMaybe<ui64> rowsL

TTxAllocatorState::TTxAllocatorState(const IFunctionRegistry* functionRegistry,
TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider)
: Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators())
, TypeEnv(Alloc)
: Alloc(std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators()))
, TypeEnv(*Alloc)
, MemInfo("TQueryData")
, HolderFactory(Alloc.Ref(), MemInfo, functionRegistry)
, HolderFactory(Alloc->Ref(), MemInfo, functionRegistry)
{
Alloc.Release();
Alloc->Release();
TimeProvider = timeProvider;
RandomProvider = randomProvider;
}

TTxAllocatorState::~TTxAllocatorState()
{
Alloc.Acquire();
Alloc->Acquire();
}

std::pair<NKikimr::NMiniKQL::TType*, NUdf::TUnboxedValue> TTxAllocatorState::GetInternalBindingValue(
Expand Down Expand Up @@ -366,7 +366,7 @@ const NKikimrMiniKQL::TParams* TQueryData::GetParameterMiniKqlValue(const TStrin

auto it = Params.find(name);
if (it == Params.end()) {
with_lock(AllocState->Alloc) {
with_lock(*AllocState->Alloc) {
const auto& [type, uv] = GetParameterUnboxedValue(name);
NKikimrMiniKQL::TParams param;
ExportTypeToProto(type, *param.MutableType());
Expand All @@ -388,7 +388,7 @@ const Ydb::TypedValue* TQueryData::GetParameterTypedValue(const TString& name) {

auto it = ParamsProtobuf.find(name);
if (it == ParamsProtobuf.end()) {
with_lock(AllocState->Alloc) {
with_lock(*AllocState->Alloc) {
const auto& [type, uv] = GetParameterUnboxedValue(name);

auto& tv = ParamsProtobuf[name];
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/query_data/kqp_query_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ struct TTimeAndRandomProvider {

class TTxAllocatorState: public TTimeAndRandomProvider {
public:
NKikimr::NMiniKQL::TScopedAlloc Alloc;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NKikimr::NMiniKQL::TTypeEnvironment TypeEnv;
NKikimr::NMiniKQL::TMemoryUsageInfo MemInfo;
NKikimr::NMiniKQL::THolderFactory HolderFactory;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/runtime/kqp_tasks_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NDqProto::TTaskOutput& outp


TKqpTasksRunner::TKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
: LogFunc(logFunc)
, Alloc(alloc)
Expand Down Expand Up @@ -230,13 +230,13 @@ const NYql::NDq::TDqTaskSettings& TKqpTasksRunner::GetTask(ui64 taskId) const {

TGuard<NMiniKQL::TScopedAlloc> TKqpTasksRunner::BindAllocator(TMaybe<ui64> memoryLimit) {
if (memoryLimit) {
Alloc.SetLimit(*memoryLimit);
Alloc->SetLimit(*memoryLimit);
}
return TGuard(Alloc);
return TGuard(*Alloc);
}

TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TDqTaskRunnerContext& execCtx, const TDqTaskRunnerSettings& settings, const TLogFunc& logFunc)
{
return new TKqpTasksRunner(std::move(tasks), alloc, execCtx, settings, logFunc);
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/runtime/kqp_tasks_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ NYql::NDq::IDqOutputConsumer::TPtr KqpBuildOutputConsumer(const NYql::NDqProto::
class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCopyable {
public:
TKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
const NYql::NDq::TLogFunc& logFunc);

Expand Down Expand Up @@ -51,15 +51,15 @@ class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCop
// otherwise use particular memory limit
TGuard<NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = Nothing());

ui64 GetAllocatedMemory() const { return Alloc.GetAllocated(); }
ui64 GetAllocatedMemory() const { return Alloc->GetAllocated(); }

const TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> GetTasksStats() const { return Stats; }
private:
TMap<ui64, TIntrusivePtr<NYql::NDq::IDqTaskRunner>> TaskRunners;
TMap<ui64, NYql::NDq::TDqTaskSettings> Tasks;
TMap<ui64, const NYql::NDq::TDqTaskRunnerStats*> Stats;
NYql::NDq::TLogFunc LogFunc;
NMiniKQL::TScopedAlloc& Alloc;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NMiniKQL::TKqpComputeContextBase* ComputeCtx;
NMiniKQL::TKqpDatashardApplyContext* ApplyCtx;

Expand All @@ -73,7 +73,7 @@ class TKqpTasksRunner : public TSimpleRefCount<TKqpTasksRunner>, private TNonCop


TIntrusivePtr<TKqpTasksRunner> CreateKqpTasksRunner(google::protobuf::RepeatedPtrField<NYql::NDqProto::TDqTask>&& tasks,
NKikimr::NMiniKQL::TScopedAlloc& alloc,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const NYql::NDq::TDqTaskRunnerContext& execCtx, const NYql::NDq::TDqTaskRunnerSettings& settings,
const NYql::NDq::TLogFunc& logFunc);

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,11 +692,11 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
ui64 mkqlMaxLimit = phaseLimitsProto.GetComputeNodeMemoryLimitBytes();
mkqlMaxLimit = mkqlMaxLimit ? mkqlMaxLimit : ui64(Settings.MkqlMaxMemoryLimit);

alloc->Alloc.SetLimit(mkqlInitialLimit);
alloc->Alloc.Ref().SetIncreaseMemoryLimitCallback([this, &alloc, mkqlMaxLimit](ui64 currentLimit, ui64 required) {
alloc->Alloc->SetLimit(mkqlInitialLimit);
alloc->Alloc->Ref().SetIncreaseMemoryLimitCallback([this, &alloc, mkqlMaxLimit](ui64 currentLimit, ui64 required) {
if (required < mkqlMaxLimit) {
LOG_D("Increase memory limit from " << currentLimit << " to " << required);
alloc->Alloc.SetLimit(required);
alloc->Alloc->SetLimit(required);
}
});

Expand Down Expand Up @@ -2325,7 +2325,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
TString BuildMemoryLimitExceptionMessage() const {
if (QueryState && QueryState->TxCtx) {
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName()
<< ", current limit is " << QueryState->TxCtx->TxAlloc->Alloc.GetLimit() << " bytes.";
<< ", current limit is " << QueryState->TxCtx->TxAlloc->Alloc->GetLimit() << " bytes.";
} else {
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName();
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ TEngineBay::TEngineBay(TDataShard* self, TTransactionContext& txc, const TActorC
ComputeCtx = MakeHolder<TKqpDatashardComputeContext>(self, GetUserDb(), EngineHost->GetSettings().DisableByKeyFilter);
ComputeCtx->Database = &txc.DB;

KqpAlloc = MakeHolder<TScopedAlloc>(__LOCATION__, TAlignedPagePoolCounters(), AppData(ctx)->FunctionRegistry->SupportsSizedAllocators());
KqpAlloc = std::make_shared<TScopedAlloc>(__LOCATION__, TAlignedPagePoolCounters(), AppData(ctx)->FunctionRegistry->SupportsSizedAllocators());
KqpTypeEnv = MakeHolder<TTypeEnvironment>(*KqpAlloc);
KqpAlloc->Release();

Expand Down Expand Up @@ -725,7 +725,7 @@ NKqp::TKqpTasksRunner& TEngineBay::GetKqpTasksRunner(NKikimrTxDataShard::TKqpTra
settings.TerminateOnError = false;
Y_ABORT_UNLESS(KqpAlloc);
KqpAlloc->SetLimit(10_MB);
KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), *KqpAlloc.Get(), KqpExecCtx, settings, KqpLogFunc);
KqpTasksRunner = NKqp::CreateKqpTasksRunner(std::move(*tx.MutableTasks()), KqpAlloc, KqpExecCtx, settings, KqpLogFunc);
}

return *KqpTasksRunner;
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard__engine_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TEngineBay : TNonCopyable {
auto guard = TGuard(*KqpAlloc);
KqpTypeEnv.Reset();
}
KqpAlloc.Reset();
KqpAlloc.reset();
}
KqpExecCtx = {};

Expand Down Expand Up @@ -128,7 +128,7 @@ class TEngineBay : TNonCopyable {
NYql::NDq::TLogFunc KqpLogFunc;
THolder<NUdf::IApplyContext> KqpApplyCtx;
THolder<NMiniKQL::TKqpDatashardComputeContext> ComputeCtx;
THolder<NMiniKQL::TScopedAlloc> KqpAlloc;
std::shared_ptr<NMiniKQL::TScopedAlloc> KqpAlloc;
THolder<NMiniKQL::TTypeEnvironment> KqpTypeEnv;
NYql::NDq::TDqTaskRunnerContext KqpExecCtx;
TIntrusivePtr<NKqp::TKqpTasksRunner> KqpTasksRunner;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TDqComputeActor : public TDqSyncComputeActorBase<TDqComputeActor> {
};
}

auto taskRunner = TaskRunnerFactory(GetAllocator(), Task, RuntimeSettings.StatsMode, logger);
auto taskRunner = TaskRunnerFactory(GetAllocatorPtr(), Task, RuntimeSettings.StatsMode, logger);
SetTaskRunner(taskRunner);
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeup));
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ struct TComputeMemoryLimits {
};

using TTaskRunnerFactory = std::function<
TIntrusivePtr<IDqTaskRunner>(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
TIntrusivePtr<IDqTaskRunner>(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
>;

void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ class TLocalTaskRunnerActor
void OnDqTask(TEvTaskRunnerCreate::TPtr& ev) {
ParentId = ev->Sender;
auto settings = NDq::TDqTaskSettings(&ev->Get()->Task);
TaskRunner = Factory(*Alloc.get(), settings, ev->Get()->StatsMode, [this](const TString& message) {
TaskRunner = Factory(Alloc, settings, ev->Get()->StatsMode, [this](const TString& message) {
LOG_D(message);
});

Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1011,10 +1011,10 @@ class TDqTaskRunner : public IDqTaskRunner {
}
};

TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings,
TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings,
const TLogFunc& logFunc)
{
return new TDqTaskRunner(alloc, ctx, settings, logFunc);
return new TDqTaskRunner(*alloc, ctx, settings, logFunc);
}

} // namespace NYql::NDq
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/runtime/dq_tasks_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ class IDqTaskRunner : public TSimpleRefCount<IDqTaskRunner>, private TNonCopyabl
};

TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(
NKikimr::NMiniKQL::TScopedAlloc& alloc,
const TDqTaskRunnerContext& ctx,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
const TDqTaskRunnerContext& ctx,
const TDqTaskRunnerSettings& settings,
const TLogFunc& logFunc
);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/actors/compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ IActor* CreateComputeActor(
}
}

auto taskRunnerFactory = [factory = options.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& logger) {
auto taskRunnerFactory = [factory = options.Factory](std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& logger) {
Y_UNUSED(logger);
return factory->Get(alloc, task, statsMode, {});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class TLocalServiceHolder {
lwmOptions.FunctionRegistry = functionRegistry;
lwmOptions.TaskRunnerInvokerFactory = new NDqs::TTaskRunnerInvokerFactory();
lwmOptions.TaskRunnerActorFactory = NDq::NTaskRunnerActor::CreateLocalTaskRunnerActorFactory(
[factory=lwmOptions.Factory](NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
[factory=lwmOptions.Factory](std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const NDq::TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const NDq::TLogFunc& )
{
return factory->Get(alloc, task, statsMode);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ class TLocalExecutor: public TCounters
? CreateDeterministicRandomProvider(1)
: State->RandomProvider;

TScopedAlloc alloc(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
State->FunctionRegistry->SupportsSizedAllocators(),
false);
NDq::TDqTaskRunnerContext executionContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,14 +733,14 @@ class TTaskCommandExecutor {

Y_ABORT_UNLESS(!Alloc);
Y_ABORT_UNLESS(FunctionRegistry);
Alloc = std::make_unique<NKikimr::NMiniKQL::TScopedAlloc>(
Alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(
__LOCATION__,
NKikimr::TAlignedPagePoolCounters(),
FunctionRegistry->SupportsSizedAllocators(),
false
);

Runner = MakeDqTaskRunner(*Alloc.get(), Ctx, settings, nullptr);
Runner = MakeDqTaskRunner(Alloc, Ctx, settings, nullptr);
});

auto guard = Runner->BindAllocator(DqConfiguration->MemoryLimit.Get().GetOrElse(0));
Expand Down Expand Up @@ -770,7 +770,7 @@ class TTaskCommandExecutor {
result.Save(&output);
}
private:
std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory;
TTaskTransformFactory TaskTransformFactory;
NKikimr::NMiniKQL::IStatsRegistry* JobStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,11 @@ class TLocalFactory: public IProxyFactory {
ExecutionContext.PatternCache = patternCache;
}

ITaskRunner::TPtr GetOld(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, const TString& traceId) override {
ITaskRunner::TPtr GetOld(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TDqTaskSettings& task, const TString& traceId) override {
return new TLocalTaskRunner(task, Get(alloc, task, NDqProto::DQ_STATS_MODE_BASIC, traceId));
}

TIntrusivePtr<NDq::IDqTaskRunner> Get(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId) override {
TIntrusivePtr<NDq::IDqTaskRunner> Get(std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TString& traceId) override {
Y_UNUSED(traceId);
NDq::TDqTaskRunnerSettings settings;
settings.TerminateOnError = TerminateOnError;
Expand Down
Loading