Skip to content

Commit dd5acad

Browse files
Merge f92aa20 into 2ed4009
2 parents 2ed4009 + f92aa20 commit dd5acad

File tree

3 files changed

+28
-1
lines changed

3 files changed

+28
-1
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11

22
#pragma once
33

4+
#include <util/system/mem_info.h>
45
#include <ydb/library/services/services.pb.h>
56
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
67
#include <ydb/library/yql/minikql/mkql_alloc.h>
@@ -55,6 +56,23 @@ namespace NYql::NDq {
5556
}
5657
}
5758

59+
// This callback is created for testing purposes and will be enabled only with spilling.
60+
// Most likely this callback will be removed after KIKIMR-21481.
61+
void TrySetIncreaseMemoryLimitCallbackWithRSSControl(NKikimr::NMiniKQL::TScopedAlloc* alloc) {
62+
if (!CanAllocateExtraMemory) return;
63+
const ui64 limitRSS = std::numeric_limits<ui64>::max();
64+
const ui64 criticalRSSValue = limitRSS / 100 * 80;
65+
66+
alloc->Ref().SetIncreaseMemoryLimitCallback([this, alloc](ui64 limit, ui64 required) {
67+
RequestExtraMemory(required - limit, alloc);
68+
69+
ui64 currentRSS = NMemInfo::GetMemInfo().RSS;
70+
if (currentRSS > criticalRSSValue) {
71+
alloc->SetMaximumLimitValueReached(true);
72+
}
73+
});
74+
}
75+
5876
void TryShrinkMemory(NKikimr::NMiniKQL::TScopedAlloc* alloc) {
5977
if (alloc->GetAllocated() - alloc->GetUsed() > MemoryLimits.MinMemFreeSize) {
6078
alloc->ReleaseFreePages();
@@ -113,6 +131,7 @@ namespace NYql::NDq {
113131

114132
private:
115133
void RequestExtraMemory(ui64 memory, NKikimr::NMiniKQL::TScopedAlloc* alloc) {
134+
alloc->GetAllocated();
116135
memory = std::max(AlignMemorySizeToMbBoundary(memory), MemoryLimits.MinMemAllocSize);
117136

118137
if (MemoryLimits.MkqlProgramHardMemoryLimit && MkqlMemoryLimit + memory > MemoryLimits.MkqlProgramHardMemoryLimit) {

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,11 @@ class TLocalTaskRunnerActor
423423

424424
auto guard = TaskRunner->BindAllocator(MemoryQuota ? TMaybe<ui64>(MemoryQuota->GetMkqlMemoryLimit()) : Nothing());
425425
if (MemoryQuota) {
426-
MemoryQuota->TrySetIncreaseMemoryLimitCallback(guard.GetMutex());
426+
if (settings.GetEnableSpilling()) {
427+
MemoryQuota->TrySetIncreaseMemoryLimitCallbackWithRSSControl(guard.GetMutex());
428+
} else {
429+
MemoryQuota->TrySetIncreaseMemoryLimitCallback(guard.GetMutex());
430+
}
427431
}
428432

429433
TaskRunner->Prepare(settings, ev->Get()->MemoryLimits, *ev->Get()->ExecCtx);

ydb/library/yql/minikql/mkql_alloc.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ class TScopedAlloc {
213213
Release();
214214
}
215215

216+
void SetMaximumLimitValueReached(bool IsReached) {
217+
MyState_.SetMaximumLimitValueReached(IsReached);
218+
}
219+
216220
private:
217221
const bool InitiallyAcquired_;
218222
TAllocState MyState_;

0 commit comments

Comments
 (0)