99
1010#include < ydb/library/actors/core/actor.h>
1111
12+ #include < util/string/vector.h>
13+
1214namespace NKikimr ::NOlap {
1315
1416class TLeakedBlobsNormalizerChanges : public INormalizerChanges {
1517private:
1618 THashSet<TLogoBlobID> Leaks;
1719 const ui64 TabletId;
1820 NColumnShard::TBlobGroupSelector DsGroupSelector;
21+ ui64 LeakeadBlobsSize;
1922
2023public:
2124 TLeakedBlobsNormalizerChanges (THashSet<TLogoBlobID>&& leaks, const ui64 tabletId, NColumnShard::TBlobGroupSelector dsGroupSelector)
2225 : Leaks(std::move(leaks))
2326 , TabletId(tabletId)
2427 , DsGroupSelector(dsGroupSelector) {
28+ LeakeadBlobsSize = 0 ;
29+ for (const auto & blob : Leaks) {
30+ LeakeadBlobsSize += blob.BlobSize ();
31+ }
2532 }
2633
2734 bool ApplyOnExecute (NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController*/ ) const override {
@@ -42,6 +49,18 @@ class TLeakedBlobsNormalizerChanges: public INormalizerChanges {
4249 ui64 GetSize () const override {
4350 return Leaks.size ();
4451 }
52+
53+ TString DebugString () const override {
54+ TStringBuilder sb;
55+ sb << " tablet=" << TabletId;
56+ sb << " ;leaked_blob_count=" << Leaks.size ();
57+ sb << " ;leaked_blobs_size=" << LeakeadBlobsSize;
58+ auto blobSampleEnd = Leaks.begin ();
59+ for (ui64 i = 0 ; i < 10 && blobSampleEnd != Leaks.end (); ++i, ++blobSampleEnd) {
60+ }
61+ sb << " ;leaked_blobs=[" << JoinStrings (Leaks.begin (), blobSampleEnd, " ," ) << " ]" ;
62+ return sb;
63+ }
4564};
4665
4766class TRemoveLeakedBlobsActor : public TActorBootstrapped <TRemoveLeakedBlobsActor> {
@@ -156,7 +175,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TLeakedBlobsNormalizer::DoInit(
156175 NIceDb::TNiceDb db (txc.DB );
157176 const bool ready = (int )Schema::Precharge<Schema::IndexPortions>(db, txc.DB .GetScheme ()) &
158177 (int )Schema::Precharge<Schema::IndexColumns>(db, txc.DB .GetScheme ()) &
159- (int )Schema::Precharge<Schema::IndexIndexes>(db, txc.DB .GetScheme ());
178+ (int )Schema::Precharge<Schema::IndexIndexes>(db, txc.DB .GetScheme ()) &
179+ (int )Schema::Precharge<Schema::BlobsToDeleteWT>(db, txc.DB .GetScheme ());
160180 if (!ready) {
161181 return TConclusionStatus::Fail (" Not ready" );
162182 }
@@ -219,6 +239,24 @@ TConclusionStatus TLeakedBlobsNormalizer::LoadPortionBlobIds(
219239 }
220240 Indexes = std::move (indexesLocal);
221241 }
242+ if (BlobsToDelete.empty ()) {
243+ THashSet<TUnifiedBlobId> blobsToDelete;
244+ auto rowset = db.Table <NColumnShard::Schema::BlobsToDeleteWT>().Select ();
245+ if (!rowset.IsReady ()) {
246+ return TConclusionStatus::Fail (" Not ready: BlobsToDeleteWT" );
247+ }
248+ while (!rowset.EndOfSet ()) {
249+ const TString& blobIdStr = rowset.GetValue <NColumnShard::Schema::BlobsToDeleteWT::BlobId>();
250+ TString error;
251+ TUnifiedBlobId blobId = TUnifiedBlobId::ParseFromString (blobIdStr, &DsGroupSelector, error);
252+ AFL_VERIFY (blobId.IsValid ())(" event" , " cannot_parse_blob" )(" error" , error)(" original_string" , blobIdStr);
253+ blobsToDelete.emplace (blobId);
254+ if (!rowset.Next ()) {
255+ return TConclusionStatus::Fail (" Local table is not loaded: BlobsToDeleteWT" );
256+ }
257+ }
258+ BlobsToDelete = std::move (blobsToDelete);
259+ }
222260 AFL_VERIFY (Portions.size () == Records.size ())(" portions" , Portions.size ())(" records" , Records.size ());
223261 THashSet<TLogoBlobID> resultLocal;
224262 for (auto && i : Portions) {
@@ -240,6 +278,9 @@ TConclusionStatus TLeakedBlobsNormalizer::LoadPortionBlobIds(
240278 for (auto && c : it->second ) {
241279 resultLocal.emplace (c.GetLogoBlobId ());
242280 }
281+ for (const auto & c : BlobsToDelete) {
282+ resultLocal.emplace (c.GetLogoBlobId ());
283+ }
243284 }
244285 std::swap (resultLocal, result);
245286 return TConclusionStatus::Success ();
0 commit comments