Skip to content

Add patching to keyvalue #1549

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1468,6 +1468,10 @@ struct TEvBlobStorage {
}
}

static ui8 BlobPlacementKind(const TLogoBlobID &blob) {
return blob.Hash() % BaseDomainsCount;
}

static bool GetBlobIdWithSamePlacement(const TLogoBlobID &originalId, TLogoBlobID *patchedId,
ui32 bitsForBruteForce, ui32 originalGroupId, ui32 currentGroupId)
{
Expand Down Expand Up @@ -2407,4 +2411,19 @@ inline bool SendPutToGroup(const TActorContext &ctx, ui32 groupId, TTabletStorag
// TODO(alexvru): check if return status is actually needed?
}

inline bool SendPatchToGroup(const TActorContext &ctx, ui32 groupId, TTabletStorageInfo *storage,
THolder<TEvBlobStorage::TEvPatch> event, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
auto checkGroupId = [&] {
const TLogoBlobID &id = event->PatchedId;
const ui32 expectedGroupId = storage->GroupFor(id.Channel(), id.Generation());
const TLogoBlobID &originalId = event->OriginalId;
const ui32 expectedOriginalGroupId = storage->GroupFor(originalId.Channel(), originalId.Generation());
return id.TabletID() == storage->TabletID && expectedGroupId != Max<ui32>() && groupId == expectedGroupId && event->OriginalGroupId == expectedOriginalGroupId;
};
Y_VERIFY_S(checkGroupId(), "groupIds# (" << event->OriginalGroupId << ',' << groupId << ") does not match actual ones LogoBlobIds# (" <<
event->OriginalId.ToString() << ',' << event->PatchedId.ToString() << ')');
return SendToBSProxy(ctx, groupId, event.Release(), cookie, std::move(traceId));
// TODO(alexvru): check if return status is actually needed?
}

} // NKikimr
6 changes: 6 additions & 0 deletions ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ namespace NKikimr {
"not implemented")), 0, ev->Cookie);
}

void Handle(TEvBlobStorage::TEvPatch::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM10, "TEvPatch", (Msg, ev->Get()->ToString()));
Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
}

template<typename TOut, typename TIn>
TOut *CopyExecutionRelay(TIn *in, TOut *out) {
out->ExecutionRelay = std::move(in->ExecutionRelay);
Expand All @@ -80,6 +85,7 @@ namespace NKikimr {
hFunc(TEvBlobStorage::TEvRange, Handle);
hFunc(TEvBlobStorage::TEvCollectGarbage, Handle);
hFunc(TEvBlobStorage::TEvStatus, Handle);
hFunc(TEvBlobStorage::TEvPatch, Handle);

hFunc(TEvents::TEvPoisonPill, HandlePoison);
hFunc(TEvBlobStorage::TEvConfigureProxy, Handle);
Expand Down
59 changes: 59 additions & 0 deletions ydb/core/blobstorage/dsproxy/mock/model.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <cstring>
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_space_color.h>

Expand Down Expand Up @@ -149,6 +150,64 @@ namespace NFake {
return result.release();
}

TEvBlobStorage::TEvPatchResult* Handle(TEvBlobStorage::TEvPatch *msg) {
// ensure we have full blob id, with PartId set to zero
const TLogoBlobID& id = msg->PatchedId;
Y_ABORT_UNLESS(id == id.FullID());

// validate put against set blocks
if (IsBlocked(id.TabletID(), id.Generation())) {
return new TEvBlobStorage::TEvPatchResult(NKikimrProto::BLOCKED, id, GetStorageStatusFlags(), GroupId, 0.f);
}

// check if this blob is not being collected -- writing such blob is a violation of BS contract
Y_ABORT_UNLESS(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data());


const TLogoBlobID& originalId = msg->OriginalId;
auto it = Blobs.find(originalId);
if (it == Blobs.end()) {
// ensure this blob is not under GC
Y_ABORT_UNLESS(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data());
return new TEvBlobStorage::TEvPatchResult(NKikimrProto::ERROR, id, GetStorageStatusFlags(), GroupId, 0.f);
}

auto& data = it->second;
// TODO(kruall): check bad diffs
TString buffer = TString::Uninitialized(data.Buffer.GetSize());
auto originalBuffer = data.Buffer.GetContiguousSpan();
memcpy(buffer.Detach(), originalBuffer.data(), buffer.size());
for (ui32 diffIdx = 0; diffIdx < msg->DiffCount; ++diffIdx) {
auto &diff = msg->Diffs[diffIdx];
auto diffBuffer = diff.Buffer.GetContiguousSpan();
memcpy(buffer.Detach() + diff.Offset, diffBuffer.data(), diffBuffer.size());
}


// validate that there are no blobs with the same gen/step, channel, cookie, but with different size
const TLogoBlobID base(id.TabletID(), id.Generation(), id.Step(), id.Channel(), 0, id.Cookie());
auto iter = Blobs.lower_bound(base);
if (iter != Blobs.end()) {
const TLogoBlobID& existing = iter->first;
Y_ABORT_UNLESS(
id.TabletID() != existing.TabletID() ||
id.Generation() != existing.Generation() ||
id.Step() != existing.Step() ||
id.Cookie() != existing.Cookie() ||
id.Channel() != existing.Channel() ||
id == existing,
"id# %s existing# %s", id.ToString().data(), existing.ToString().data());
if (id == existing) {
Y_ABORT_UNLESS(iter->second.Buffer == buffer);
}
}

// put an entry into logo blobs database and reply with success
Blobs.emplace(id, TRope(buffer));

return new TEvBlobStorage::TEvPatchResult(NKikimrProto::OK, id, GetStorageStatusFlags(), GroupId, 0.f);
}

TEvBlobStorage::TEvBlockResult* Handle(TEvBlobStorage::TEvBlock *msg) {
NKikimrProto::EReplyStatus status = NKikimrProto::OK;

Expand Down
20 changes: 19 additions & 1 deletion ydb/core/keyvalue/keyvalue_intermediate.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,30 @@ struct TIntermediate {
struct TSetExecutorFastLogPolicy {
bool IsAllowed;
};
struct TPatch {
struct TDiff {
ui32 Offset;
TRope Buffer;
};

TString OriginalKey;
TLogoBlobID OriginalBlobId;
TString PatchedKey;
TLogoBlobID PatchedBlobId;

NKikimrProto::EReplyStatus Status;
TStorageStatusFlags StatusFlags;

TVector<TDiff> Diffs;
};

using TCmd = std::variant<TWrite, TDelete, TRename, TCopyRange, TConcat>;
using TCmd = std::variant<TWrite, TDelete, TRename, TCopyRange, TConcat, TPatch>;
using TReadCmd = std::variant<TRead, TRangeRead>;

TDeque<TRead> Reads;
TDeque<TRangeRead> RangeReads;
TDeque<TWrite> Writes;
TDeque<TPatch> Patches;
TDeque<TDelete> Deletes;
TDeque<TRename> Renames;
TDeque<TCopyRange> CopyRanges;
Expand All @@ -120,6 +137,7 @@ struct TIntermediate {

TStackVec<TCmd, 1> Commands;
TStackVec<ui32, 1> WriteIndices;
TStackVec<ui32, 1> PatchIndices;
std::optional<TReadCmd> ReadCommand;

ui64 WriteCount = 0;
Expand Down
Loading