Skip to content

Commit 899de9d

Browse files
committed
fix cs latency spikes
1 parent 5967a74 commit 899de9d

File tree

6 files changed

+74
-20
lines changed

6 files changed

+74
-20
lines changed

ydb/core/tx/columnshard/background_controller.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class TBackgroundController {
1717

1818
bool ActiveCleanupPortions = false;
1919
bool ActiveCleanupTables = false;
20+
bool ActiveCleanupInsertTable = false;
2021
YDB_READONLY(TMonotonic, LastIndexationInstant, TMonotonic::Zero());
2122
public:
2223
THashSet<NOlap::TPortionAddress> GetConflictTTLPortions() const;
@@ -66,6 +67,18 @@ class TBackgroundController {
6667
bool IsCleanupTablesActive() const {
6768
return ActiveCleanupTables;
6869
}
70+
71+
void StartCleanupInsertTable() {
72+
Y_ABORT_UNLESS(!ActiveCleanupInsertTable);
73+
ActiveCleanupInsertTable = true;
74+
}
75+
void FinishCleanupInsertTable() {
76+
Y_ABORT_UNLESS(ActiveCleanupInsertTable);
77+
ActiveCleanupInsertTable = false;
78+
}
79+
bool IsCleanupInsertTableActive() const {
80+
return ActiveCleanupInsertTable;
81+
}
6982
};
7083

7184
}

ydb/core/tx/columnshard/blobs_action/abstract/blob_set.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,18 @@ class TTabletsByBlob {
8989
bool IsEmpty() const {
9090
return Data.empty();
9191
}
92-
92+
using TGenStep = std::tuple<ui32, ui32>;
93+
std::deque<TUnifiedBlobId> GroupByGenStep() const {
94+
std::deque<TUnifiedBlobId> result;
95+
for (const auto& i : Data) {
96+
result.emplace_back(i.first);
97+
}
98+
const auto pred = [](const TUnifiedBlobId& l, const TUnifiedBlobId& r) {
99+
return TGenStep(l.GetLogoBlobId().Generation(), l.GetLogoBlobId().Step()) < TGenStep(r.GetLogoBlobId().Generation(), r.GetLogoBlobId().Step());
100+
};
101+
std::sort(result.begin(), result.end(), pred);
102+
return result;
103+
}
93104
template <class TFilter>
94105
TTabletsByBlob ExtractBlobs(const TFilter& filter, const std::optional<ui32> countLimit = {}) {
95106
TTabletsByBlob result;

ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ std::vector<TGenStep> TBlobManager::FindNewGCBarriers() {
211211
return result;
212212
}
213213

214-
std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTask(const TString& storageId,
214+
std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTask(const TString& storageId,
215215
const std::shared_ptr<TBlobManager>& manager, const std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>& sharedBlobsInfo,
216216
const std::shared_ptr<NBlobOperations::TRemoveGCCounters>& counters) noexcept {
217217
AFL_VERIFY(!CollectGenStepInFlight);
@@ -244,7 +244,7 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
244244
}
245245
}
246246

247-
static const ui32 blobsGCCountLimit = 50000;
247+
static const ui32 blobsGCCountLimit = 500000;
248248

249249
const auto predShared = [&](const TUnifiedBlobId& id, const THashSet<TTabletId>& /*tabletIds*/) {
250250
return id.GetLogoBlobId().TabletID() != (ui64)SelfTabletId;
@@ -262,9 +262,8 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
262262

263263
TTabletsByBlob extractedOld = BlobsToDelete.ExtractBlobs(predRemoveOld, blobsGCCountLimit - extractedToRemoveFromDB.GetSize());
264264
extractedToRemoveFromDB.Add(extractedOld);
265-
TTabletId tabletId;
266-
TUnifiedBlobId unifiedBlobId;
267-
while (extractedOld.ExtractFront(tabletId, unifiedBlobId)) {
265+
for (TTabletsByBlob::TIterator itExtractedOld(extractedOld); itExtractedOld.IsValid(); ++itExtractedOld) {
266+
const TUnifiedBlobId unifiedBlobId = itExtractedOld.GetBlobId();
268267
auto logoBlobId = unifiedBlobId.GetLogoBlobId();
269268
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId);
270269
NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[unifiedBlobId.GetDsGroup()];
@@ -278,6 +277,10 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
278277

279278

280279
std::deque<TUnifiedBlobId> keepsToErase;
280+
std::deque<TUnifiedBlobId> deleteIndex;
281+
if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() < blobsGCCountLimit) {
282+
deleteIndex = BlobsToDelete.GroupByGenStep();
283+
}
281284
for (auto&& newCollectGenStep : newCollectGenSteps) {
282285
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "PreparePerGroupGCRequests")("gen", std::get<0>(newCollectGenStep))("step", std::get<1>(newCollectGenStep));
283286
BlobsManagerCounters.OnNewCollectStep(std::get<0>(newCollectGenStep), std::get<1>(newCollectGenStep));
@@ -297,20 +300,38 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
297300
perGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt);
298301
keepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *keepBlobIt));
299302
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc", *keepBlobIt);
303+
if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() > blobsGCCountLimit) {
304+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit);
305+
break;
306+
}
307+
}
308+
if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() > blobsGCCountLimit) {
309+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit);
310+
break;
300311
}
301312
BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt);
302313
BlobsManagerCounters.OnBlobsKeep(BlobsToKeep);
303314

304-
const auto predSelf = [&](const TUnifiedBlobId& id, const THashSet<TTabletId>& /*tabletIds*/) {
305-
auto logoBlobId = id.GetLogoBlobId();
306-
TGenStep genStep{logoBlobId.Generation(), logoBlobId.Step()};
307-
return genStep <= newCollectGenStep && id.GetLogoBlobId().TabletID() == (ui64)SelfTabletId;
308-
};
309-
TTabletsByBlob extractedSelf = BlobsToDelete.ExtractBlobs(predSelf);
315+
TTabletsByBlob extractedSelf;
316+
{
317+
while (deleteIndex.size()) {
318+
const auto& blobId = deleteIndex.front().GetLogoBlobId();
319+
if (newCollectGenStep < TGenStep(blobId.Generation(), blobId.Step())) {
320+
break;
321+
}
322+
BlobsToDelete.ExtractBlobTo(deleteIndex.front(), extractedSelf);
323+
if (extractedToRemoveFromDB.GetSize() + extractedSelf.GetSize() + keepsToErase.size() > blobsGCCountLimit) {
324+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit);
325+
break;
326+
}
327+
deleteIndex.pop_front();
328+
}
329+
330+
}
331+
310332
extractedToRemoveFromDB.Add(extractedSelf);
311-
TTabletId tabletId;
312-
TUnifiedBlobId unifiedBlobId;
313-
while (extractedSelf.ExtractFront(tabletId, unifiedBlobId)) {
333+
for (TTabletsByBlob::TIterator itExtractedSelf(extractedSelf); itExtractedSelf.IsValid(); ++itExtractedSelf) {
334+
const TUnifiedBlobId unifiedBlobId = itExtractedSelf.GetBlobId();
314335
auto logoBlobId = unifiedBlobId.GetLogoBlobId();
315336
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId);
316337
NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[unifiedBlobId.GetDsGroup()];
@@ -359,6 +380,8 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
359380
return result;
360381
}
361382

383+
384+
362385
TBlobBatch TBlobManager::StartBlobBatch(ui32 channel) {
363386
++CountersUpdate.BatchesStarted;
364387
Y_ABORT_UNLESS(channel == BLOB_CHANNEL, "Support for mutiple blob channels is not implemented yet");

ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ void TTxInsertTableCleanup::Complete(const TActorContext& /*ctx*/) {
3030

3131
Y_ABORT_UNLESS(BlobsAction);
3232
BlobsAction->OnCompleteTxAfterRemoving(true);
33-
Self->EnqueueBackgroundActivities();
33+
Self->BackgroundController.FinishCleanupInsertTable();
34+
Self->SetupCleanupInsertTable();
3435
}
3536

3637
}

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,13 +845,18 @@ void TColumnShard::Handle(TEvPrivate::TEvGarbageCollectionFinished::TPtr& ev, co
845845
}
846846

847847
void TColumnShard::SetupCleanupInsertTable() {
848+
if (BackgroundController.IsCleanupInsertTableActive()) {
849+
ACFL_DEBUG("background", "cleanup_insert_table")("skip_reason", "in_progress");
850+
return;
851+
}
848852
auto writeIdsToCleanup = InsertTable->OldWritesToAbort(AppData()->TimeProvider->Now());
849853

850854
if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) {
851855
return;
852856
}
853857
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size());
854858

859+
BackgroundController.StartCleanupInsertTable();
855860
Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext());
856861
}
857862

ydb/core/tx/columnshard/engines/column_engine.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,16 @@ const std::shared_ptr<arrow::Schema>& IColumnEngine::GetReplaceKey() const {
99
}
1010

1111
ui64 IColumnEngine::GetMetadataLimit() {
12+
static const auto MemoryTotal = NSystemInfo::TotalMemorySize();
1213
if (!HasAppData()) {
13-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("total", NSystemInfo::TotalMemorySize());
14-
return NSystemInfo::TotalMemorySize() * 0.3;
14+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("total", MemoryTotal);
15+
return MemoryTotal * 0.3;
1516
} else if (AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().HasAbsoluteValue()) {
1617
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("value", AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetAbsoluteValue());
1718
return AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetAbsoluteValue();
1819
} else {
19-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("total", NSystemInfo::TotalMemorySize())("kff", AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetTotalRatio());
20-
return NSystemInfo::TotalMemorySize() * AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetTotalRatio();
20+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("total", MemoryTotal)("kff", AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetTotalRatio());
21+
return MemoryTotal * AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetTotalRatio();
2122
}
2223
}
2324

0 commit comments

Comments
 (0)