Skip to content

Commit aaa5daf

Browse files
authored
Merge 0ece53f into df15684
2 parents df15684 + 0ece53f commit aaa5daf

28 files changed: +433 additions, -330 deletions (lines changed)

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2149,7 +2149,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
21492149

21502150
// Create resource manager
21512151
auto rm = NKqp::CreateKqpResourceManagerActor(Config.GetTableServiceConfig().GetResourceManager(), nullptr,
2152-
{}, kqpProxySharedResources);
2152+
{}, kqpProxySharedResources, NodeId);
21532153
setup->LocalServices.push_back(std::make_pair(
21542154
NKqp::MakeKqpRmServiceID(NodeId),
21552155
TActorSetupCmd(rm, TMailboxType::HTSwap, appData->UserPoolId)));

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 77 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66

77
namespace NKikimr::NKqp::NComputeActor {
88

9+
910
struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
1011

1112
TMemoryQuotaManager(std::shared_ptr<NRm::IKqpResourceManager> resourceManager
@@ -26,7 +27,10 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
2627
}
2728

2829
~TMemoryQuotaManager() override {
29-
State->OnTaskTerminate(TxId, TaskId, Success);
30+
if (State) {
31+
State->OnTaskTerminate(TxId, TaskId, Success);
32+
}
33+
3034
ResourceManager->FreeResources(TxId, TaskId);
3135
}
3236

@@ -59,6 +63,10 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
5963
return TotalQueryAllocationsSize >= ReasonableSpillingTreshold;
6064
}
6165

66+
TString MemoryConsumptionDetails() const override {
67+
return ResourceManager->GetTxResourcesUsageDebugInfo(TxId);
68+
}
69+
6270
void TerminateHandler(bool success, const NYql::TIssues& issues) {
6371
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)
6472
("problem", "finish_compute_actor")
@@ -77,66 +85,97 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
7785
};
7886

7987
class TKqpCaFactory : public IKqpNodeComputeActorFactory {
80-
NKikimrConfig::TTableServiceConfig::TResourceManager Config;
8188
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager_;
8289
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
8390
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
8491

92+
std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
93+
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
94+
std::atomic<ui64> MinChannelBufferSize = 0;
95+
std::atomic<ui64> ReasonableSpillingTreshold = 0;
96+
8597
public:
8698
TKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
8799
std::shared_ptr<NRm::IKqpResourceManager> resourceManager,
88100
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
89101
const std::optional<TKqpFederatedQuerySetup> federatedQuerySetup)
90-
: Config(config)
91-
, ResourceManager_(resourceManager)
102+
: ResourceManager_(resourceManager)
92103
, AsyncIoFactory(asyncIoFactory)
93104
, FederatedQuerySetup(federatedQuerySetup)
94-
{}
105+
{
106+
ApplyConfig(config);
107+
}
95108

96-
TActorId CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* dqTask,
97-
const NYql::NDq::TComputeRuntimeSettings& settings,
98-
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
99-
TComputeStagesWithScan& computesByStage, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
100-
NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks)
109+
void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config)
101110
{
111+
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
112+
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
113+
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
114+
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
115+
}
116+
117+
TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) {
102118
NYql::NDq::TComputeMemoryLimits memoryLimits;
103119
memoryLimits.ChannelBufferSize = 0;
104-
memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit();
105-
memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit();
120+
memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load();
121+
memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load();
122+
123+
auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks);
124+
NRm::TKqpResourcesRequest resourcesRequest;
125+
resourcesRequest.MemoryPool = args.MemoryPool;
126+
resourcesRequest.ExecutionUnits = 1;
127+
resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit;
106128

107-
auto estimation = EstimateTaskResources(*dqTask, Config, numberOfTasks);
129+
auto rmResult = ResourceManager_->AllocateResources(
130+
args.TxId, args.Task->GetId(), resourcesRequest);
131+
132+
if (!rmResult) {
133+
return NRm::TKqpRMAllocateResult{rmResult};
134+
}
108135

109136
{
110137
ui32 inputChannelsCount = 0;
111-
for (auto&& i : dqTask->GetInputs()) {
138+
for (auto&& i : args.Task->GetInputs()) {
112139
inputChannelsCount += i.ChannelsSize();
113140
}
114141

115-
memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), Config.GetMinChannelBufferSize());
116-
memoryLimits.OutputChunkMaxSize = outputChunkMaxSize;
142+
memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), MinChannelBufferSize.load());
143+
memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize;
117144
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
118145
("ch_size", estimation.ChannelBufferMemoryLimit)
119146
("ch_count", estimation.ChannelBuffersCount)
120147
("ch_limit", memoryLimits.ChannelBufferSize)
121-
("inputs", dqTask->InputsSize())
148+
("inputs", args.Task->InputsSize())
122149
("input_channels_count", inputChannelsCount);
123150
}
124151

125-
auto& taskOpts = dqTask->GetProgram().GetSettings();
152+
auto& taskOpts = args.Task->GetProgram().GetSettings();
126153
auto limit = taskOpts.GetHasMapJoin() || taskOpts.GetHasStateAggregation()
127154
? memoryLimits.MkqlHeavyProgramMemoryLimit
128155
: memoryLimits.MkqlLightProgramMemoryLimit;
129156

130157
memoryLimits.MemoryQuotaManager = std::make_shared<TMemoryQuotaManager>(
131158
ResourceManager_,
132-
memoryPool,
133-
std::move(state),
134-
txId,
135-
dqTask->GetId(),
159+
args.MemoryPool,
160+
std::move(args.State),
161+
args.TxId,
162+
args.Task->GetId(),
136163
limit,
137-
Config.GetReasonableSpillingTreshold());
164+
ReasonableSpillingTreshold.load());
165+
166+
auto runtimeSettings = args.RuntimeSettings;
167+
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;
168+
runtimeSettings.UseSpilling = args.WithSpilling;
169+
runtimeSettings.StatsMode = args.StatsMode;
170+
171+
if (args.Deadline) {
172+
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
173+
}
174+
175+
if (args.RlPath) {
176+
runtimeSettings.RlPath = args.RlPath;
177+
}
138178

139-
auto runtimeSettings = settings;
140179
NYql::NDq::IMemoryQuotaManager::TWeakPtr memoryQuotaManager = memoryLimits.MemoryQuotaManager;
141180
runtimeSettings.TerminateHandler = [memoryQuotaManager]
142181
(bool success, const NYql::TIssues& issues) {
@@ -157,29 +196,32 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
157196
};
158197

159198
ETableKind tableKind = ETableKind::Unknown;
160-
if (dqTask->HasMetaId()) {
161-
YQL_ENSURE(computesByStage.GetMetaById(*dqTask, meta) || dqTask->GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks");
199+
if (args.Task->HasMetaId()) {
200+
YQL_ENSURE(args.ComputesByStages);
201+
YQL_ENSURE(args.ComputesByStages->GetMetaById(*args.Task, meta) || args.Task->GetMeta().UnpackTo(&meta), "cannot take meta on MetaId exists in tasks");
162202
tableKind = tableKindExtract(meta);
163-
} else if (dqTask->GetMeta().UnpackTo(&meta)) {
203+
} else if (args.Task->GetMeta().UnpackTo(&meta)) {
164204
tableKind = tableKindExtract(meta);
165205
}
166206

167207
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
168-
auto& info = computesByStage.UpsertTaskWithScan(*dqTask, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
169-
IActor* computeActor = CreateKqpScanComputeActor(executerId, txId, dqTask,
208+
YQL_ENSURE(args.ComputesByStages);
209+
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
210+
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.Task,
170211
AsyncIoFactory, runtimeSettings, memoryLimits,
171-
std::move(traceId), std::move(arena));
212+
std::move(args.TraceId), std::move(args.Arena));
172213
TActorId result = TlsActivationContext->Register(computeActor);
173214
info.MutableActorIds().emplace_back(result);
174215
return result;
175216
} else {
176217
std::shared_ptr<TGUCSettings> GUCSettings;
177-
if (!serializedGUCSettings.empty()) {
178-
GUCSettings = std::make_shared<TGUCSettings>(serializedGUCSettings);
218+
if (!args.SerializedGUCSettings.empty()) {
219+
GUCSettings = std::make_shared<TGUCSettings>(args.SerializedGUCSettings);
179220
}
180-
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(executerId, txId, dqTask, AsyncIoFactory,
181-
runtimeSettings, memoryLimits, std::move(traceId), std::move(arena), FederatedQuerySetup, GUCSettings);
182-
return TlsActivationContext->Register(computeActor);
221+
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
222+
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings);
223+
return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
224+
TlsActivationContext->AsActorContext().Register(computeActor);
183225
}
184226
}
185227
};

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 24 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -103,11 +103,30 @@ struct IKqpNodeComputeActorFactory {
103103
virtual ~IKqpNodeComputeActorFactory() = default;
104104

105105
public:
106-
virtual NActors::TActorId CreateKqpComputeActor(const NActors::TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task,
107-
const NYql::NDq::TComputeRuntimeSettings& settings,
108-
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const TString& serializedGUCSettings,
109-
TComputeStagesWithScan& computeStages, ui64 outputChunkMaxSize, std::shared_ptr<IKqpNodeState> state,
110-
NKikimr::NKqp::NRm::EKqpMemoryPool memoryPool, ui32 numberOfTasks) = 0;
106+
struct TCreateArgs {
107+
const NActors::TActorId& ExecuterId;
108+
const ui64 TxId;
109+
NYql::NDqProto::TDqTask* Task;
110+
const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings;
111+
NWilson::TTraceId TraceId;
112+
TIntrusivePtr<NActors::TProtoArenaHolder> Arena;
113+
const TString& SerializedGUCSettings;
114+
const ui32 NumberOfTasks;
115+
const ui64 OutputChunkMaxSize;
116+
const NKikimr::NKqp::NRm::EKqpMemoryPool MemoryPool;
117+
const bool WithSpilling;
118+
const NYql::NDqProto::EDqStatsMode StatsMode;
119+
const TInstant& Deadline;
120+
const bool ShareMailbox;
121+
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;
122+
TComputeStagesWithScan* ComputesByStages = nullptr;
123+
std::shared_ptr<IKqpNodeState> State = nullptr;
124+
};
125+
126+
typedef std::variant<TActorId, NKikimr::NKqp::NRm::TKqpRMAllocateResult> TActorStartResult;
127+
virtual TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) = 0;
128+
129+
virtual void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) = 0;
111130
};
112131

113132
std::shared_ptr<IKqpNodeComputeActorFactory> MakeKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -133,10 +133,7 @@ STFUNC(TKqpComputeActor::StateFunc) {
133133
BaseStateFuncBody(ev);
134134
}
135135
} catch (const TMemoryLimitExceededException& e) {
136-
InternalError(TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
137-
<< "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
138-
<< ", host: " << HostName()
139-
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory);
136+
TBase::OnMemoryLimitExceptionHandler();
140137
} catch (const NMiniKQL::TKqpEnsureFail& e) {
141138
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
142139
} catch (const yexception& e) {

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -46,10 +46,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
4646
BaseStateFuncBody(ev);
4747
}
4848
} catch (const TMemoryLimitExceededException& e) {
49-
const TString sInfo = TStringBuilder() << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
50-
<< ", host: " << HostName() << ", canAllocateExtraMemory: " << CanAllocateExtraMemory;
51-
CA_LOG_E("ERROR:" + sInfo);
52-
InternalError(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, sInfo);
49+
TBase::OnMemoryLimitExceptionHandler();
5350
} catch (const yexception& e) {
5451
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, NYql::TIssuesIds::DEFAULT_ERROR, e.what());
5552
}

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2489,7 +2489,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24892489
.FederatedQuerySetup = FederatedQuerySetup,
24902490
.OutputChunkMaxSize = Request.OutputChunkMaxSize,
24912491
.GUCSettings = GUCSettings,
2492-
.MayRunTasksLocally = mayRunTasksLocally
2492+
.MayRunTasksLocally = mayRunTasksLocally,
2493+
.ResourceManager_ = Request.ResourceManager_,
2494+
.CaFactory_ = Request.CaFactory_
24932495
});
24942496

24952497
auto err = Planner->PlanExecution();

0 commit comments

Comments
 (0)