Skip to content

Commit 5f683e8

Browse files
committed
resource manager to control execution units
1 parent 7983705 commit 5f683e8

File tree

4 files changed

+64
-72
lines changed

4 files changed

+64
-72
lines changed

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ NKqpNode::TState& GetStateBucketByTx(std::shared_ptr<TBucketArray> buckets, ui64
6161

6262
void FinishKqpTask(ui64 txId, ui64 taskId, bool success, NKqpNode::TState& bucket, std::shared_ptr<NRm::IKqpResourceManager> ResourceManager) {
6363
auto ctx = bucket.RemoveTask(txId, taskId, success);
64+
ResourceManager->FreeExecutionUnits(1);
65+
6466
if (ctx) {
6567
if (ctx->ComputeActorsNumber == 0) {
6668
ResourceManager->FreeResources(txId);
@@ -310,6 +312,15 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
310312
memoryPool = NRm::EKqpMemoryPool::Unspecified;
311313
}
312314

315+
size_t executionUnits = msg.GetTasks().size();
316+
if (!ResourceManager()->AllocateExecutionUnits(executionUnits)) {
317+
TStringBuilder error;
318+
error << "TxId: " << txId << ", NodeId: " << SelfId().NodeId() << ", not enough compute actors, requested " << msg.GetTasks().size();
319+
LOG_N(error);
320+
ReplyError(txId, request.Executer, msg, NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_EXECUTION_UNITS, error);
321+
return;
322+
}
323+
313324
ui32 requestChannels = 0;
314325
for (auto& dqTask : *msg.MutableTasks()) {
315326
auto estimation = EstimateTaskResources(dqTask, Config, msg.GetTasks().size());
@@ -348,7 +359,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
348359
allocatedTasks.reserve(msg.GetTasks().size());
349360
for (auto& task : request.InFlyTasks) {
350361
NRm::TKqpResourcesRequest resourcesRequest;
351-
resourcesRequest.ExecutionUnits = 1;
352362
resourcesRequest.MemoryPool = memoryPool;
353363

354364
// !!!!!!!!!!!!!!!!!!!!!
@@ -359,14 +369,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
359369
if (!ResourceManager()->AllocateResources(txId, task.first, resourcesRequest, &resourcesResponse)) {
360370
NKikimrKqp::TEvStartKqpTasksResponse::ENotStartedTaskReason failReason = NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR;
361371
TStringBuilder error;
362-
363-
if (resourcesResponse.ExecutionUnits()) {
364-
error << "TxId: " << txId << ", NodeId: " << SelfId().NodeId() << ", not enough compute actors, requested " << msg.GetTasks().size();
365-
LOG_N(error);
366-
367-
failReason = NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_EXECUTION_UNITS;
368-
}
369-
370372
if (resourcesResponse.ScanQueryMemory()) {
371373
error << "TxId: " << txId << ", NodeId: " << SelfId().NodeId() << ", not enough memory, requested " << task.second.Memory;
372374
LOG_N(error);
@@ -385,6 +387,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
385387
ResourceManager()->FreeResources(txId, taskId);
386388
}
387389

390+
ResourceManager()->FreeExecutionUnits(executionUnits);
391+
388392
ReplyError(txId, request.Executer, msg, failReason, error);
389393
return;
390394
}

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 27 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -174,20 +174,36 @@ class TKqpResourceManager : public IKqpResourceManager {
174174
}
175175
}
176176

177+
bool AllocateExecutionUnits(ui32 cnt) override {
178+
i32 prev = ExecutionUnitsResource.fetch_sub(cnt);
179+
if (prev < (i32)cnt) {
180+
ExecutionUnitsResource.fetch_add(cnt);
181+
}
182+
183+
return (prev >= (i32)cnt);
184+
}
185+
186+
void FreeExecutionUnits(ui32 cnt) override {
187+
if (cnt == 0) {
188+
return;
189+
}
190+
191+
ExecutionUnitsResource.fetch_add(cnt);
192+
}
193+
177194
bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
178195
TKqpNotEnoughResources* details = nullptr) override {
179196
if (resources.MemoryPool == EKqpMemoryPool::DataQuery) {
180197
NotifyExternalResourcesAllocated(txId, taskId, resources);
181198
return true;
182199
}
183200
Y_ABORT_UNLESS(resources.MemoryPool == EKqpMemoryPool::ScanQuery);
184-
if (Y_UNLIKELY(resources.Memory == 0 && resources.ExecutionUnits == 0)) {
201+
if (Y_UNLIKELY(resources.Memory == 0)) {
185202
return true;
186203
}
187204

188205
auto now = ActorSystem->Timestamp();
189206
bool hasScanQueryMemory = true;
190-
bool hasExecutionUnits = true;
191207
ui64 queryMemoryLimit = 0;
192208

193209
with_lock (Lock) {
@@ -200,11 +216,8 @@ class TKqpResourceManager : public IKqpResourceManager {
200216
}
201217

202218
hasScanQueryMemory = ScanQueryMemoryResource.Has(resources.Memory);
203-
hasExecutionUnits = ExecutionUnitsResource.Has(resources.ExecutionUnits);
204-
205-
if (hasScanQueryMemory && hasExecutionUnits) {
219+
if (hasScanQueryMemory) {
206220
ScanQueryMemoryResource.Acquire(resources.Memory);
207-
ExecutionUnitsResource.Acquire(resources.ExecutionUnits);
208221
queryMemoryLimit = Config.GetQueryMemoryLimit();
209222
}
210223
} // with_lock (Lock)
@@ -218,15 +231,6 @@ class TKqpResourceManager : public IKqpResourceManager {
218231
return false;
219232
}
220233

221-
if (!hasExecutionUnits) {
222-
Counters->RmNotEnoughComputeActors->Inc();
223-
LOG_AS_N("TxId: " << txId << ", taskId: " << taskId << ". Not enough ExecutionUnits, requested: " << resources.ExecutionUnits);
224-
if (details) {
225-
details->SetExecutionUnits();
226-
}
227-
return false;
228-
}
229-
230234
ui64 rbTaskId = LastResourceBrokerTaskId.fetch_add(1) + 1;
231235
TString rbTaskName = TStringBuilder() << "kqp-" << txId << '-' << taskId << '-' << rbTaskId;
232236
bool extraAlloc = false;
@@ -239,7 +243,6 @@ class TKqpResourceManager : public IKqpResourceManager {
239243

240244
with_lock (Lock) {
241245
ScanQueryMemoryResource.Release(resources.Memory);
242-
ExecutionUnitsResource.Release(resources.ExecutionUnits);
243246
} // with_lock (Lock)
244247

245248
Counters->RmNotEnoughMemory->Inc();
@@ -261,7 +264,6 @@ class TKqpResourceManager : public IKqpResourceManager {
261264

262265
with_lock (Lock) {
263266
ScanQueryMemoryResource.Release(resources.Memory);
264-
ExecutionUnitsResource.Release(resources.ExecutionUnits);
265267
} // with_lock (Lock)
266268

267269
Counters->RmNotEnoughMemory->Inc();
@@ -276,14 +278,12 @@ class TKqpResourceManager : public IKqpResourceManager {
276278
auto& txState = txBucket.Txs[txId];
277279

278280
txState.TxScanQueryMemory += resources.Memory;
279-
txState.TxExecutionUnits += resources.ExecutionUnits;
280281
if (!txState.CreatedAt) {
281282
txState.CreatedAt = now;
282283
}
283284

284285
auto& taskState = txState.Tasks[taskId];
285286
taskState.ScanQueryMemory += resources.Memory;
286-
taskState.ExecutionUnits += resources.ExecutionUnits;
287287
if (!taskState.CreatedAt) {
288288
taskState.CreatedAt = now;
289289
}
@@ -299,7 +299,6 @@ class TKqpResourceManager : public IKqpResourceManager {
299299

300300
LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". Allocated " << resources.ToString());
301301

302-
Counters->RmComputeActors->Add(resources.ExecutionUnits);
303302
Counters->RmMemory->Add(resources.Memory);
304303
if (extraAlloc) {
305304
Counters->RmExtraMemAllocs->Inc();
@@ -340,20 +339,16 @@ class TKqpResourceManager : public IKqpResourceManager {
340339
}
341340

342341
taskIt->second.ScanQueryMemory -= resources.Memory;
343-
taskIt->second.ExecutionUnits -= resources.ExecutionUnits;
344342

345343
bool reduced = ResourceBroker->ReduceTaskResourcesInstant(
346344
taskIt->second.ResourceBrokerTaskId, {0, resources.Memory}, SelfId);
347345
Y_DEBUG_ABORT_UNLESS(reduced);
348346

349347
txIt->second.TxScanQueryMemory -= resources.Memory;
350-
txIt->second.TxExecutionUnits -= resources.ExecutionUnits;
351348

352349
ScanQueryMemoryResource.Release(resources.Memory);
353-
ExecutionUnitsResource.Release(resources.ExecutionUnits);
354350
}
355351

356-
Counters->RmComputeActors->Sub(resources.ExecutionUnits);
357352
Counters->RmMemory->Sub(resources.Memory);
358353

359354
Y_DEBUG_ABORT_UNLESS(Counters->RmComputeActors->Val() >= 0);
@@ -364,7 +359,6 @@ class TKqpResourceManager : public IKqpResourceManager {
364359

365360
void FreeResources(ui64 txId, ui64 taskId) override {
366361
ui64 releaseScanQueryMemory = 0;
367-
ui32 releaseExecutionUnits = 0;
368362
ui32 remainsTasks = 0;
369363

370364
auto& txBucket = TxBucket(txId);
@@ -389,7 +383,6 @@ class TKqpResourceManager : public IKqpResourceManager {
389383
}
390384

391385
releaseScanQueryMemory = taskIt->second.ScanQueryMemory;
392-
releaseExecutionUnits = taskIt->second.ExecutionUnits;
393386

394387
bool finished = ResourceBroker->FinishTaskInstant(
395388
TEvResourceBroker::TEvFinishTask(taskIt->second.ResourceBrokerTaskId), SelfId);
@@ -402,20 +395,17 @@ class TKqpResourceManager : public IKqpResourceManager {
402395
} else {
403396
txIt->second.Tasks.erase(taskIt);
404397
txIt->second.TxScanQueryMemory -= releaseScanQueryMemory;
405-
txIt->second.TxExecutionUnits -= releaseExecutionUnits;
406398
}
407399
} // with_lock (txBucket.Lock)
408400

409401
with_lock (Lock) {
410402
ScanQueryMemoryResource.Release(releaseScanQueryMemory);
411-
ExecutionUnitsResource.Release(releaseExecutionUnits);
412403
} // with_lock (Lock)
413404

414405
LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". Released resources, "
415-
<< "ScanQueryMemory: " << releaseScanQueryMemory << ", ExecutionUnits: " << releaseExecutionUnits << ". "
406+
<< "ScanQueryMemory: " << releaseScanQueryMemory << ". "
416407
<< "Remains " << remainsTasks << " tasks in this tx.");
417408

418-
Counters->RmComputeActors->Sub(releaseExecutionUnits);
419409
Counters->RmMemory->Sub(releaseScanQueryMemory);
420410

421411
Y_DEBUG_ABORT_UNLESS(Counters->RmComputeActors->Val() >= 0);
@@ -426,7 +416,6 @@ class TKqpResourceManager : public IKqpResourceManager {
426416

427417
void FreeResources(ui64 txId) override {
428418
ui64 releaseScanQueryMemory = 0;
429-
ui32 releaseExecutionUnits = 0;
430419

431420
auto& txBucket = TxBucket(txId);
432421

@@ -450,21 +439,17 @@ class TKqpResourceManager : public IKqpResourceManager {
450439
}
451440

452441
releaseScanQueryMemory = txIt->second.TxScanQueryMemory;
453-
releaseExecutionUnits = txIt->second.TxExecutionUnits;
454442

455443
txBucket.Txs.erase(txIt);
456444
} // with_lock (txBucket.Lock)
457445

458446
with_lock (Lock) {
459447
ScanQueryMemoryResource.Release(releaseScanQueryMemory);
460-
ExecutionUnitsResource.Release(releaseExecutionUnits);
461448
} // with_lock (Lock)
462449

463450
LOG_AS_D("TxId: " << txId << ". Released resources, "
464-
<< "ScanQueryMemory: " << releaseScanQueryMemory << ", ExecutionUnits: " << releaseExecutionUnits << ". "
465-
<< "Tx completed.");
451+
<< "ScanQueryMemory: " << releaseScanQueryMemory << ", ExecutionUnits: " << "Tx completed.");
466452

467-
Counters->RmComputeActors->Sub(releaseExecutionUnits);
468453
Counters->RmMemory->Sub(releaseScanQueryMemory);
469454

470455
Y_DEBUG_ABORT_UNLESS(Counters->RmComputeActors->Val() >= 0);
@@ -635,7 +620,7 @@ class TKqpResourceManager : public IKqpResourceManager {
635620
result.Memory.fill(0);
636621

637622
with_lock (Lock) {
638-
result.ExecutionUnits = ExecutionUnitsResource.Available();
623+
result.ExecutionUnits = ExecutionUnitsResource.load();
639624
result.Memory[EKqpMemoryPool::ScanQuery] = ScanQueryMemoryResource.Available();
640625
}
641626

@@ -695,7 +680,7 @@ class TKqpResourceManager : public IKqpResourceManager {
695680
TAdaptiveLock Lock;
696681

697682
// limits (guarded by Lock)
698-
TLimitedResource<ui32> ExecutionUnitsResource;
683+
std::atomic<i32> ExecutionUnitsResource;
699684
TLimitedResource<ui64> ScanQueryMemoryResource;
700685
ui64 ExternalDataQueryMemory = 0;
701686

@@ -974,7 +959,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
974959
LOG_I("Updated table service config: " << config.DebugString());
975960

976961
with_lock (ResourceManager->Lock) {
977-
ResourceManager->ExecutionUnitsResource.SetNewLimit(config.GetComputeActorsCount());
962+
ResourceManager->ExecutionUnitsResource.store(config.GetComputeActorsCount());
978963
ResourceManager->Config.Swap(&config);
979964
}
980965

@@ -1024,7 +1009,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
10241009
with_lock (ResourceManager->Lock) {
10251010
str << "ScanQuery memory resource: " << ResourceManager->ScanQueryMemoryResource.ToString() << Endl;
10261011
str << "External DataQuery memory: " << ResourceManager->ExternalDataQueryMemory << Endl;
1027-
str << "ExecutionUnits resource: " << ResourceManager->ExecutionUnitsResource.ToString() << Endl;
1012+
str << "ExecutionUnits resource: " << ResourceManager->ExecutionUnitsResource.load() << Endl;
10281013
}
10291014
str << "Last resource broker task id: " << ResourceManager->LastResourceBrokerTaskId.load() << Endl;
10301015
if (WbState.LastPublishTime) {
@@ -1160,11 +1145,11 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
11601145
}
11611146
ActorIdToProto(MakeKqpResourceManagerServiceID(SelfId().NodeId()), payload.MutableResourceManagerActorId()); // legacy
11621147
with_lock (ResourceManager->Lock) {
1163-
payload.SetAvailableComputeActors(ResourceManager->ExecutionUnitsResource.Available()); // legacy
1148+
payload.SetAvailableComputeActors(ResourceManager->ExecutionUnitsResource.load()); // legacy
11641149
payload.SetTotalMemory(ResourceManager->ScanQueryMemoryResource.GetLimit()); // legacy
11651150
payload.SetUsedMemory(ResourceManager->ScanQueryMemoryResource.GetLimit() - ResourceManager->ScanQueryMemoryResource.Available()); // legacy
11661151

1167-
payload.SetExecutionUnits(ResourceManager->ExecutionUnitsResource.Available());
1152+
payload.SetExecutionUnits(ResourceManager->ExecutionUnitsResource.load());
11681153
auto* pool = payload.MutableMemory()->Add();
11691154
pool->SetPool(EKqpMemoryPool::ScanQuery);
11701155
pool->SetAvailable(ResourceManager->ScanQueryMemoryResource.Available());

ydb/core/kqp/rm_service/kqp_rm_service.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,12 @@ using TOnResourcesSnapshotCallback = std::function<void(TVector<NKikimrKqp::TKqp
3434

3535
/// resources request
3636
struct TKqpResourcesRequest {
37-
ui32 ExecutionUnits = 0;
3837
EKqpMemoryPool MemoryPool = EKqpMemoryPool::Unspecified;
3938
ui64 Memory = 0;
4039

4140
TString ToString() const {
4241
return TStringBuilder() << "TKqpResourcesRequest{ MemoryPool: " << (ui32) MemoryPool << ", Memory: " << Memory
43-
<< ", ExecutionUnits: " << ExecutionUnits << " }";
42+
<< " }";
4443
}
4544
};
4645

@@ -49,16 +48,14 @@ struct TKqpNotEnoughResources {
4948
std::bitset<32> State;
5049

5150
bool NotReady() const { return State.test(0); }
52-
bool ExecutionUnits() const { return State.test(1); }
5351
bool QueryMemoryLimit() const { return State.test(2); }
5452
bool ScanQueryMemory() const { return State.test(3); }
5553
bool DataQueryMemory() const { return State.test(4); }
5654

5755
void SetNotReady() { State.set(0); }
58-
void SetExecutionUnits() { State.set(1); }
59-
void SetQueryMemoryLimit() { State.set(2); }
60-
void SetScanQueryMemory() { State.set(3); }
61-
void SetDataQueryMemory() { State.set(4); }
56+
void SetQueryMemoryLimit() { State.set(1); }
57+
void SetScanQueryMemory() { State.set(2); }
58+
void SetDataQueryMemory() { State.set(3); }
6259
};
6360

6461
/// local resources snapshot
@@ -75,6 +72,9 @@ class IKqpResourceManager : private TNonCopyable {
7572
virtual bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
7673
TKqpNotEnoughResources* details = nullptr) = 0;
7774

75+
virtual bool AllocateExecutionUnits(ui32 cnt) = 0;
76+
virtual void FreeExecutionUnits(ui32 cnt) = 0;
77+
7878
using TResourcesAllocatedCallback = std::function<void(NActors::TActorSystem* as)>;
7979
using TNotEnoughtResourcesCallback = std::function<void(NActors::TActorSystem* as, const TString& reason, bool byTimeout)>;
8080

0 commit comments

Comments
 (0)