Skip to content

Commit ec4cf5a

Browse files
Merge a4a82ed into 7cab7c7
2 parents 7cab7c7 + a4a82ed commit ec4cf5a

File tree

3 files changed

+26
-1
lines changed

3 files changed

+26
-1
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11

22
#pragma once
33

4+
#include <util/system/mem_info.h>
45
#include <ydb/library/services/services.pb.h>
56
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
67
#include <ydb/library/yql/minikql/mkql_alloc.h>
@@ -55,6 +56,21 @@ namespace NYql::NDq {
5556
}
5657
}
5758

59+
// This callback is created for testing purposes and will be enabled only with spilling.
60+
// Most likely this callback will be removed after KIKIMR-21481.
61+
void TrySetIncreaseMemoryLimitCallbackWithRSSControl(NKikimr::NMiniKQL::TScopedAlloc* alloc) {
62+
if (CanAllocateExtraMemory) {
63+
alloc->Ref().SetIncreaseMemoryLimitCallback([this, alloc](ui64 limit, ui64 required) {
64+
RequestExtraMemory(required - limit, alloc);
65+
const ui64 RSSLimit = std::numeric_limits<ui64>::max();
66+
auto currentRSS = NMemInfo::GetMemInfo().RSS;
67+
if (currentRSS > RSSLimit) {
68+
alloc->SetMaximumLimitValueReached(true);
69+
}
70+
});
71+
}
72+
}
73+
5874
void TryShrinkMemory(NKikimr::NMiniKQL::TScopedAlloc* alloc) {
5975
if (alloc->GetAllocated() - alloc->GetUsed() > MemoryLimits.MinMemFreeSize) {
6076
alloc->ReleaseFreePages();
@@ -113,6 +129,7 @@ namespace NYql::NDq {
113129

114130
private:
115131
void RequestExtraMemory(ui64 memory, NKikimr::NMiniKQL::TScopedAlloc* alloc) {
132+
alloc->GetAllocated();
116133
memory = std::max(AlignMemorySizeToMbBoundary(memory), MemoryLimits.MinMemAllocSize);
117134

118135
if (MemoryLimits.MkqlProgramHardMemoryLimit && MkqlMemoryLimit + memory > MemoryLimits.MkqlProgramHardMemoryLimit) {

ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,11 @@ class TLocalTaskRunnerActor
423423

424424
auto guard = TaskRunner->BindAllocator(MemoryQuota ? TMaybe<ui64>(MemoryQuota->GetMkqlMemoryLimit()) : Nothing());
425425
if (MemoryQuota) {
426-
MemoryQuota->TrySetIncreaseMemoryLimitCallback(guard.GetMutex());
426+
if (settings.GetEnableSpilling()) {
427+
MemoryQuota->TrySetIncreaseMemoryLimitCallbackWithRSSControl(guard.GetMutex());
428+
} else {
429+
MemoryQuota->TrySetIncreaseMemoryLimitCallback(guard.GetMutex());
430+
}
427431
}
428432

429433
TaskRunner->Prepare(settings, ev->Get()->MemoryLimits, *ev->Get()->ExecCtx);

ydb/library/yql/minikql/mkql_alloc.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ class TScopedAlloc {
213213
Release();
214214
}
215215

216+
void SetMaximumLimitValueReached(bool IsReached) {
217+
MyState_.SetMaximumLimitValueReached(IsReached);
218+
}
219+
216220
private:
217221
const bool InitiallyAcquired_;
218222
TAllocState MyState_;

0 commit comments

Comments
 (0)