Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ namespace NKikimr {
TEvResumeForce *ResumeForceToken = nullptr;
TInstant ReplicationEndTime;
bool UnrecoveredNonphantomBlobs = false;
bool RequestedReplicationToken = false;
bool HoldingReplicationToken = false;

TWatchdogTimer<TEvReplCheckProgress> ReplProgressWatchdog;

Expand Down Expand Up @@ -287,6 +289,12 @@ namespace NKikimr {
case Plan:
// this is a first quantum of replication, so we have to register it in the broker
State = AwaitToken;
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
if (RequestedReplicationToken) {
STLOG(PRI_CRIT, BS_REPL, BSVR38, ReplCtx->VCtx->VDiskLogPrefix << "excessive replication token requested");
break;
}
RequestedReplicationToken = true;
if (!Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(ReplCtx->VDiskCfg->BaseInfo.PDiskId))) {
HandleReplToken();
}
Expand All @@ -303,6 +311,10 @@ namespace NKikimr {
}

void HandleReplToken() {
Y_ABORT_UNLESS(RequestedReplicationToken);
RequestedReplicationToken = false;
HoldingReplicationToken = true;

// switch to replication state
Transition(AwaitToken, Replication);
if (!ResumeIfReady()) {
Expand Down Expand Up @@ -408,6 +420,9 @@ namespace NKikimr {
if (State == WaitQueues || State == Replication) {
// release token as we have finished replicating
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
Y_DEBUG_ABORT_UNLESS(HoldingReplicationToken);
HoldingReplicationToken = false;
}
ResetReplProgressTimer(true);

Expand Down Expand Up @@ -635,7 +650,15 @@ namespace NKikimr {

// return replication token if we have one
if (State == AwaitToken || State == WaitQueues || State == Replication) {
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
Y_DEBUG_ABORT_UNLESS(RequestedReplicationToken || HoldingReplicationToken);
if (RequestedReplicationToken || HoldingReplicationToken) {
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
}
} else {
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken && !HoldingReplicationToken);
if (RequestedReplicationToken || HoldingReplicationToken) {
STLOG(PRI_CRIT, BS_REPL, BSVR37, ReplCtx->VCtx->VDiskLogPrefix << "stuck replication token");
}
}

if (ReplJobActorId) {
Expand Down
22 changes: 17 additions & 5 deletions ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ namespace NKikimr {
ui64 NextReceiveCookie;
TResultQueue ResultQueue;
std::shared_ptr<TMessageRelevanceTracker> Tracker = std::make_shared<TMessageRelevanceTracker>();
bool Terminated = false;

TQueue<std::unique_ptr<TEvBlobStorage::TEvVGet>> SchedulerRequestQ;
THashMap<ui64, TReplMemTokenId> RequestTokens;
Expand Down Expand Up @@ -227,9 +228,7 @@ namespace NKikimr {
PrefetchDataSize = 0;
RequestFromVDiskProxyPending = false;
if (Finished) {
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
RequestTokens.clear();
return PassAway(); // TODO(alexvru): check correctness of invocations
return PassAway();
}
}
// send request(s) if prefetch queue is not full
Expand Down Expand Up @@ -297,6 +296,9 @@ namespace NKikimr {
if (msg->Record.GetCookie() == NextReceiveCookie) {
ui64 cookie = NextReceiveCookie;
ProcessResult(msg);
if (Terminated) {
return;
}
ReleaseMemToken(cookie);
while (!ResultQueue.empty()) {
const TQueueItem& top = ResultQueue.top();
Expand All @@ -305,6 +307,9 @@ namespace NKikimr {
}
ui64 cookie = NextReceiveCookie;
ProcessResult(top.get());
if (Terminated) {
return;
}
ReleaseMemToken(cookie);
ResultQueue.pop();
}
Expand All @@ -314,6 +319,7 @@ namespace NKikimr {
}

void ReleaseMemToken(ui64 cookie) {
Y_ABORT_UNLESS(!Terminated);
if (RequestTokens) {
auto it = RequestTokens.find(cookie);
Y_ABORT_UNLESS(it != RequestTokens.end());
Expand Down Expand Up @@ -428,6 +434,13 @@ namespace NKikimr {
}
}

void PassAway() override {
Y_ABORT_UNLESS(!Terminated);
Terminated = true;
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
TActorBootstrapped::PassAway();
}

STRICT_STFUNC(StateFunc,
hFunc(TEvReplProxyNext, Handle)
hFunc(TEvReplMemToken, Handle)
Expand All @@ -446,8 +459,7 @@ namespace NKikimr {
TTrackableVector<TVDiskProxy::TScheduledBlob>&& ids,
const TVDiskID& vdiskId,
const TActorId& serviceId)
: TActorBootstrapped<TVDiskProxyActor>()
, ReplCtx(std::move(replCtx))
: ReplCtx(std::move(replCtx))
, GType(ReplCtx->VCtx->Top->GType)
, Ids(std::move(ids))
, VDiskId(vdiskId)
Expand Down
Loading