Skip to content

Reuse S3 locators to prevent false trash accumulation #15390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/blob_depot/agent/agent_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ namespace NKikimr::NBlobDepot {
TActorId PipeId;
TActorId PipeServerId;
bool IsConnected = false;
ui64 ConnectionInstance = 0;

NMonitoring::TDynamicCounterPtr AgentCounters;

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blob_depot/agent/comm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ namespace NKikimr::NBlobDepot {
}

void TBlobDepotAgent::OnDisconnect() {
++ConnectionInstance;

while (!TabletRequestInFlight.empty()) {
auto node = TabletRequestInFlight.extract(TabletRequestInFlight.begin());
auto& requestInFlight = node.value();
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/blob_depot/agent/storage_put.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace NKikimr::NBlobDepot {
TBlobSeqId BlobSeqId;
std::optional<TS3Locator> LocatorInFlight;
TActorId WriterActorId;
ui64 ConnectionInstanceOnStart;

struct TLifetimeToken {};
std::shared_ptr<TLifetimeToken> LifetimeToken;
Expand Down Expand Up @@ -410,6 +411,8 @@ namespace NKikimr::NBlobDepot {
.AddMetadata("key", Request.Id.ToString()),
Request.Buffer.ExtractUnderlyingContainerOrCopy<TString>()),
IEventHandle::FlagTrackDelivery));

ConnectionInstanceOnStart = Agent.ConnectionInstance;
}

void OnPutS3ObjectResponse(std::optional<TString>&& error) {
Expand All @@ -418,6 +421,11 @@ namespace NKikimr::NBlobDepot {

WriterActorId = {};

if (ConnectionInstanceOnStart != Agent.ConnectionInstance) {
error = "BlobDepot tablet disconnected";
LocatorInFlight.reset(); // prevent discarding this locator
}

if (error) {
++*Agent.S3PutsError;

Expand Down
18 changes: 6 additions & 12 deletions ydb/core/blob_depot/op_commit_blob_seq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,8 @@ namespace NKikimr::NBlobDepot {
if (item.HasS3Locator()) {
const auto& locator = TS3Locator::FromProto(item.GetS3Locator());
const size_t numErased = agent.S3WritesInFlight.erase(locator);
if (locator.Generation < generation) {
Y_ABORT_UNLESS(!numErased);
Self->Data->AddToS3Trash(locator, txc, this);
} else {
Y_ABORT_UNLESS(numErased == 1);
Self->S3Manager->AddTrashToCollect(locator);
}
Y_ABORT_UNLESS(numErased);
Self->S3Manager->AddTrashToCollect(locator);
}
};

Expand Down Expand Up @@ -244,11 +239,10 @@ namespace NKikimr::NBlobDepot {
}

for (const auto& item : record.GetS3Locators()) {
if (const auto& locator = TS3Locator::FromProto(item); locator.Generation == generation) {
const size_t numErased = agent.S3WritesInFlight.erase(locator);
Y_ABORT_UNLESS(numErased == 1);
S3Manager->AddTrashToCollect(locator);
}
const auto& locator = TS3Locator::FromProto(item);
const size_t numErased = agent.S3WritesInFlight.erase(locator);
Y_ABORT_UNLESS(numErased == 1);
S3Manager->AddTrashToCollect(locator);
}
}

Expand Down
10 changes: 9 additions & 1 deletion ydb/core/blob_depot/s3_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace NKikimr::NBlobDepot {
locator.ToProto(responseItem->MutableS3Locator());

// we put it here until operation is completed; if tablet restarts and operation fails, then this
// key will be deleted
// key will be deleted; we also rewrite spoiled locator because Len is different
db.Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId).Update<Schema::TrashS3::Len>(locator.Len);

const bool inserted = agent.S3WritesInFlight.insert(locator).second;
Expand Down Expand Up @@ -98,6 +98,14 @@ namespace NKikimr::NBlobDepot {
};

TS3Locator TS3Manager::AllocateS3Locator(ui32 len) {
if (!DeleteQueue.empty()) {
TS3Locator res = DeleteQueue.front();
DeleteQueue.pop_front();
Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_OBJECTS] = --TotalS3TrashObjects;
Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_SIZE] = TotalS3TrashSize -= res.Len;
res.Len = len;
return res;
}
return {
.Len = len,
.Generation = Self->Executor()->Generation(),
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blob_depot/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ namespace NKikimr::NBlobDepot {
}

TString MakeObjectName(const TString& basePath) const {
const size_t hash = THash()(*this);
const size_t hash = MultiHash(Generation, KeyId);
const size_t a = hash % 36;
const size_t b = hash / 36 % 36;
static const char vec[] = "0123456789abcdefghijklmnopqrstuvwxyz";
Expand Down
Loading