Skip to content

Commit ec6cbc2

Browse files
Merge d47fd48 into dddbb74
2 parents dddbb74 + d47fd48 commit ec6cbc2

File tree

12 files changed

+68
-15
lines changed

12 files changed

+68
-15
lines changed

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,7 @@ message TLimiterConfig {
634634
// Configuration message for the grouped memory limiter.
message TGroupedMemoryLimiterConfig {
635635
// Defaults to true when the field is absent.
optional bool Enabled = 1 [default = true];
636636
optional uint64 MemoryLimit = 2;
637+
// Field added by this commit. It is copied into TConfig::HardMemoryLimit only when
// present in the message (see TConfig::DeserializeFromProto in config.cpp).
// NOTE(review): presumably the absolute ceiling, with MemoryLimit as the soft one - confirm.
optional uint64 HardMemoryLimit = 3;
637638
}
638639

639640
message TExternalIndexConfig {

ydb/core/tx/columnshard/engines/reader/abstract/read_context.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class TReadContext {
5252
const TActorId ResourceSubscribeActorId;
5353
const TActorId ReadCoordinatorActorId;
5454
const TComputeShardingPolicy ComputeShardingPolicy;
55+
TAtomic AbortFlag = 0;
5556

5657
public:
5758
template <class T>
@@ -61,6 +62,13 @@ class TReadContext {
6162
return result;
6263
}
6364

65+
// Reports a failure to the scan actor by sending it a TEvTaskProcessedResult that
// carries TConclusionStatus::Fail(errorMessage).
// The event is sent only by the call that switches AbortFlag from 0 to 1; calls
// that find the flag already set do nothing.
void AbortWithError(const TString& errorMessage) {
66+
// Compare-and-swap: store 1 only if the flag is currently 0.
if (AtomicCas(&AbortFlag, 1, 0)) {
67+
// NOTE(review): TActivationContext::Send presumably requires an actor context on
// the calling thread - confirm every caller runs inside one.
NActors::TActivationContext::Send(
68+
ScanActorId, std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail(errorMessage)));
69+
}
70+
}
71+
6472
// Forwards to ReadMetadata->IsDescSorted().
bool IsReverse() const {
6573
return ReadMetadata->IsDescSorted();
6674
}

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,14 @@ TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(
189189
, TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) {
190190
}
191191

192+
// Allocation-failure callback for a fetching step: aborts the whole read through the
// common read context, prefixing the limiter's message with the step name.
void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
193+
// Source is resolved through lock(); the result may be empty.
auto sourcePtr = Source.lock();
194+
// If the source could not be locked, the failure is not reported anywhere from here.
if (sourcePtr) {
195+
sourcePtr->GetContext()->GetCommonContext()->AbortWithError(
196+
"cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'");
197+
}
198+
}
199+
192200
TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(
193201
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
194202

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ class TAllocateMemoryStep: public IFetchingStep {
237237
NColumnShard::TCounterGuard TasksGuard;
238238
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
239239
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
240-
240+
virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
241241
public:
242242
TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step);
243243
};

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ class TBaseMergeTask: public IDataTasksProcessor::ITask, public NGroupedMemoryMa
7777
virtual bool DoApply(IDataReader& indexedDataRead) const override;
7878
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
7979
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
80+
// Allocation-failure callback for a merge task: aborts the read through the common
// read context, wrapping the limiter's message in single quotes.
virtual void DoOnAllocationImpossible(const TString& errorMessage) override {
81+
Context->GetCommonContext()->AbortWithError("cannot allocate memory for merge task: '" + errorMessage + "'");
82+
}
8083

8184
public:
8285
TBaseMergeTask(const std::shared_ptr<TMergingContext>& mergingContext, const std::shared_ptr<TSpecialReadContext>& readContext)

ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,12 @@ TStatsIterator::TFetchingAccessorAllocation::TFetchingAccessorAllocation(
195195
, AccessorsManager(context->GetDataAccessorsManager())
196196
, Request(request)
197197
, WaitingCountersGuard(context->GetCounters().GetFetcherAcessorsGuard())
198-
, OwnerId(context->GetScanActorId()) {
198+
, OwnerId(context->GetScanActorId())
199+
, Context(context) {
200+
}
201+
202+
// Allocation-failure callback for the accessors request: aborts the read via the
// stored read context.
// NOTE(review): unlike the step and merge-task callbacks, the limiter's message is
// appended here without surrounding single quotes - confirm whether that is intended.
void TStatsIterator::TFetchingAccessorAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
203+
Context->AbortWithError("cannot allocate memory for take accessors info: " + errorMessage);
199204
}
200205

201206
} // namespace NKikimr::NOlap::NReader::NSysView::NChunks

ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,15 @@ class TStatsIterator: public NAbstract::TStatsIterator<NKikimr::NSysView::Schema
109109
std::shared_ptr<TDataAccessorsRequest> Request;
110110
NColumnShard::TCounterGuard WaitingCountersGuard;
111111
const NActors::TActorId OwnerId;
112+
const std::shared_ptr<NReader::TReadContext> Context;
113+
112114
// Allocation-success callback: takes ownership of the allocation guard, then hands
// the pending request to the accessors manager. Always returns true.
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
113115
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /*selfPtr*/) override {
114116
// Store the guard in the Guard member before the request is issued.
Guard = std::move(guard);
115117
// Request is moved out here, so the Request member is left in a moved-from state.
AccessorsManager->AskData(std::move(Request));
116118
return true;
117119
}
120+
virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
118121

119122
virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override {
120123
if (result.HasErrors()) {

ydb/core/tx/limiter/grouped_memory/service/allocation.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,16 @@ class TAllocationInfo: public NColumnShard::TMonitoringObjectsCounter<TAllocatio
4848
AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName());
4949
AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())(
5050
"allocation_internal_group_id", AllocationInternalGroupId);
51+
auto allocationResult = Stage->Allocate(AllocatedVolume);
52+
if (allocationResult.IsFail()) {
53+
AllocationFailed = true;
54+
Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage());
55+
return false;
56+
}
5157
const bool result = Allocation->OnAllocated(
5258
std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation);
53-
if (result) {
54-
Stage->Allocate(AllocatedVolume);
55-
} else {
56-
Stage->Free(AllocatedVolume, false);
59+
if (!result) {
60+
Stage->Free(AllocatedVolume, true);
5761
AllocationFailed = true;
5862
}
5963
Allocation = nullptr;

ydb/core/tx/limiter/grouped_memory/usage/abstract.h

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/library/actors/core/actor.h>
66
#include <ydb/library/actors/core/actorid.h>
77
#include <ydb/library/actors/core/log.h>
8+
#include <ydb/library/conclusion/status.h>
89

910
namespace NKikimr::NOlap::NGroupedMemoryManager {
1011

@@ -95,6 +96,7 @@ class TStageFeatures {
9596
private:
9697
YDB_READONLY_DEF(TString, Name);
9798
YDB_READONLY(ui64, Limit, 0);
99+
YDB_READONLY(ui64, HardLimit, 0);
98100
YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage);
99101
YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting);
100102
std::shared_ptr<TStageFeatures> Owner;
@@ -114,24 +116,33 @@ class TStageFeatures {
114116
return Usage.Val() + Waiting.Val();
115117
}
116118

117-
TStageFeatures(
118-
const TString& name, const ui64 limit, const std::shared_ptr<TStageFeatures>& owner, const std::shared_ptr<TStageCounters>& counters)
119+
// Builds a stage description. This commit adds the hardLimit parameter between limit
// and owner. All arguments are stored as-is in the matching members; owner and
// counters are both null-checked before use in Allocate.
TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner,
120+
const std::shared_ptr<TStageCounters>& counters)
119121
: Name(name)
120122
, Limit(limit)
123+
// Compared against Usage.Val() + volume in Allocate.
, HardLimit(hardLimit)
121124
, Owner(owner)
122125
, Counters(counters) {
123126
}
124127

125-
void Allocate(const ui64 volume) {
128+
// Moves `volume` from the waiting counter to the usage counter of this stage and then
// does the same for the owner stage, if there is one.
// Returns Success() when every stage in the chain accepted the volume, or a Fail
// status describing the first stage whose hard limit would be exceeded.
[[nodiscard]] TConclusionStatus Allocate(const ui64 volume) {
129+
// Hard-limit check, performed before any counter is modified.
// NOTE(review): with HardLimit == 0 (the member's declared default) this rejects every
// non-zero volume - confirm all constructors pass a real limit.
// NOTE(review): on this early return Waiting still contains `volume`; confirm the
// caller releases the waiting amount after a failed allocation.
if (HardLimit < Usage.Val() + volume) {
130+
return TConclusionStatus::Fail(TStringBuilder() << "limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ";");
131+
}
126132
Waiting.Sub(volume);
127133
Usage.Add(volume);
128134
if (Counters) {
129135
Counters->Add(volume, true);
130136
Counters->Sub(volume, false);
131137
}
132138
if (Owner) {
133-
Owner->Allocate(volume);
139+
const auto ownerResult = Owner->Allocate(volume);
140+
if (ownerResult.IsFail()) {
141+
// Roll back the usage accounted above for this stage.
// NOTE(review): the owner returned before changing its own counters; if Free also
// forwards to Owner, the owner's usage would be reduced by a volume it never
// accounted - verify against the body of Free.
Free(volume, true);
142+
return ownerResult;
143+
}
134144
}
145+
return TConclusionStatus::Success();
135146
}
136147

137148
void Free(const ui64 volume, const bool allocated) {
@@ -199,6 +210,7 @@ class IAllocation {
199210
YDB_READONLY(ui64, Identifier, Counter.Inc());
200211
YDB_READONLY(ui64, Memory, 0);
201212
bool Allocated = false;
213+
virtual void DoOnAllocationImpossible(const TString& errorMessage) = 0;
202214
virtual bool DoOnAllocated(
203215
std::shared_ptr<TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) = 0;
204216

@@ -216,6 +228,10 @@ class IAllocation {
216228
return Allocated;
217229
}
218230

231+
// Public entry point for reporting that this allocation cannot be satisfied.
// Delegates directly to the pure-virtual DoOnAllocationImpossible hook.
void OnAllocationImpossible(const TString& errorMessage) {
232+
DoOnAllocationImpossible(errorMessage);
233+
}
234+
219235
[[nodiscard]] bool OnAllocated(
220236
std::shared_ptr<TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation);
221237
};

ydb/core/tx/limiter/grouped_memory/usage/config.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TGroupedMemoryLimiterCon
77
if (config.HasMemoryLimit()) {
88
MemoryLimit = config.GetMemoryLimit();
99
}
10+
if (config.HasHardMemoryLimit()) {
11+
HardMemoryLimit = config.GetHardMemoryLimit();
12+
}
1013
Enabled = config.GetEnabled();
1114
return true;
1215
}
1316

1417
// Renders the configuration as a single line of `Name=value;` pairs.
// This commit inserts HardMemoryLimit between MemoryLimit and Enabled.
TString TConfig::DebugString() const {
1518
TStringBuilder sb;
16-
sb << "MemoryLimit=" << MemoryLimit << ";Enabled=" << Enabled << ";";
19+
sb << "MemoryLimit=" << MemoryLimit << ";HardMemoryLimit=" << HardMemoryLimit << ";Enabled=" << Enabled << ";";
1720
return sb;
1821
}
1922

0 commit comments

Comments
 (0)