Skip to content

Commit 3db0f0e

Browse files
committed
resource manager to control execution units
1 parent 805196e commit 3db0f0e

File tree

5 files changed

+94
-79
lines changed

5 files changed

+94
-79
lines changed

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ NKqpNode::TState& GetStateBucketByTx(std::shared_ptr<TBucketArray> buckets, ui64
6262
void FinishKqpTask(ui64 txId, ui64 taskId, bool success, NKqpNode::TState& bucket, std::shared_ptr<NRm::IKqpResourceManager> ResourceManager) {
6363
auto ctx = bucket.RemoveTask(txId, taskId, success);
6464
if (ctx) {
65+
ResourceManager->FreeExecutionUnits(1);
6566
if (ctx->ComputeActorsNumber == 0) {
6667
ResourceManager->FreeResources(txId);
6768
} else {
@@ -310,6 +311,16 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
310311
memoryPool = NRm::EKqpMemoryPool::Unspecified;
311312
}
312313

314+
size_t executionUnits = msg.GetTasks().size();
315+
if (!ResourceManager()->AllocateExecutionUnits(executionUnits)) {
316+
Counters->RmNotEnoughComputeActors->Inc();
317+
TStringBuilder error;
318+
error << "TxId: " << txId << ", NodeId: " << SelfId().NodeId() << ", not enough compute actors, requested " << msg.GetTasks().size();
319+
LOG_N(error);
320+
ReplyError(txId, request.Executer, msg, NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_EXECUTION_UNITS, error);
321+
return;
322+
}
323+
313324
ui32 requestChannels = 0;
314325
for (auto& dqTask : *msg.MutableTasks()) {
315326
auto estimation = EstimateTaskResources(dqTask, Config, msg.GetTasks().size());
@@ -348,7 +359,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
348359
allocatedTasks.reserve(msg.GetTasks().size());
349360
for (auto& task : request.InFlyTasks) {
350361
NRm::TKqpResourcesRequest resourcesRequest;
351-
resourcesRequest.ExecutionUnits = 1;
352362
resourcesRequest.MemoryPool = memoryPool;
353363

354364
// !!!!!!!!!!!!!!!!!!!!!
@@ -359,14 +369,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
359369
if (!ResourceManager()->AllocateResources(txId, task.first, resourcesRequest, &resourcesResponse)) {
360370
NKikimrKqp::TEvStartKqpTasksResponse::ENotStartedTaskReason failReason = NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR;
361371
TStringBuilder error;
362-
363-
if (resourcesResponse.ExecutionUnits()) {
364-
error << "TxId: " << txId << ", NodeId: " << SelfId().NodeId() << ", not enough compute actors, requested " << msg.GetTasks().size();
365-
LOG_N(error);
366-
367-
failReason = NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_EXECUTION_UNITS;
368-
}
369-
370372
if (resourcesResponse.ScanQueryMemory()) {
371373
error << "TxId: " << txId << ", NodeId: " << SelfId().NodeId() << ", not enough memory, requested " << task.second.Memory;
372374
LOG_N(error);
@@ -385,6 +387,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
385387
ResourceManager()->FreeResources(txId, taskId);
386388
}
387389

390+
ResourceManager()->FreeExecutionUnits(executionUnits);
391+
388392
ReplyError(txId, request.Executer, msg, failReason, error);
389393
return;
390394
}
@@ -550,6 +554,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
550554
ResourceManager()->FreeResources(txId);
551555

552556
for (const auto& tasksRequest: tasksToAbort) {
557+
ResourceManager()->FreeExecutionUnits(tasksRequest.InFlyTasks.size());
553558
for (const auto& [taskId, task] : tasksRequest.InFlyTasks) {
554559
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::UNSPECIFIED,
555560
reason);

ydb/core/kqp/node_service/kqp_node_ut.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ void KqpNode::NotEnoughComputeActors() {
565565
}
566566
}
567567

568-
AssertResourceBrokerSensors(0, 0, 0, 4, 0);
568+
AssertResourceBrokerSensors(0, 0, 0, 0, 0);
569569

570570
{
571571
NKikimr::TActorSystemStub stub;
@@ -598,6 +598,7 @@ void KqpNode::ResourceBrokerNotEnoughResources() {
598598
UNIT_ASSERT_VALUES_EQUAL(2, record.GetTxId());
599599
UNIT_ASSERT_VALUES_EQUAL(2, record.GetNotStartedTasks().size());
600600
for (auto& task : record.GetNotStartedTasks()) {
601+
Cerr << TEvStartKqpTasksResponse_ENotStartedTaskReason_Name(task.GetReason()) << Endl;
601602
UNIT_ASSERT_EQUAL(NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_MEMORY, task.GetReason());
602603
}
603604
}
@@ -669,6 +670,13 @@ void KqpNode::ExecuterLost() {
669670
UNIT_ASSERT_VALUES_EQUAL("executer lost", abortEvent->Get()->Record.GetLegacyMessage());
670671
}
671672

673+
size_t iterations = 30;
674+
while (KqpCounters->RmComputeActors->Val() != 0 && iterations > 0) {
675+
Sleep(TDuration::MilliSeconds(300));
676+
iterations--;
677+
Cerr << "waiting compute actors to complete" << Endl;
678+
}
679+
672680
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 0);
673681
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmMemory->Val(), 0);
674682
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmNotEnoughMemory->Val(), 0);

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 34 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ class TKqpResourceManager : public IKqpResourceManager {
141141
: Config(config)
142142
, Counters(counters)
143143
, ExecutionUnitsResource(Config.GetComputeActorsCount())
144+
, ExecutionUnitsLimit(Config.GetComputeActorsCount())
144145
, ScanQueryMemoryResource(Config.GetQueryMemoryLimit())
145146
, PublishResourcesByExchanger(Config.GetEnablePublishResourcesByExchanger()) {
146147

@@ -174,20 +175,39 @@ class TKqpResourceManager : public IKqpResourceManager {
174175
}
175176
}
176177

178+
bool AllocateExecutionUnits(ui32 cnt) override {
179+
i32 prev = ExecutionUnitsResource.fetch_sub(cnt);
180+
if (prev < (i32)cnt) {
181+
ExecutionUnitsResource.fetch_add(cnt);
182+
return false;
183+
} else {
184+
Counters->RmComputeActors->Add(cnt);
185+
return true;
186+
}
187+
}
188+
189+
void FreeExecutionUnits(ui32 cnt) override {
190+
if (cnt == 0) {
191+
return;
192+
}
193+
194+
ExecutionUnitsResource.fetch_add(cnt);
195+
Counters->RmComputeActors->Sub(cnt);
196+
}
197+
177198
bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
178199
TKqpNotEnoughResources* details = nullptr) override {
179200
if (resources.MemoryPool == EKqpMemoryPool::DataQuery) {
180201
NotifyExternalResourcesAllocated(txId, taskId, resources);
181202
return true;
182203
}
183204
Y_ABORT_UNLESS(resources.MemoryPool == EKqpMemoryPool::ScanQuery);
184-
if (Y_UNLIKELY(resources.Memory == 0 && resources.ExecutionUnits == 0)) {
205+
if (Y_UNLIKELY(resources.Memory == 0)) {
185206
return true;
186207
}
187208

188209
auto now = ActorSystem->Timestamp();
189210
bool hasScanQueryMemory = true;
190-
bool hasExecutionUnits = true;
191211
ui64 queryMemoryLimit = 0;
192212

193213
with_lock (Lock) {
@@ -200,11 +220,8 @@ class TKqpResourceManager : public IKqpResourceManager {
200220
}
201221

202222
hasScanQueryMemory = ScanQueryMemoryResource.Has(resources.Memory);
203-
hasExecutionUnits = ExecutionUnitsResource.Has(resources.ExecutionUnits);
204-
205-
if (hasScanQueryMemory && hasExecutionUnits) {
223+
if (hasScanQueryMemory) {
206224
ScanQueryMemoryResource.Acquire(resources.Memory);
207-
ExecutionUnitsResource.Acquire(resources.ExecutionUnits);
208225
queryMemoryLimit = Config.GetQueryMemoryLimit();
209226
}
210227
} // with_lock (Lock)
@@ -218,15 +235,6 @@ class TKqpResourceManager : public IKqpResourceManager {
218235
return false;
219236
}
220237

221-
if (!hasExecutionUnits) {
222-
Counters->RmNotEnoughComputeActors->Inc();
223-
LOG_AS_N("TxId: " << txId << ", taskId: " << taskId << ". Not enough ExecutionUnits, requested: " << resources.ExecutionUnits);
224-
if (details) {
225-
details->SetExecutionUnits();
226-
}
227-
return false;
228-
}
229-
230238
ui64 rbTaskId = LastResourceBrokerTaskId.fetch_add(1) + 1;
231239
TString rbTaskName = TStringBuilder() << "kqp-" << txId << '-' << taskId << '-' << rbTaskId;
232240
bool extraAlloc = false;
@@ -239,7 +247,6 @@ class TKqpResourceManager : public IKqpResourceManager {
239247

240248
with_lock (Lock) {
241249
ScanQueryMemoryResource.Release(resources.Memory);
242-
ExecutionUnitsResource.Release(resources.ExecutionUnits);
243250
} // with_lock (Lock)
244251

245252
Counters->RmNotEnoughMemory->Inc();
@@ -261,7 +268,6 @@ class TKqpResourceManager : public IKqpResourceManager {
261268

262269
with_lock (Lock) {
263270
ScanQueryMemoryResource.Release(resources.Memory);
264-
ExecutionUnitsResource.Release(resources.ExecutionUnits);
265271
} // with_lock (Lock)
266272

267273
Counters->RmNotEnoughMemory->Inc();
@@ -276,14 +282,12 @@ class TKqpResourceManager : public IKqpResourceManager {
276282
auto& txState = txBucket.Txs[txId];
277283

278284
txState.TxScanQueryMemory += resources.Memory;
279-
txState.TxExecutionUnits += resources.ExecutionUnits;
280285
if (!txState.CreatedAt) {
281286
txState.CreatedAt = now;
282287
}
283288

284289
auto& taskState = txState.Tasks[taskId];
285290
taskState.ScanQueryMemory += resources.Memory;
286-
taskState.ExecutionUnits += resources.ExecutionUnits;
287291
if (!taskState.CreatedAt) {
288292
taskState.CreatedAt = now;
289293
}
@@ -299,7 +303,6 @@ class TKqpResourceManager : public IKqpResourceManager {
299303

300304
LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". Allocated " << resources.ToString());
301305

302-
Counters->RmComputeActors->Add(resources.ExecutionUnits);
303306
Counters->RmMemory->Add(resources.Memory);
304307
if (extraAlloc) {
305308
Counters->RmExtraMemAllocs->Inc();
@@ -340,20 +343,16 @@ class TKqpResourceManager : public IKqpResourceManager {
340343
}
341344

342345
taskIt->second.ScanQueryMemory -= resources.Memory;
343-
taskIt->second.ExecutionUnits -= resources.ExecutionUnits;
344346

345347
bool reduced = ResourceBroker->ReduceTaskResourcesInstant(
346348
taskIt->second.ResourceBrokerTaskId, {0, resources.Memory}, SelfId);
347349
Y_DEBUG_ABORT_UNLESS(reduced);
348350

349351
txIt->second.TxScanQueryMemory -= resources.Memory;
350-
txIt->second.TxExecutionUnits -= resources.ExecutionUnits;
351352

352353
ScanQueryMemoryResource.Release(resources.Memory);
353-
ExecutionUnitsResource.Release(resources.ExecutionUnits);
354354
}
355355

356-
Counters->RmComputeActors->Sub(resources.ExecutionUnits);
357356
Counters->RmMemory->Sub(resources.Memory);
358357

359358
Y_DEBUG_ABORT_UNLESS(Counters->RmComputeActors->Val() >= 0);
@@ -364,7 +363,6 @@ class TKqpResourceManager : public IKqpResourceManager {
364363

365364
void FreeResources(ui64 txId, ui64 taskId) override {
366365
ui64 releaseScanQueryMemory = 0;
367-
ui32 releaseExecutionUnits = 0;
368366
ui32 remainsTasks = 0;
369367

370368
auto& txBucket = TxBucket(txId);
@@ -389,7 +387,6 @@ class TKqpResourceManager : public IKqpResourceManager {
389387
}
390388

391389
releaseScanQueryMemory = taskIt->second.ScanQueryMemory;
392-
releaseExecutionUnits = taskIt->second.ExecutionUnits;
393390

394391
bool finished = ResourceBroker->FinishTaskInstant(
395392
TEvResourceBroker::TEvFinishTask(taskIt->second.ResourceBrokerTaskId), SelfId);
@@ -402,20 +399,17 @@ class TKqpResourceManager : public IKqpResourceManager {
402399
} else {
403400
txIt->second.Tasks.erase(taskIt);
404401
txIt->second.TxScanQueryMemory -= releaseScanQueryMemory;
405-
txIt->second.TxExecutionUnits -= releaseExecutionUnits;
406402
}
407403
} // with_lock (txBucket.Lock)
408404

409405
with_lock (Lock) {
410406
ScanQueryMemoryResource.Release(releaseScanQueryMemory);
411-
ExecutionUnitsResource.Release(releaseExecutionUnits);
412407
} // with_lock (Lock)
413408

414409
LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". Released resources, "
415-
<< "ScanQueryMemory: " << releaseScanQueryMemory << ", ExecutionUnits: " << releaseExecutionUnits << ". "
410+
<< "ScanQueryMemory: " << releaseScanQueryMemory << ". "
416411
<< "Remains " << remainsTasks << " tasks in this tx.");
417412

418-
Counters->RmComputeActors->Sub(releaseExecutionUnits);
419413
Counters->RmMemory->Sub(releaseScanQueryMemory);
420414

421415
Y_DEBUG_ABORT_UNLESS(Counters->RmComputeActors->Val() >= 0);
@@ -426,7 +420,6 @@ class TKqpResourceManager : public IKqpResourceManager {
426420

427421
void FreeResources(ui64 txId) override {
428422
ui64 releaseScanQueryMemory = 0;
429-
ui32 releaseExecutionUnits = 0;
430423

431424
auto& txBucket = TxBucket(txId);
432425

@@ -450,21 +443,17 @@ class TKqpResourceManager : public IKqpResourceManager {
450443
}
451444

452445
releaseScanQueryMemory = txIt->second.TxScanQueryMemory;
453-
releaseExecutionUnits = txIt->second.TxExecutionUnits;
454446

455447
txBucket.Txs.erase(txIt);
456448
} // with_lock (txBucket.Lock)
457449

458450
with_lock (Lock) {
459451
ScanQueryMemoryResource.Release(releaseScanQueryMemory);
460-
ExecutionUnitsResource.Release(releaseExecutionUnits);
461452
} // with_lock (Lock)
462453

463454
LOG_AS_D("TxId: " << txId << ". Released resources, "
464-
<< "ScanQueryMemory: " << releaseScanQueryMemory << ", ExecutionUnits: " << releaseExecutionUnits << ". "
465-
<< "Tx completed.");
455+
<< "ScanQueryMemory: " << releaseScanQueryMemory << ", ExecutionUnits: " << "Tx completed.");
466456

467-
Counters->RmComputeActors->Sub(releaseExecutionUnits);
468457
Counters->RmMemory->Sub(releaseScanQueryMemory);
469458

470459
Y_DEBUG_ABORT_UNLESS(Counters->RmComputeActors->Val() >= 0);
@@ -635,7 +624,7 @@ class TKqpResourceManager : public IKqpResourceManager {
635624
result.Memory.fill(0);
636625

637626
with_lock (Lock) {
638-
result.ExecutionUnits = ExecutionUnitsResource.Available();
627+
result.ExecutionUnits = ExecutionUnitsResource.load();
639628
result.Memory[EKqpMemoryPool::ScanQuery] = ScanQueryMemoryResource.Available();
640629
}
641630

@@ -695,7 +684,8 @@ class TKqpResourceManager : public IKqpResourceManager {
695684
TAdaptiveLock Lock;
696685

697686
// limits (guarded by Lock)
698-
TLimitedResource<ui32> ExecutionUnitsResource;
687+
std::atomic<i32> ExecutionUnitsResource;
688+
std::atomic<i32> ExecutionUnitsLimit;
699689
TLimitedResource<ui64> ScanQueryMemoryResource;
700690
ui64 ExternalDataQueryMemory = 0;
701691

@@ -974,7 +964,9 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
974964
LOG_I("Updated table service config: " << config.DebugString());
975965

976966
with_lock (ResourceManager->Lock) {
977-
ResourceManager->ExecutionUnitsResource.SetNewLimit(config.GetComputeActorsCount());
967+
i32 prev = ResourceManager->ExecutionUnitsLimit.load();
968+
ResourceManager->ExecutionUnitsLimit.store(config.GetComputeActorsCount());
969+
ResourceManager->ExecutionUnitsResource.fetch_add((i32)config.GetComputeActorsCount() - prev);
978970
ResourceManager->Config.Swap(&config);
979971
}
980972

@@ -1024,7 +1016,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
10241016
with_lock (ResourceManager->Lock) {
10251017
str << "ScanQuery memory resource: " << ResourceManager->ScanQueryMemoryResource.ToString() << Endl;
10261018
str << "External DataQuery memory: " << ResourceManager->ExternalDataQueryMemory << Endl;
1027-
str << "ExecutionUnits resource: " << ResourceManager->ExecutionUnitsResource.ToString() << Endl;
1019+
str << "ExecutionUnits resource: " << ResourceManager->ExecutionUnitsResource.load() << Endl;
10281020
}
10291021
str << "Last resource broker task id: " << ResourceManager->LastResourceBrokerTaskId.load() << Endl;
10301022
if (WbState.LastPublishTime) {
@@ -1160,11 +1152,11 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
11601152
}
11611153
ActorIdToProto(MakeKqpResourceManagerServiceID(SelfId().NodeId()), payload.MutableResourceManagerActorId()); // legacy
11621154
with_lock (ResourceManager->Lock) {
1163-
payload.SetAvailableComputeActors(ResourceManager->ExecutionUnitsResource.Available()); // legacy
1155+
payload.SetAvailableComputeActors(ResourceManager->ExecutionUnitsResource.load()); // legacy
11641156
payload.SetTotalMemory(ResourceManager->ScanQueryMemoryResource.GetLimit()); // legacy
11651157
payload.SetUsedMemory(ResourceManager->ScanQueryMemoryResource.GetLimit() - ResourceManager->ScanQueryMemoryResource.Available()); // legacy
11661158

1167-
payload.SetExecutionUnits(ResourceManager->ExecutionUnitsResource.Available());
1159+
payload.SetExecutionUnits(ResourceManager->ExecutionUnitsResource.load());
11681160
auto* pool = payload.MutableMemory()->Add();
11691161
pool->SetPool(EKqpMemoryPool::ScanQuery);
11701162
pool->SetAvailable(ResourceManager->ScanQueryMemoryResource.Available());

0 commit comments

Comments
 (0)