Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions ydb/core/tx/columnshard/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,15 @@ void TBlobManager::DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) {
blobBatch.BatchInfo->GenStepRef.Reset();
}

void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) {
++CountersUpdate.BlobsDeleted;

void TBlobManager::DeleteBlobOnExecute(const TUnifiedBlobId& blobId, IBlobManagerDb& db) {
// Persist deletion intent
db.AddBlobToDelete(blobId);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete", blobId);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_on_execute", blobId);
}

void TBlobManager::DeleteBlobOnComplete(const TUnifiedBlobId& blobId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_on_complete", blobId);
++CountersUpdate.BlobsDeleted;

// Check if the deletion needs to be delayed until the blob is no longer
// used by in-flight requests
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/blob_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class IBlobManager {
}

// Deletes the blob that was previously permanently saved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that comment relates to old DeleteBlob and we can write new comments for DeleteBlobOnExecute and DeleteBlobOnComplete (prefer) or remove old comment.

virtual void DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) = 0;
virtual void DeleteBlobOnExecute(const TUnifiedBlobId& blobId, IBlobManagerDb& db) = 0;
virtual void DeleteBlobOnComplete(const TUnifiedBlobId& blobId) = 0;
};

// An interface for exporting and caching exported blobs out of ColumnShard index to external storages like S3.
Expand Down Expand Up @@ -218,7 +219,8 @@ class TBlobManager : public IBlobManager, public NOlap::TCommonBlobsTracker {

// Implementation of IBlobManager interface
TBlobBatch StartBlobBatch(ui32 channel = BLOB_CHANNEL) override;
void DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) override;
void DeleteBlobOnExecute(const TUnifiedBlobId& blobId, IBlobManagerDb& db) override;
void DeleteBlobOnComplete(const TUnifiedBlobId& blobId) override;
private:
TGenStep FindNewGCBarrier();

Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/blobs_action/abstract/action.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ class TStorageAction {
}
}

void OnCompleteTxAfterAction(NColumnShard::TColumnShard& self) {
void OnCompleteTxAfterAction(NColumnShard::TColumnShard& self, const bool success) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
I see that Ev->Get()->GetPutStatus() is passing result to success.
I suggest considering the possibility of naming the variable not success but specifying the success of which operation is expected here.

if (Removing) {
Removing->OnCompleteTxAfterRemoving(self);
Removing->OnCompleteTxAfterRemoving(self, success);
}
if (Writing) {
Writing->OnCompleteTxAfterWrite(self);
Writing->OnCompleteTxAfterWrite(self, success);
}
}
};
Expand Down Expand Up @@ -150,9 +150,9 @@ class TBlobsAction {
}
}

void OnCompleteTxAfterAction(NColumnShard::TColumnShard& self) {
void OnCompleteTxAfterAction(NColumnShard::TColumnShard& self, const bool success) {
for (auto&& i : StorageActions) {
i.second.OnCompleteTxAfterAction(self);
i.second.OnCompleteTxAfterAction(self, success);
}
}

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/abstract/remove.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class IBlobsDeclareRemovingAction: public ICommonBlobsAction {
protected:
virtual void DoDeclareRemove(const TUnifiedBlobId& blobId) = 0;
virtual void DoOnExecuteTxAfterRemoving(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) = 0;
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& self) = 0;
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& self, const bool success) = 0;
public:
IBlobsDeclareRemovingAction(const TString& storageId)
: TBase(storageId)
Expand All @@ -36,8 +36,8 @@ class IBlobsDeclareRemovingAction: public ICommonBlobsAction {
void OnExecuteTxAfterRemoving(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) {
return DoOnExecuteTxAfterRemoving(self, dbBlobs, success);
}
void OnCompleteTxAfterRemoving(NColumnShard::TColumnShard& self) {
return DoOnCompleteTxAfterRemoving(self);
void OnCompleteTxAfterRemoving(NColumnShard::TColumnShard& self, const bool success) {
return DoOnCompleteTxAfterRemoving(self, success);
}
};

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/abstract/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class IBlobsWritingAction: public ICommonBlobsAction {
virtual void DoOnBlobWriteResult(const TUnifiedBlobId& blobId, const NKikimrProto::EReplyStatus status) = 0;

virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) = 0;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& self) = 0;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& self, const bool success) = 0;

virtual TUnifiedBlobId AllocateNextBlobId(const TString& data) = 0;
public:
Expand Down Expand Up @@ -79,8 +79,8 @@ class IBlobsWritingAction: public ICommonBlobsAction {
return DoOnExecuteTxAfterWrite(self, dbBlobs, success);
}

void OnCompleteTxAfterWrite(NColumnShard::TColumnShard& self) {
return DoOnCompleteTxAfterWrite(self);
void OnCompleteTxAfterWrite(NColumnShard::TColumnShard& self, const bool success) {
return DoOnCompleteTxAfterWrite(self, success);
}

void SendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId);
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/bs/remove.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ class TDeclareRemovingAction: public IBlobsDeclareRemovingAction {
virtual void DoOnExecuteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) {
if (success) {
for (auto&& i : GetDeclaredBlobs()) {
Manager->DeleteBlob(i, dbBlobs);
Manager->DeleteBlobOnExecute(i, dbBlobs);
}
}
}
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/) {

virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, const bool success) {
if (success) {
for (auto&& i : GetDeclaredBlobs()) {
Manager->DeleteBlobOnComplete(i);
}
}
}
public:
TDeclareRemovingAction(const TString& storageId, NColumnShard::TBlobManager& manager)
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/blobs_action/bs/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TWriteAction: public IBlobsWritingAction {
}

virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) override;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/) override {
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*success*/) override {

}
public:
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class TMemoryWriteAction: public IBlobsWritingAction {
virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& /*dbBlobs*/, const bool /*success*/) override {

}
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/) override {
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*success*/) override {

}
public:
Expand Down Expand Up @@ -106,7 +106,7 @@ class TMemoryDeclareRemovingAction: public IBlobsDeclareRemovingAction {
Storage->DeclareDataForRemove(i);
}
}
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/) {
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, const bool /*success*/) {

}
public:
Expand Down
9 changes: 6 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/tier/remove.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ class TDeclareRemovingAction: public IBlobsDeclareRemovingAction {
if (success) {
for (auto&& i : GetDeclaredBlobs()) {
dbBlobs.AddTierBlobToDelete(GetStorageId(), i);
}
}
}
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/, const bool success) {
if (success) {
for (auto&& i : GetDeclaredBlobs()) {
if (GCInfo->IsBlobInUsage(i)) {
Y_ABORT_UNLESS(GCInfo->MutableBlobsToDeleteInFuture().emplace(i).second);
} else {
Expand All @@ -28,9 +34,6 @@ class TDeclareRemovingAction: public IBlobsDeclareRemovingAction {
}
}
}
virtual void DoOnCompleteTxAfterRemoving(NColumnShard::TColumnShard& /*self*/) {

}
public:
TDeclareRemovingAction(const TString& storageId, const std::shared_ptr<TGCInfo>& gcInfo)
: TBase(storageId)
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/blobs_action/tier/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class TWriteAction: public IBlobsWritingAction {
}

virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool success) override;
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/) override {
virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*success*/) override {

}
public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex
}
void TTxInsertTableCleanup::Complete(const TActorContext& /*ctx*/) {
Y_ABORT_UNLESS(BlobsAction);
BlobsAction->OnCompleteTxAfterRemoving(*Self);
BlobsAction->OnCompleteTxAfterRemoving(*Self, true);
Self->EnqueueBackgroundActivities();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ void TTxWrite::Complete(const TActorContext& ctx) {
const auto now = TMonotonic::Now();
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& i : buffer.GetAddActions()) {
i->OnCompleteTxAfterWrite(*Self);
i->OnCompleteTxAfterWrite(*Self, true);
}
for (auto&& i : buffer.GetRemoveActions()) {
i->OnCompleteTxAfterRemoving(*Self);
i->OnCompleteTxAfterRemoving(*Self, true);
}
AFL_VERIFY(buffer.GetAggregations().size() == Results.size());
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
Self->EnqueueBackgroundActivities(false, TriggerActivity);
}

changes->MutableBlobsAction().OnCompleteTxAfterAction(*Self);
changes->MutableBlobsAction().OnCompleteTxAfterAction(*Self, Ev->Get()->GetPutStatus() == NKikimrProto::OK);
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexComplete(Self->TabletID(), changes->TypeString());
}

Expand Down