Skip to content

Commit e8239f9

Browse files
committed
continue refactoring
1 parent 9d5d4db commit e8239f9

File tree

1 file changed

+27
-57
lines changed

1 file changed

+27
-57
lines changed

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 27 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,23 @@ struct TTxState {
108108
TInstant CreatedAt;
109109

110110
bool IsDataQuery = false;
111+
112+
TTaskState& Allocated(ui64 taskId, TInstant now, const TKqpResourcesRequest& resources) {
113+
114+
TxScanQueryMemory += resources.Memory;
115+
if (!CreatedAt) {
116+
CreatedAt = now;
117+
}
118+
119+
auto& taskState = Tasks[taskId];
120+
taskState.ExecutionUnits += resources.ExecutionUnits;
121+
taskState.ScanQueryMemory += resources.Memory;
122+
if (!taskState.CreatedAt) {
123+
taskState.CreatedAt = now;
124+
}
125+
126+
return taskState;
127+
}
111128
};
112129

113130
struct TTxStatesBucket {
@@ -259,16 +276,19 @@ class TKqpResourceManager : public IKqpResourceManager {
259276

260277
auto& txBucket = TxBucket(txId);
261278
with_lock (txBucket.Lock) {
262-
if (auto it = txBucket.Txs.find(txId); it != txBucket.Txs.end()) {
263-
ui64 txTotalRequestedMemory = it->second.TxScanQueryMemory + resources.Memory;
264-
if (txTotalRequestedMemory > queryMemoryLimit) {
279+
Y_DEFER {
280+
if (!result) {
265281
auto unguard = ::Unguard(txBucket.Lock);
266-
282+
Counters->RmNotEnoughMemory->Inc();
267283
with_lock (Lock) {
268284
ScanQueryMemoryResource.Release(resources.Memory);
269285
} // with_lock (Lock)
286+
}
287+
};
270288

271-
Counters->RmNotEnoughMemory->Inc();
289+
if (auto it = txBucket.Txs.find(txId); it != txBucket.Txs.end()) {
290+
ui64 txTotalRequestedMemory = it->second.TxScanQueryMemory + resources.Memory;
291+
if (txTotalRequestedMemory > queryMemoryLimit) {
272292
TStringBuilder reason;
273293
reason << "TxId: " << txId << ", taskId: " << taskId << ". Query memory limit exceeded: "
274294
<< "requested " << txTotalRequestedMemory;
@@ -282,13 +302,6 @@ class TKqpResourceManager : public IKqpResourceManager {
282302
SelfId);
283303

284304
if (!allocated) {
285-
auto unguard = ::Unguard(txBucket.Lock);
286-
287-
with_lock (Lock) {
288-
ScanQueryMemoryResource.Release(resources.Memory);
289-
} // with_lock (Lock)
290-
291-
Counters->RmNotEnoughMemory->Inc();
292305
TStringBuilder reason;
293306
reason << "TxId: " << txId << ", taskId: " << taskId << ". Not enough ScanQueryMemory: "
294307
<< "requested " << resources.Memory;
@@ -297,20 +310,7 @@ class TKqpResourceManager : public IKqpResourceManager {
297310
return result;
298311
}
299312

300-
auto& txState = txBucket.Txs[txId];
301-
302-
txState.TxScanQueryMemory += resources.Memory;
303-
if (!txState.CreatedAt) {
304-
txState.CreatedAt = now;
305-
}
306-
307-
auto& taskState = txState.Tasks[taskId];
308-
taskState.ExecutionUnits += resources.ExecutionUnits;
309-
taskState.ScanQueryMemory += resources.Memory;
310-
if (!taskState.CreatedAt) {
311-
taskState.CreatedAt = now;
312-
}
313-
313+
auto& taskState = txBucket.Txs[txId].Allocated(Counters, taskId, now, resources);
314314
if (!taskState.ResourceBrokerTaskId) {
315315
taskState.ResourceBrokerTaskId = rbTaskId;
316316
} else {
@@ -393,8 +393,7 @@ class TKqpResourceManager : public IKqpResourceManager {
393393
}
394394

395395
if (taskIt->second.ExecutionUnits) {
396-
FreeExecutionUnits(taskIt->second.ExecutionUnits);
397-
taskIt->second.ExecutionUnits = 0;
396+
FreeExecutionUnits(std::exchange(taskIt->second.ExecutionUnits, 0));
398397
}
399398

400399
if (txIt->second.IsDataQuery) {
@@ -542,35 +541,6 @@ class TKqpResourceManager : public IKqpResourceManager {
542541
FireResourcesPublishing();
543542
}
544543

545-
void NotifyExternalResourcesFreed(ui64 txId) {
546-
LOG_AS_D("TxId: " << txId << ". External free.");
547-
548-
ui64 releaseMemory = 0;
549-
550-
auto& txBucket = TxBucket(txId);
551-
with_lock (txBucket.Lock) {
552-
auto txIt = txBucket.Txs.find(txId);
553-
if (txIt == txBucket.Txs.end()) {
554-
return;
555-
}
556-
557-
for (auto task : txIt->second.Tasks) {
558-
releaseMemory += task.second.ExternalDataQueryMemory;
559-
}
560-
txBucket.Txs.erase(txId);
561-
} // with_lock (txBucket.Lock)
562-
563-
with_lock (Lock) {
564-
Y_DEBUG_ABORT_UNLESS(ExternalDataQueryMemory >= releaseMemory);
565-
ExternalDataQueryMemory -= releaseMemory;
566-
} // with_lock (Lock)
567-
568-
Counters->RmExternalMemory->Sub(releaseMemory);
569-
Y_DEBUG_ABORT_UNLESS(Counters->RmExternalMemory->Val() >= 0);
570-
571-
FireResourcesPublishing();
572-
}
573-
574544
void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
575545
LOG_AS_D("Schedule Snapshot request");
576546
if (PublishResourcesByExchanger) {

0 commit comments

Comments
 (0)