Skip to content

Fix portions cleaning #9347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< Counters.GetWritesMonitor()->DebugString() << " at tablet " << TabletID());
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(
TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), snapshotSchema, GetLastTxSnapshot());
TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), snapshotSchema, GetLastTxSnapshot(), Counters.GetCSCounters().WritingCounters);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
}
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/counters/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace NKikimr::NColumnShard {

TCSCounters::TCSCounters()
: TBase("CS")
, WritingCounters(std::make_shared<TWriteCounters>(*this))
, Initialization(*this)
, TxProgress(*this) {
StartBackgroundCount = TBase::GetDeriviative("StartBackground/Count");
Expand Down
31 changes: 25 additions & 6 deletions ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,29 @@ enum class EWriteFailReason {
OverlimitReadBlobMemory /* "overlimit_read_blob_memory" */
};

class TWriteCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr VolumeWriteData;
NMonitoring::THistogramPtr HistogramBytesWriteDataCount;
NMonitoring::THistogramPtr HistogramBytesWriteDataBytes;

public:
TWriteCounters(TCommonCountersOwner& owner)
: TBase(owner, "activity", "writing")
{
VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes");
HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100));
HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100));
}

void OnIncomingData(const ui64 dataSize) const {
VolumeWriteData->Add(dataSize);
HistogramBytesWriteDataCount->Collect((i64)dataSize, 1);
HistogramBytesWriteDataBytes->Collect((i64)dataSize, dataSize);
}
};

class TCSCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
Expand Down Expand Up @@ -72,7 +95,9 @@ class TCSCounters: public TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr WriteRequests;
THashMap<EWriteFailReason, NMonitoring::TDynamicCounters::TCounterPtr> FailedWriteRequests;
NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;

public:
const std::shared_ptr<TWriteCounters> WritingCounters;
const TCSInitialization Initialization;
TTxProgressCounters TxProgress;

Expand All @@ -89,7 +114,6 @@ class TCSCounters: public TCommonCountersOwner {

void OnWritePutBlobsSuccess(const TDuration d) const {
HistogramSuccessWritePutBlobsDurationMs->Collect(d.MilliSeconds());
WritePutBlobsCount->Sub(1);
}

void OnWriteMiddle1PutBlobsSuccess(const TDuration d) const {
Expand Down Expand Up @@ -118,11 +142,6 @@ class TCSCounters: public TCommonCountersOwner {

void OnWritePutBlobsFail(const TDuration d) const {
HistogramFailedWritePutBlobsDurationMs->Collect(d.MilliSeconds());
WritePutBlobsCount->Sub(1);
}

void OnWritePutBlobsStart() const {
WritePutBlobsCount->Add(1);
}

void OnWriteTxComplete(const TDuration d) const {
Expand Down
23 changes: 10 additions & 13 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,16 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
ui32 skipLocked = 0;
ui32 portionsFromDrop = 0;
bool limitExceeded = false;
THashSet<TPortionAddress> uniquePortions;
for (ui64 pathId : pathsToDrop) {
auto g = GranulesStorage->GetGranuleOptional(pathId);
if (!g) {
continue;
}

for (auto& [portion, info] : g->GetPortions()) {
if (info->CheckForCleanup()) {
continue;
}
if (dataLocksManager->IsLocked(*info)) {
++skipLocked;
continue;
Expand All @@ -361,8 +363,6 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
limitExceeded = true;
break;
}
const auto inserted = uniquePortions.emplace(info->GetAddress()).second;
Y_ABORT_UNLESS(inserted);
changes->PortionsToDrop.push_back(*info);
++portionsFromDrop;
}
Expand All @@ -381,17 +381,14 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
++i;
continue;
}
const auto inserted = uniquePortions.emplace(it->second[i].GetAddress()).second;
if (inserted) {
AFL_VERIFY(it->second[i].CheckForCleanup(snapshot))("p_snapshot", it->second[i].GetRemoveSnapshotOptional())("snapshot", snapshot);
if (txSize + it->second[i].GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
txSize += it->second[i].GetTxVolume();
} else {
limitExceeded = true;
break;
}
changes->PortionsToDrop.push_back(std::move(it->second[i]));
AFL_VERIFY(it->second[i].CheckForCleanup(snapshot))("p_snapshot", it->second[i].GetRemoveSnapshotOptional())("snapshot", snapshot);
if (txSize + it->second[i].GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
txSize += it->second[i].GetTxVolume();
} else {
limitExceeded = true;
break;
}
changes->PortionsToDrop.push_back(std::move(it->second[i]));
if (i + 1 < it->second.size()) {
it->second[i] = std::move(it->second.back());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<
}

std::vector<TCommittedBlob> result;
result.reserve(pInfo->GetCommitted().size() + pInfo->GetInserted().size());
result.reserve(pInfo->GetCommitted().size() + Summary.GetInserted().size());

for (const auto& data : pInfo->GetCommitted()) {
if (lockId || data.GetSnapshot() <= reqSnapshot) {
Expand All @@ -137,7 +137,10 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<
}
}
if (lockId) {
for (const auto& [writeId, data] : pInfo->GetInserted()) {
for (const auto& [writeId, data] : Summary.GetInserted()) {
if (data.GetPathId() != pathId) {
continue;
}
auto start = data.GetMeta().GetFirstPK(pkSchema);
auto finish = data.GetMeta().GetLastPK(pkSchema);
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ namespace NKikimr::NOlap {
class TPKRangesFilter;
class IDbWrapper;

/// Use one table for inserted and committed blobs:
/// !Committed => {PlanStep, WriteTxId} are {0, WriteId}
/// Committed => {PlanStep, WriteTxId} are {PlanStep, TxId}

class TInsertTableAccessor {
protected:
TInsertionSummary Summary;
Expand Down Expand Up @@ -76,7 +72,7 @@ class TInsertTableAccessor {
/// Read-only view of the aborted entries held by Summary, keyed by insert write id.
const THashMap<TInsertWriteId, TInsertedData>& GetAborted() const {
return Summary.GetAborted();
}
const THashMap<TInsertWriteId, TInsertedData>& GetInserted() const {
const TInsertedContainer& GetInserted() const {
return Summary.GetInserted();
}
const TInsertionSummary::TCounters& GetCountersPrepared() const {
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,4 @@ NKikimr::NOlap::TPathInfoIndexPriority TPathInfo::GetIndexationPriority() const
}
}

const THashMap<TInsertWriteId, TInsertedData>& TPathInfo::GetInserted() const {
return Summary->GetInserted();
}

}
2 changes: 0 additions & 2 deletions ydb/core/tx/columnshard/engines/insert_table/path_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ class TPathInfo: public TMoveOnly {
return Committed.empty() && !InsertedSize;
}

const THashMap<TInsertWriteId, TInsertedData>& GetInserted() const;

void AddInsertedSize(const i64 size, const ui64 overloadLimit);

explicit TPathInfo(TInsertionSummary& summary, const ui64 pathId);
Expand Down
61 changes: 10 additions & 51 deletions ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,37 +89,8 @@ void TInsertionSummary::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize
AFL_VERIFY(Counters.Inserted.GetDataSize() == (i64)StatsPrepared.Bytes);
}

/// Collects the write ids of every entry in Inserted whose path id equals `pathId`.
/// Returns an empty set when nothing matches.
THashSet<TInsertWriteId> TInsertionSummary::GetInsertedByPathId(const ui64 pathId) const {
    THashSet<TInsertWriteId> matchedIds;
    for (const auto& [insertId, inserted] : Inserted) {
        // Skip entries that belong to other paths.
        if (inserted.GetPathId() != pathId) {
            continue;
        }
        matchedIds.insert(insertId);
    }
    return matchedIds;
}

THashSet<TInsertWriteId> TInsertionSummary::GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const {
if (timeBorder < MinInsertedTs) {
return {};
}

THashSet<TInsertWriteId> toAbort;
TInstant newMin = TInstant::Max();
for (auto& [writeId, data] : Inserted) {
const TInstant dataInsertTs = data.GetMeta().GetDirtyWriteTime();
if (data.IsNotAbortable()) {
continue;
}
if (dataInsertTs < timeBorder && toAbort.size() < limit) {
toAbort.insert(writeId);
} else {
newMin = Min(newMin, dataInsertTs);
}
}
MinInsertedTs = (toAbort.size() == Inserted.size()) ? TInstant::Zero() : newMin;
return toAbort;
return Inserted.GetExpired(timeBorder, limit);
}

bool TInsertionSummary::EraseAborted(const TInsertWriteId writeId) {
Expand Down Expand Up @@ -173,33 +144,21 @@ const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData
}

std::optional<NKikimr::NOlap::TInsertedData> TInsertionSummary::ExtractInserted(const TInsertWriteId id) {
auto it = Inserted.find(id);
if (it == Inserted.end()) {
return {};
} else {
auto pathInfo = GetPathInfoOptional(it->second.GetPathId());
auto result = Inserted.ExtractOptional(id);
if (result) {
auto pathInfo = GetPathInfoOptional(result->GetPathId());
if (pathInfo) {
OnEraseInserted(*pathInfo, it->second.BlobSize());
OnEraseInserted(*pathInfo, result->BlobSize());
}
std::optional<TInsertedData> result = std::move(it->second);
Inserted.erase(it);
return result;
}
return result;
}

const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedData&& data, const bool load /*= false*/) {
const TInsertWriteId writeId = data.GetInsertWriteId();
const ui32 dataSize = data.BlobSize();
const ui64 pathId = data.GetPathId();
auto insertInfo = Inserted.emplace(writeId, std::move(data));
AFL_VERIFY_DEBUG(!Aborted.contains(writeId));
if (insertInfo.second) {
OnNewInserted(GetPathInfo(pathId), dataSize, load);
return &insertInfo.first->second;
} else {
Counters.Inserted.SkipAdd(dataSize);
return nullptr;
}
auto* insertInfo = Inserted.AddVerified(std::move(data));
AFL_VERIFY_DEBUG(!Aborted.contains(insertInfo->GetInsertWriteId()));
OnNewInserted(GetPathInfo(insertInfo->GetPathId()), insertInfo->BlobSize(), load);
return insertInfo;
}

}
Loading
Loading