Skip to content

Commit f7f05e9

Browse files
Merge 1a531c9 into ae4e8d5
2 parents ae4e8d5 + 1a531c9 commit f7f05e9

File tree

3 files changed

+27
-1
lines changed

3 files changed

+27
-1
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11

22
#pragma once
33

4+
#include <util/system/mem_info.h>
45
#include <ydb/library/services/services.pb.h>
56
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
67
#include <ydb/library/yql/minikql/mkql_alloc.h>
@@ -55,6 +56,23 @@ namespace NYql::NDq {
5556
}
5657
}
5758

59+
// This callback is created for testing purposes and will be enabled only with spilling.
60+
// Most likely this callback will be removed after KIKIMR-21481.
61+
void TrySetIncreaseMemoryLimitCallbackWithRSSControl(NKikimr::NMiniKQL::TScopedAlloc* alloc) {
62+
if (!CanAllocateExtraMemory) return;
63+
const ui64 limitRSS = std::numeric_limits<ui64>::max();
64+
const ui64 criticalRSSValue = limitRSS / 100 * 80;
65+
66+
alloc->Ref().SetIncreaseMemoryLimitCallback([this, alloc](ui64 limit, ui64 required) {
67+
RequestExtraMemory(required - limit, alloc);
68+
69+
ui64 currentRSS = NMemInfo::GetMemInfo().RSS;
70+
if (currentRSS > criticalRSSValue) {
71+
alloc->SetMaximumLimitValueReached(true);
72+
}
73+
});
74+
}
75+
5876
void TryShrinkMemory(NKikimr::NMiniKQL::TScopedAlloc* alloc) {
5977
if (alloc->GetAllocated() - alloc->GetUsed() > MemoryLimits.MinMemFreeSize) {
6078
alloc->ReleaseFreePages();

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,11 @@ class TLocalTaskRunnerActor
423423

424424
auto guard = TaskRunner->BindAllocator(MemoryQuota ? TMaybe<ui64>(MemoryQuota->GetMkqlMemoryLimit()) : Nothing());
425425
if (MemoryQuota) {
426-
MemoryQuota->TrySetIncreaseMemoryLimitCallback(guard.GetMutex());
426+
if (settings.GetEnableSpilling()) {
427+
MemoryQuota->TrySetIncreaseMemoryLimitCallbackWithRSSControl(guard.GetMutex());
428+
} else {
429+
MemoryQuota->TrySetIncreaseMemoryLimitCallback(guard.GetMutex());
430+
}
427431
}
428432

429433
TaskRunner->Prepare(settings, ev->Get()->MemoryLimits, *ev->Get()->ExecCtx);

ydb/library/yql/minikql/mkql_alloc.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ class TScopedAlloc {
213213
Release();
214214
}
215215

216+
void SetMaximumLimitValueReached(bool IsReached) {
217+
MyState_.SetMaximumLimitValueReached(IsReached);
218+
}
219+
216220
private:
217221
const bool InitiallyAcquired_;
218222
TAllocState MyState_;

0 commit comments

Comments
 (0)