Skip to content

Commit 899ccd7

Browse files
Overlimit read memory (#7408)
1 parent d4149ad commit 899ccd7

File tree

9 files changed

+122
-22
lines changed

9 files changed

+122
-22
lines changed

ydb/core/tx/columnshard/columnshard__write.cpp

+13
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#include "columnshard_impl.h"
2+
#include "common/limits.h"
23
#include "blobs_action/transaction/tx_write.h"
34
#include "blobs_action/transaction/tx_draft.h"
45
#include "counters/columnshard.h"
6+
#include "engines/column_engine_logs.h"
57
#include "operations/batch_builder/builder.h"
68
#include "operations/write_data.h"
79

@@ -193,6 +195,17 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
193195
return returnFail(COUNTER_WRITE_FAIL);
194196
}
195197

198+
const ui64 minMemoryRead = TablesManager.GetPrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>()
199+
.GetGranuleVerified(writeMeta.GetTableId())
200+
.GetPortionsIndex()
201+
.GetMinMemoryRead();
202+
if (NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit < minMemoryRead) {
203+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "overlimit")("reason", "read_memory")("current", minMemoryRead)(
204+
"limit", NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit)("table_id", writeMeta.GetTableId());
205+
Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::OverlimitReadMemory);
206+
return returnFail(COUNTER_WRITE_FAIL);
207+
}
208+
196209
const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
197210
auto arrowData = std::make_shared<TProtoArrowData>(snapshotSchema);
198211
if (!arrowData->ParseFromProto(record)) {

ydb/core/tx/columnshard/common/limits.h

+4
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,9 @@ class TGlobalLimits {
99
static constexpr inline ui64 InsertCompactionMemoryLimit = 1ULL << 30;
1010
static constexpr inline ui64 GeneralCompactionMemoryLimit = 3ULL << 30;
1111
static constexpr inline ui64 ScanMemoryLimit = 3ULL << 30;
12+
13+
static constexpr inline ui64 DefaultRejectMemoryIntervalLimit = ScanMemoryLimit;
14+
static constexpr inline ui64 DefaultReduceMemoryIntervalLimit = 0.8 * ScanMemoryLimit;
15+
static constexpr inline ui64 DefaultReadSequentiallyBufferSize = ((ui64)8) << 20;
1216
};
1317
}

ydb/core/tx/columnshard/common/ya.make

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
LIBRARY()
22

33
SRCS(
4-
limits.h
4+
limits.cpp
55
reverse_accessor.cpp
66
scalars.cpp
77
snapshot.cpp

ydb/core/tx/columnshard/counters/columnshard.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ enum class EWriteFailReason {
1414
LongTxDuplication /* "long_tx_duplication" */,
1515
NoTable /* "no_table" */,
1616
IncorrectSchema /* "incorrect_schema" */,
17-
Overload /* "overload" */
17+
Overload /* "overload" */,
18+
OverlimitReadMemory /* "overlimit_read_memory" */
1819
};
1920

2021
class TCSInitialization: public TCommonCountersOwner {

ydb/core/tx/columnshard/counters/engine_logs.h

+37-4
Original file line numberDiff line numberDiff line change
@@ -85,17 +85,32 @@ class TAgentDataClassCounters: public TCommonCountersOwner {
8585
}
8686
};
8787

88+
class TPortionsIndexCounters {
89+
public:
90+
const std::shared_ptr<TValueAggregationClient> MinReadBytes;
91+
TPortionsIndexCounters(const std::shared_ptr<TValueAggregationClient>& minReadBytes)
92+
: MinReadBytes(minReadBytes) {
93+
}
94+
};
95+
8896
class TGranuleDataCounters {
8997
private:
9098
const TDataClassCounters InsertedData;
9199
const TDataClassCounters CompactedData;
92100
const TDataClassCounters FullData;
101+
const TPortionsIndexCounters PortionsIndexCounters;
102+
93103
public:
94-
TGranuleDataCounters(const TDataClassCounters& insertedData, const TDataClassCounters& compactedData, const TDataClassCounters& fullData)
104+
const TPortionsIndexCounters& GetPortionsIndexCounters() const {
105+
return PortionsIndexCounters;
106+
}
107+
108+
TGranuleDataCounters(const TDataClassCounters& insertedData, const TDataClassCounters& compactedData, const TDataClassCounters& fullData,
109+
const std::shared_ptr<TValueAggregationClient>& minReadBytes)
95110
: InsertedData(insertedData)
96111
, CompactedData(compactedData)
97112
, FullData(fullData)
98-
{
113+
, PortionsIndexCounters(minReadBytes) {
99114
}
100115

101116
void OnPortionsDataRefresh(const TBaseGranuleDataClassSummary& inserted, const TBaseGranuleDataClassSummary& compacted) const {
@@ -105,20 +120,38 @@ class TGranuleDataCounters {
105120
}
106121
};
107122

123+
class TPortionsIndexAgentsCounters: public TCommonCountersOwner {
124+
private:
125+
using TBase = TCommonCountersOwner;
126+
127+
public:
128+
const std::shared_ptr<TValueAggregationAgent> MinReadBytes;
129+
130+
TPortionsIndexAgentsCounters(const TString& baseName)
131+
: TBase(baseName)
132+
, MinReadBytes(TBase::GetValueAutoAggregations("MinRead/Bytes")) {
133+
}
134+
};
135+
108136
class TAgentGranuleDataCounters {
109137
private:
110138
TAgentDataClassCounters InsertedData;
111139
TAgentDataClassCounters CompactedData;
112140
TAgentDataClassCounters FullData;
141+
TPortionsIndexAgentsCounters PortionsIndex;
142+
113143
public:
114144
TAgentGranuleDataCounters(const TString& ownerId)
115145
: InsertedData(ownerId, "ByGranule/Inserted")
116146
, CompactedData(ownerId, "ByGranule/Compacted")
117-
, FullData(ownerId, "ByGranule/Full") {
147+
, FullData(ownerId, "ByGranule/Full")
148+
, PortionsIndex("ByGranule/PortionsIndex")
149+
{
118150
}
119151

120152
TGranuleDataCounters RegisterClient() const {
121-
return TGranuleDataCounters(InsertedData.RegisterClient(), CompactedData.RegisterClient(), FullData.RegisterClient());
153+
return TGranuleDataCounters(
154+
InsertedData.RegisterClient(), CompactedData.RegisterClient(), FullData.RegisterClient(), PortionsIndex.MinReadBytes->GetClient());
122155
}
123156
};
124157

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22
#include "columns_set.h"
33
#include "fetching.h"
4+
#include <ydb/core/tx/columnshard/common/limits.h>
45
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
56
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h>
67
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
@@ -32,9 +33,9 @@ class TSpecialReadContext {
3233
std::array<std::array<std::array<std::array<std::array<std::array<std::shared_ptr<TFetchingScript>, 2>, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts;
3334

3435
public:
35-
static const inline ui64 DefaultRejectMemoryIntervalLimit = ((ui64)3) << 30;
36-
static const inline ui64 DefaultReduceMemoryIntervalLimit = DefaultRejectMemoryIntervalLimit;
37-
static const inline ui64 DefaultReadSequentiallyBufferSize = ((ui64)8) << 20;
36+
static const inline ui64 DefaultRejectMemoryIntervalLimit = TGlobalLimits::DefaultRejectMemoryIntervalLimit;
37+
static const inline ui64 DefaultReduceMemoryIntervalLimit = TGlobalLimits::DefaultReduceMemoryIntervalLimit;
38+
static const inline ui64 DefaultReadSequentiallyBufferSize = TGlobalLimits::DefaultReadSequentiallyBufferSize;
3839

3940
const ui64 ReduceMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetReduceMemoryIntervalLimit(DefaultReduceMemoryIntervalLimit);
4041
const ui64 RejectMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetRejectMemoryIntervalLimit(DefaultRejectMemoryIntervalLimit);

ydb/core/tx/columnshard/engines/storage/granule/granule.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,7 @@ TGranuleMeta::TGranuleMeta(const ui64 pathId, const TGranulesStorage& owner, con
138138
, PortionInfoGuard(owner.GetCounters().BuildPortionBlobsGuard())
139139
, Stats(owner.GetStats())
140140
, StoragesManager(owner.GetStoragesManager())
141-
, PortionsIndex(*this)
142-
{
141+
, PortionsIndex(*this, Counters.GetPortionsIndexCounters()) {
143142
NStorageOptimizer::IOptimizerPlannerConstructor::TBuildContext context(PathId, owner.GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetPrimaryKey());
144143
OptimizerPlanner = versionedIndex.GetLastSchema()->GetIndexInfo().GetCompactionPlannerConstructor()->BuildPlanner(context).DetachResult();
145144
AFL_VERIFY(!!OptimizerPlanner);

ydb/core/tx/columnshard/engines/storage/granule/portions_index.cpp

+22-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ TPortionsIndex::TPortionIntervals TPortionsIndex::GetIntervalFeatures(const TPor
1111
TPortionIntervals portionExcludeIntervals;
1212
while (true) {
1313
std::optional<NArrow::TReplaceKey> nextKey;
14-
for (auto&& p : itFrom->second.GetPortionIds()) {
14+
for (auto&& [p, _] : itFrom->second.GetPortionIds()) {
1515
if (skipPortions.contains(p)) {
1616
continue;
1717
}
@@ -55,9 +55,12 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
5555
auto itTo = Points.find(p->IndexKeyEnd());
5656
AFL_VERIFY(itTo != Points.end());
5757
{
58+
const ui64 minMemoryRead = p->GetMinMemoryForReadColumns({});
5859
auto it = itFrom;
5960
while (true) {
60-
it->second.RemoveContained(p->GetPortionId());
61+
RemoveFromMemoryUsageControl(it->second.GetMinMemoryRead());
62+
it->second.RemoveContained(p->GetPortionId(), minMemoryRead);
63+
++CountMemoryUsages[it->second.GetMinMemoryRead()];
6164
if (it == itTo) {
6265
break;
6366
}
@@ -67,19 +70,27 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
6770
if (itFrom != itTo) {
6871
itFrom->second.RemoveStart(p);
6972
if (itFrom->second.IsEmpty()) {
73+
RemoveFromMemoryUsageControl(itFrom->second.GetMinMemoryRead());
7074
Points.erase(itFrom);
7175
}
7276
itTo->second.RemoveFinish(p);
7377
if (itTo->second.IsEmpty()) {
78+
RemoveFromMemoryUsageControl(itTo->second.GetMinMemoryRead());
7479
Points.erase(itTo);
7580
}
7681
} else {
7782
itTo->second.RemoveStart(p);
7883
itTo->second.RemoveFinish(p);
7984
if (itTo->second.IsEmpty()) {
85+
RemoveFromMemoryUsageControl(itTo->second.GetMinMemoryRead());
8086
Points.erase(itTo);
8187
}
8288
}
89+
if (CountMemoryUsages.size()) {
90+
Counters.MinReadBytes->SetValue(CountMemoryUsages.rbegin()->first);
91+
} else {
92+
Counters.MinReadBytes->SetValue(0);
93+
}
8394
}
8495

8596
void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
@@ -89,13 +100,21 @@ void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
89100
itTo->second.AddFinish(p);
90101

91102
auto it = itFrom;
103+
const ui64 minMemoryRead = p->GetMinMemoryForReadColumns({});
92104
while (true) {
93-
it->second.AddContained(p->GetPortionId());
105+
RemoveFromMemoryUsageControl(it->second.GetMinMemoryRead());
106+
it->second.AddContained(p->GetPortionId(), minMemoryRead);
107+
++CountMemoryUsages[it->second.GetMinMemoryRead()];
94108
if (it == itTo) {
95109
break;
96110
}
97111
AFL_VERIFY(++it != Points.end());
98112
}
113+
if (CountMemoryUsages.size()) {
114+
Counters.MinReadBytes->SetValue(CountMemoryUsages.rbegin()->first);
115+
} else {
116+
Counters.MinReadBytes->SetValue(0);
117+
}
99118
}
100119

101120
}

ydb/core/tx/columnshard/engines/storage/granule/portions_index.h

+38-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#pragma once
2+
#include <ydb/core/tx/columnshard/counters/engine_logs.h>
23
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
34

45
namespace NKikimr::NOlap {
@@ -11,35 +12,44 @@ class TPortionsPKPoint {
1112
private:
1213
THashMap<ui64, std::shared_ptr<TPortionInfo>> Start;
1314
THashMap<ui64, std::shared_ptr<TPortionInfo>> Finish;
14-
THashSet<ui64> PortionIds;
15+
THashMap<ui64, ui64> PortionIds;
16+
YDB_READONLY(ui64, MinMemoryRead, 0);
17+
1518
public:
1619
const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetStart() const {
1720
return Start;
1821
}
1922

2023
void ProvidePortions(const TPortionsPKPoint& source) {
21-
for (auto&& i : source.PortionIds) {
24+
MinMemoryRead = 0;
25+
for (auto&& [i, mem] : source.PortionIds) {
2226
if (source.Finish.contains(i)) {
2327
continue;
2428
}
25-
AFL_VERIFY(PortionIds.emplace(i).second);
29+
AddContained(i, mem);
2630
}
2731
}
2832

29-
const THashSet<ui64>& GetPortionIds() const {
33+
const THashMap<ui64, ui64>& GetPortionIds() const {
3034
return PortionIds;
3135
}
3236

3337
bool IsEmpty() const {
3438
return Start.empty() && Finish.empty();
3539
}
3640

37-
void AddContained(const ui64 portionId) {
38-
AFL_VERIFY(PortionIds.emplace(portionId).second);
41+
void AddContained(const ui32 portionId, const ui64 minMemoryRead) {
42+
MinMemoryRead += minMemoryRead;
43+
AFL_VERIFY(PortionIds.emplace(portionId, minMemoryRead).second);
3944
}
4045

41-
void RemoveContained(const ui64 portionId) {
46+
void RemoveContained(const ui32 portionId, const ui64 minMemoryRead) {
47+
AFL_VERIFY(minMemoryRead <= MinMemoryRead);
48+
MinMemoryRead -= minMemoryRead;
4249
AFL_VERIFY(PortionIds.erase(portionId));
50+
if (PortionIds.empty()) {
51+
AFL_VERIFY(!MinMemoryRead);
52+
}
4353
}
4454

4555
void RemoveStart(const std::shared_ptr<TPortionInfo>& p) {
@@ -64,7 +74,9 @@ class TPortionsPKPoint {
6474
class TPortionsIndex {
6575
private:
6676
std::map<NArrow::TReplaceKey, TPortionsPKPoint> Points;
77+
std::map<ui64, i32> CountMemoryUsages;
6778
const TGranuleMeta& Owner;
79+
const NColumnShard::TPortionsIndexCounters& Counters;
6880

6981
std::map<NArrow::TReplaceKey, TPortionsPKPoint>::iterator InsertPoint(const NArrow::TReplaceKey& key) {
7082
auto it = Points.find(key);
@@ -75,17 +87,35 @@ class TPortionsIndex {
7587
--itPred;
7688
it->second.ProvidePortions(itPred->second);
7789
}
90+
++CountMemoryUsages[it->second.GetMinMemoryRead()];
7891
}
7992
return it;
8093
}
8194

95+
void RemoveFromMemoryUsageControl(const ui64 mem) {
96+
auto it = CountMemoryUsages.find(mem);
97+
AFL_VERIFY(it != CountMemoryUsages.end())("mem", mem);
98+
if (!--it->second) {
99+
CountMemoryUsages.erase(it);
100+
}
101+
}
102+
82103
public:
83-
TPortionsIndex(const TGranuleMeta& owner)
104+
TPortionsIndex(const TGranuleMeta& owner, const NColumnShard::TPortionsIndexCounters& counters)
84105
: Owner(owner)
106+
, Counters(counters)
85107
{
86108

87109
}
88110

111+
ui64 GetMinMemoryRead() const {
112+
if (CountMemoryUsages.empty()) {
113+
return 0;
114+
} else {
115+
return CountMemoryUsages.rbegin()->second;
116+
}
117+
}
118+
89119
const std::map<NArrow::TReplaceKey, TPortionsPKPoint>& GetPoints() const {
90120
return Points;
91121
}

0 commit comments

Comments
 (0)