Skip to content

Overlimit read memory #7408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include "columnshard_impl.h"
#include "common/limits.h"
#include "blobs_action/transaction/tx_write.h"
#include "blobs_action/transaction/tx_draft.h"
#include "counters/columnshard.h"
#include "engines/column_engine_logs.h"
#include "operations/batch_builder/builder.h"
#include "operations/write_data.h"

Expand Down Expand Up @@ -193,6 +195,17 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
return returnFail(COUNTER_WRITE_FAIL);
}

const ui64 minMemoryRead = TablesManager.GetPrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>()
.GetGranuleVerified(writeMeta.GetTableId())
.GetPortionsIndex()
.GetMinMemoryRead();
if (NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit < minMemoryRead) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "overlimit")("reason", "read_memory")("current", minMemoryRead)(
"limit", NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit)("table_id", writeMeta.GetTableId());
Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::OverlimitReadMemory);
return returnFail(COUNTER_WRITE_FAIL);
}

const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
auto arrowData = std::make_shared<TProtoArrowData>(snapshotSchema);
if (!arrowData->ParseFromProto(record)) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/common/limits.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,9 @@ class TGlobalLimits {
static constexpr inline ui64 InsertCompactionMemoryLimit = 1ULL << 30;
static constexpr inline ui64 GeneralCompactionMemoryLimit = 3ULL << 30;
static constexpr inline ui64 ScanMemoryLimit = 3ULL << 30;

static constexpr inline ui64 DefaultRejectMemoryIntervalLimit = ScanMemoryLimit;
static constexpr inline ui64 DefaultReduceMemoryIntervalLimit = 0.8 * ScanMemoryLimit;
static constexpr inline ui64 DefaultReadSequentiallyBufferSize = ((ui64)8) << 20;
};
}
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/common/ya.make
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
LIBRARY()

SRCS(
limits.h
limits.cpp
reverse_accessor.cpp
scalars.cpp
snapshot.cpp
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ enum class EWriteFailReason {
LongTxDuplication /* "long_tx_duplication" */,
NoTable /* "no_table" */,
IncorrectSchema /* "incorrect_schema" */,
Overload /* "overload" */
Overload /* "overload" */,
OverlimitReadMemory /* "overlimit_read_memory" */
};

class TCSInitialization: public TCommonCountersOwner {
Expand Down
41 changes: 37 additions & 4 deletions ydb/core/tx/columnshard/counters/engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,32 @@ class TAgentDataClassCounters: public TCommonCountersOwner {
}
};

// Client-side counters handed to a portions index.
// MinReadBytes is the aggregation client the index publishes its current
// "min read" memory value through (via SetValue).
class TPortionsIndexCounters {
public:
    const std::shared_ptr<TValueAggregationClient> MinReadBytes;

    // Stores a shared handle to the given aggregation client.
    TPortionsIndexCounters(const std::shared_ptr<TValueAggregationClient>& minReadBytes)
        : MinReadBytes(minReadBytes)
    {
    }
};

class TGranuleDataCounters {
private:
const TDataClassCounters InsertedData;
const TDataClassCounters CompactedData;
const TDataClassCounters FullData;
const TPortionsIndexCounters PortionsIndexCounters;

public:
TGranuleDataCounters(const TDataClassCounters& insertedData, const TDataClassCounters& compactedData, const TDataClassCounters& fullData)
const TPortionsIndexCounters& GetPortionsIndexCounters() const {
return PortionsIndexCounters;
}

TGranuleDataCounters(const TDataClassCounters& insertedData, const TDataClassCounters& compactedData, const TDataClassCounters& fullData,
const std::shared_ptr<TValueAggregationClient>& minReadBytes)
: InsertedData(insertedData)
, CompactedData(compactedData)
, FullData(fullData)
{
, PortionsIndexCounters(minReadBytes) {
}

void OnPortionsDataRefresh(const TBaseGranuleDataClassSummary& inserted, const TBaseGranuleDataClassSummary& compacted) const {
Expand All @@ -105,20 +120,38 @@ class TGranuleDataCounters {
}
};

// Agent-side counters for portions indexes.
// Owns the "MinRead/Bytes" auto-aggregated value, obtained from the
// TCommonCountersOwner base that is constructed with `baseName`.
class TPortionsIndexAgentsCounters: public TCommonCountersOwner {
private:
    using TBase = TCommonCountersOwner;

public:
    // Aggregation agent for the "MinRead/Bytes" value.
    const std::shared_ptr<TValueAggregationAgent> MinReadBytes;

    TPortionsIndexAgentsCounters(const TString& baseName)
        : TBase(baseName)
        , MinReadBytes(TBase::GetValueAutoAggregations("MinRead/Bytes"))
    {
    }
};

class TAgentGranuleDataCounters {
private:
TAgentDataClassCounters InsertedData;
TAgentDataClassCounters CompactedData;
TAgentDataClassCounters FullData;
TPortionsIndexAgentsCounters PortionsIndex;

public:
TAgentGranuleDataCounters(const TString& ownerId)
: InsertedData(ownerId, "ByGranule/Inserted")
, CompactedData(ownerId, "ByGranule/Compacted")
, FullData(ownerId, "ByGranule/Full") {
, FullData(ownerId, "ByGranule/Full")
, PortionsIndex("ByGranule/PortionsIndex")
{
}

TGranuleDataCounters RegisterClient() const {
return TGranuleDataCounters(InsertedData.RegisterClient(), CompactedData.RegisterClient(), FullData.RegisterClient());
return TGranuleDataCounters(
InsertedData.RegisterClient(), CompactedData.RegisterClient(), FullData.RegisterClient(), PortionsIndex.MinReadBytes->GetClient());
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include "columns_set.h"
#include "fetching.h"
#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
Expand Down Expand Up @@ -32,9 +33,9 @@ class TSpecialReadContext {
std::array<std::array<std::array<std::array<std::array<std::array<std::shared_ptr<TFetchingScript>, 2>, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts;

public:
static const inline ui64 DefaultRejectMemoryIntervalLimit = ((ui64)3) << 30;
static const inline ui64 DefaultReduceMemoryIntervalLimit = DefaultRejectMemoryIntervalLimit;
static const inline ui64 DefaultReadSequentiallyBufferSize = ((ui64)8) << 20;
static const inline ui64 DefaultRejectMemoryIntervalLimit = TGlobalLimits::DefaultRejectMemoryIntervalLimit;
static const inline ui64 DefaultReduceMemoryIntervalLimit = TGlobalLimits::DefaultReduceMemoryIntervalLimit;
static const inline ui64 DefaultReadSequentiallyBufferSize = TGlobalLimits::DefaultReadSequentiallyBufferSize;

const ui64 ReduceMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetReduceMemoryIntervalLimit(DefaultReduceMemoryIntervalLimit);
const ui64 RejectMemoryIntervalLimit = NYDBTest::TControllers::GetColumnShardController()->GetRejectMemoryIntervalLimit(DefaultRejectMemoryIntervalLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ TGranuleMeta::TGranuleMeta(const ui64 pathId, const TGranulesStorage& owner, con
, PortionInfoGuard(owner.GetCounters().BuildPortionBlobsGuard())
, Stats(owner.GetStats())
, StoragesManager(owner.GetStoragesManager())
, PortionsIndex(*this)
{
, PortionsIndex(*this, Counters.GetPortionsIndexCounters()) {
NStorageOptimizer::IOptimizerPlannerConstructor::TBuildContext context(PathId, owner.GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetPrimaryKey());
OptimizerPlanner = versionedIndex.GetLastSchema()->GetIndexInfo().GetCompactionPlannerConstructor()->BuildPlanner(context).DetachResult();
AFL_VERIFY(!!OptimizerPlanner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ TPortionsIndex::TPortionIntervals TPortionsIndex::GetIntervalFeatures(const TPor
TPortionIntervals portionExcludeIntervals;
while (true) {
std::optional<NArrow::TReplaceKey> nextKey;
for (auto&& p : itFrom->second.GetPortionIds()) {
for (auto&& [p, _] : itFrom->second.GetPortionIds()) {
if (skipPortions.contains(p)) {
continue;
}
Expand Down Expand Up @@ -55,9 +55,12 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
auto itTo = Points.find(p->IndexKeyEnd());
AFL_VERIFY(itTo != Points.end());
{
const ui64 minMemoryRead = p->GetMinMemoryForReadColumns({});
auto it = itFrom;
while (true) {
it->second.RemoveContained(p->GetPortionId());
RemoveFromMemoryUsageControl(it->second.GetMinMemoryRead());
it->second.RemoveContained(p->GetPortionId(), minMemoryRead);
++CountMemoryUsages[it->second.GetMinMemoryRead()];
if (it == itTo) {
break;
}
Expand All @@ -67,19 +70,27 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
if (itFrom != itTo) {
itFrom->second.RemoveStart(p);
if (itFrom->second.IsEmpty()) {
RemoveFromMemoryUsageControl(itFrom->second.GetMinMemoryRead());
Points.erase(itFrom);
}
itTo->second.RemoveFinish(p);
if (itTo->second.IsEmpty()) {
RemoveFromMemoryUsageControl(itTo->second.GetMinMemoryRead());
Points.erase(itTo);
}
} else {
itTo->second.RemoveStart(p);
itTo->second.RemoveFinish(p);
if (itTo->second.IsEmpty()) {
RemoveFromMemoryUsageControl(itTo->second.GetMinMemoryRead());
Points.erase(itTo);
}
}
if (CountMemoryUsages.size()) {
Counters.MinReadBytes->SetValue(CountMemoryUsages.rbegin()->first);
} else {
Counters.MinReadBytes->SetValue(0);
}
}

void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
Expand All @@ -89,13 +100,21 @@ void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
itTo->second.AddFinish(p);

auto it = itFrom;
const ui64 minMemoryRead = p->GetMinMemoryForReadColumns({});
while (true) {
it->second.AddContained(p->GetPortionId());
RemoveFromMemoryUsageControl(it->second.GetMinMemoryRead());
it->second.AddContained(p->GetPortionId(), minMemoryRead);
++CountMemoryUsages[it->second.GetMinMemoryRead()];
if (it == itTo) {
break;
}
AFL_VERIFY(++it != Points.end());
}
if (CountMemoryUsages.size()) {
Counters.MinReadBytes->SetValue(CountMemoryUsages.rbegin()->first);
} else {
Counters.MinReadBytes->SetValue(0);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <ydb/core/tx/columnshard/counters/engine_logs.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>

namespace NKikimr::NOlap {
Expand All @@ -11,35 +12,44 @@ class TPortionsPKPoint {
private:
THashMap<ui64, std::shared_ptr<TPortionInfo>> Start;
THashMap<ui64, std::shared_ptr<TPortionInfo>> Finish;
THashSet<ui64> PortionIds;
THashMap<ui64, ui64> PortionIds;
YDB_READONLY(ui64, MinMemoryRead, 0);

public:
// Read-only access to the Start map (ui64 key -> portion) held by this point.
const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetStart() const {
return Start;
}

void ProvidePortions(const TPortionsPKPoint& source) {
for (auto&& i : source.PortionIds) {
MinMemoryRead = 0;
for (auto&& [i, mem] : source.PortionIds) {
if (source.Finish.contains(i)) {
continue;
}
AFL_VERIFY(PortionIds.emplace(i).second);
AddContained(i, mem);
}
}

const THashSet<ui64>& GetPortionIds() const {
// Read-only access to the portions registered at this point via AddContained.
// Key is the portion id; value is the minMemoryRead amount recorded for it.
const THashMap<ui64, ui64>& GetPortionIds() const {
return PortionIds;
}

// True only when both the Start and the Finish maps are empty.
// PortionIds is not consulted here.
bool IsEmpty() const {
    if (!Start.empty()) {
        return false;
    }
    return Finish.empty();
}

void AddContained(const ui64 portionId) {
AFL_VERIFY(PortionIds.emplace(portionId).second);
// Registers a portion at this point and adds its memory requirement to the
// point's MinMemoryRead total.
//   portionId     - portion identifier; taken as ui64 to match the key type of
//                   PortionIds (a narrower type would silently truncate ids and
//                   let two distinct portions collide on the same key).
//   minMemoryRead - amount recorded for this portion and added to MinMemoryRead.
// Fails verification if the portion id is already registered at this point.
void AddContained(const ui64 portionId, const ui64 minMemoryRead) {
    MinMemoryRead += minMemoryRead;
    AFL_VERIFY(PortionIds.emplace(portionId, minMemoryRead).second);
}

void RemoveContained(const ui64 portionId) {
// Unregisters a portion from this point and subtracts its memory requirement
// from the point's MinMemoryRead total.
//   portionId     - portion identifier; taken as ui64 to match the key type of
//                   PortionIds (a narrower type would silently truncate ids and
//                   could erase the wrong entry or fail the lookup).
//   minMemoryRead - amount to subtract; must not exceed the current total.
// Fails verification if the amount exceeds the total, if the portion id is not
// registered, or if the total is non-zero after the last portion is removed.
void RemoveContained(const ui64 portionId, const ui64 minMemoryRead) {
    AFL_VERIFY(minMemoryRead <= MinMemoryRead);
    MinMemoryRead -= minMemoryRead;
    AFL_VERIFY(PortionIds.erase(portionId));
    if (PortionIds.empty()) {
        // With no portions left the accumulated total must be back to zero.
        AFL_VERIFY(!MinMemoryRead);
    }
}

void RemoveStart(const std::shared_ptr<TPortionInfo>& p) {
Expand All @@ -64,7 +74,9 @@ class TPortionsPKPoint {
class TPortionsIndex {
private:
std::map<NArrow::TReplaceKey, TPortionsPKPoint> Points;
std::map<ui64, i32> CountMemoryUsages;
const TGranuleMeta& Owner;
const NColumnShard::TPortionsIndexCounters& Counters;

std::map<NArrow::TReplaceKey, TPortionsPKPoint>::iterator InsertPoint(const NArrow::TReplaceKey& key) {
auto it = Points.find(key);
Expand All @@ -75,17 +87,35 @@ class TPortionsIndex {
--itPred;
it->second.ProvidePortions(itPred->second);
}
++CountMemoryUsages[it->second.GetMinMemoryRead()];
}
return it;
}

void RemoveFromMemoryUsageControl(const ui64 mem) {
auto it = CountMemoryUsages.find(mem);
AFL_VERIFY(it != CountMemoryUsages.end())("mem", mem);
if (!--it->second) {
CountMemoryUsages.erase(it);
}
}

public:
TPortionsIndex(const TGranuleMeta& owner)
// Binds the index to its owning granule and to the counters it reports to.
// Both arguments are kept as references (Owner, Counters), so the referenced
// objects must outlive this index.
TPortionsIndex(const TGranuleMeta& owner, const NColumnShard::TPortionsIndexCounters& counters)
    : Owner(owner)
    , Counters(counters) {
}

// Returns the largest per-point MinMemoryRead value currently tracked, or 0
// when nothing is tracked.
// CountMemoryUsages maps a memory value to the number of points having that
// value, so the answer is the greatest KEY (rbegin()->first), not the mapped
// use count; this matches what is published to Counters.MinReadBytes.
ui64 GetMinMemoryRead() const {
    if (CountMemoryUsages.empty()) {
        return 0;
    }
    return CountMemoryUsages.rbegin()->first;
}

// Read-only access to the ordered map of key points (TReplaceKey -> TPortionsPKPoint).
const std::map<NArrow::TReplaceKey, TPortionsPKPoint>& GetPoints() const {
return Points;
}
Expand Down
Loading