Skip to content

Commit f619c2a

Browse files
authored
Merge 5011617 into 76bbb65
2 parents 76bbb65 + 5011617 commit f619c2a

27 files changed

+845
-53
lines changed

ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,10 @@ ui32 TBlobStorageGroupInfo::TTopology::GetIdxInSubgroup(const TVDiskIdShort& vdi
419419
return BlobMapper->GetIdxInSubgroup(vdisk, hash);
420420
}
421421

422+
bool TBlobStorageGroupInfo::TTopology::IsHandoff(const TVDiskIdShort& vdisk, ui32 hash) const {
423+
return BlobMapper->GetIdxInSubgroup(vdisk, hash) >= GType.TotalPartCount();
424+
}
425+
422426
TVDiskIdShort TBlobStorageGroupInfo::TTopology::GetVDiskInSubgroup(ui32 idxInSubgroup, ui32 hash) const {
423427
return BlobMapper->GetVDiskInSubgroup(idxInSubgroup, hash);
424428
}

ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,8 @@ class TBlobStorageGroupInfo : public TThrRefBase {
250250
// function returns idxInSubgroup-th element of vdisks array from PickSubgroup
251251
TVDiskIdShort GetVDiskInSubgroup(ui32 idxInSubgroup, ui32 hash) const;
252252

253+
bool IsHandoff(const TVDiskIdShort& vdisk, ui32 hash) const;
254+
253255

254256
TFailRealmIterator FailRealmsBegin() const;
255257
TFailRealmIterator FailRealmsEnd() const;

ydb/core/blobstorage/ut_blobstorage/balancing.cpp

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ struct TTestEnv {
6060
return data;
6161
};
6262

63-
void SendPut(ui32 step, const TString& data, NKikimrProto::EReplyStatus expectedStatus) {
63+
NKikimrProto::EReplyStatus SendPut(ui32 step, const TString& data) {
6464
const TLogoBlobID id(1, 1, step, 0, data.size(), 0);
6565
Cerr << "SEND TEvPut with key " << id.ToString() << Endl;
6666
const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__);
@@ -69,8 +69,12 @@ struct TTestEnv {
6969
SendToBSProxy(sender, GroupInfo->GroupID, ev.release());
7070
});
7171
auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender, false);
72-
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, expectedStatus);
7372
Cerr << "TEvPutResult: " << res->Get()->ToString() << Endl;
73+
return res->Get()->Status;
74+
};
75+
76+
void SendPut(ui32 step, const TString& data, NKikimrProto::EReplyStatus expectedStatus) {
77+
UNIT_ASSERT_VALUES_EQUAL(SendPut(step, data), expectedStatus);
7478
};
7579

7680
auto SendGet(ui32 step, ui32 dataSize, bool mustRestoreFirst=false) {
@@ -114,7 +118,6 @@ struct TTestEnv {
114118
const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__);
115119
TVector<ui32> partsRes;
116120

117-
Cerr << "Get request for vdisk " << position << Endl;
118121
auto queueId = GetQueue(vDiskId);
119122
Env.Runtime->WrapInActorContext(sender, [&] {
120123
Env.Runtime->Send(new IEventHandle(queueId, sender, ev.release()));
@@ -244,13 +247,6 @@ struct TStopOneNodeTest {
244247
Env->Sim(TDuration::Seconds(10));
245248
Cerr << "Finish compaction 2" << Endl;
246249

247-
Cerr << "Start compaction 3" << Endl;
248-
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
249-
Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
250-
}
251-
Env->Sim(TDuration::Seconds(10));
252-
Cerr << "Finish compaction 3" << Endl;
253-
254250
Env.CheckPartsLocations(MakeLogoBlobId(step, data.size()));
255251
UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data);
256252
}
@@ -264,21 +260,23 @@ struct TRandomTest {
264260
void RunTest() {
265261
TVector<TString> data(Reserve(NumIters));
266262

263+
TVector<ui32> successfulSteps;
264+
267265
for (ui32 step = 0; step < NumIters; ++step) {
268-
Cerr << step << Endl;
266+
Cerr << "Step = " << step << Endl;
269267
data.push_back(GenData(16 + random() % 4096));
270-
auto blobId = MakeLogoBlobId(step, data.back().size());
271-
auto locations = Env.GetExpectedPartsLocations(blobId);
268+
269+
if (Env.SendPut(step, data.back()) == NKikimrProto::OK) {
270+
successfulSteps.push_back(step);
271+
}
272272

273273
if (random() % 10 == 1 && Env.RunningNodes.size() + 2 > Env->Settings.NodeCount) {
274-
ui32 nodeId = random() % Env->Settings.NodeCount;
275-
Cerr << "Stop node " << nodeId << Endl;
276-
Env.StopNode(nodeId);
274+
ui32 pos = random() % Env->Settings.NodeCount;
275+
Cerr << "Stop node " << pos << Endl;
276+
Env.StopNode(pos);
277277
Env->Sim(TDuration::Seconds(10));
278278
}
279279

280-
Env.SendPut(step, data.back(), NKikimrProto::OK);
281-
282280
if (random() % 10 == 1) {
283281
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
284282
if (!Env.RunningNodes.contains(pos)) {
@@ -293,6 +291,7 @@ struct TRandomTest {
293291
if (random() % 50 == 1) {
294292
ui32 pos = random() % Env->Settings.NodeCount;
295293
if (Env.RunningNodes.contains(pos)) {
294+
Cerr << "Compact vdisk " << pos << Endl;
296295
Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
297296
Env->Sim(TDuration::Seconds(10));
298297
}
@@ -302,6 +301,7 @@ struct TRandomTest {
302301
if (random() % 100 == 1) {
303302
ui32 pos = random() % Env->Settings.NodeCount;
304303
if (Env.RunningNodes.contains(pos)) {
304+
Cerr << "Wipe node " << pos << Endl;
305305
auto baseConfig = Env->FetchBaseConfig();
306306
const auto& somePDisk = baseConfig.GetPDisk(pos);
307307
const auto& someVSlot = baseConfig.GetVSlot(pos);
@@ -311,14 +311,29 @@ struct TRandomTest {
311311
}
312312
}
313313

314+
UNIT_ASSERT(successfulSteps.size() > NumIters / 2);
315+
316+
Cerr << "Starting nodes" << Endl;
314317
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
315318
Env.StartNode(pos);
316319
}
320+
Env->Sim(TDuration::Seconds(10));
321+
322+
Cerr << "Start compaction 1" << Endl;
323+
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
324+
Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
325+
}
326+
Env->Sim(TDuration::Seconds(60));
327+
328+
Cerr << "Start compaction 2" << Endl;
329+
for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
330+
Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
331+
}
332+
Env->Sim(TDuration::Seconds(60));
317333

318-
Env->Sim(TDuration::Seconds(300));
319334
Cerr << "Start checking" << Endl;
320-
for (ui32 step = 0; step < NumIters; ++step) {
321-
Cerr << step << Endl;
335+
for (ui32 step: successfulSteps) {
336+
Cerr << "step = " << step << Endl;
322337
Env.CheckPartsLocations(MakeLogoBlobId(step, data[step].size()));
323338
UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data[step].size())->Get()->Responses[0].Buffer.ConvertToString(), data[step]);
324339
}
@@ -336,7 +351,7 @@ Y_UNIT_TEST_SUITE(VDiskBalancing) {
336351
TStopOneNodeTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), GenData(100)}.RunTest();
337352
}
338353
Y_UNIT_TEST(TestStopOneNode_Block42_HugeBlob) {
339-
TStopOneNodeTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(521_KB)}.RunTest();
354+
TStopOneNodeTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(521_KB * 6)}.RunTest();
340355
}
341356
Y_UNIT_TEST(TestStopOneNode_Mirror3dc_HugeBlob) {
342357
TStopOneNodeTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), GenData(521_KB)}.RunTest();

ydb/core/blobstorage/ut_blobstorage/lib/env.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ struct TEnvironmentSetup {
273273
// NKikimrServices::LOCAL,
274274
// NActorsServices::INTERCONNECT,
275275
// NActorsServices::INTERCONNECT_SESSION,
276+
// NKikimrServices::BS_VDISK_BALANCING,
276277
};
277278
for (const auto& comp : debug) {
278279
Runtime->SetLogPriority(comp, NLog::PRI_DEBUG);

ydb/core/blobstorage/ut_blobstorage/ut_balancing/ya.make

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
UNITTEST_FOR(ydb/core/blobstorage/ut_blobstorage)
2-
SKIP_TEST(VDisks balancing is not implemented yet)
32

43
SIZE(MEDIUM)
54

5+
FORK_SUBTESTS()
6+
67
TIMEOUT(600)
78

89
SRCS(

ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66

77
namespace NKikimr {
88

9+
// prepared data to insert to Hull Database
10+
struct THullDbInsert {
11+
TLogoBlobID Id;
12+
TIngress Ingress;
13+
};
14+
915
struct TAnubisOsirisPutRecoveryLogRec;
1016

1117
////////////////////////////////////////////////////////////////////////////
@@ -47,12 +53,6 @@ namespace NKikimr {
4753
return !IsAnubis();
4854
}
4955

50-
// prepared data to insert to Hull Database
51-
struct THullDbInsert {
52-
TLogoBlobID Id;
53-
TIngress Ingress;
54-
};
55-
5656
// return data to insert to Hull Database, we create ingress according to whether this
5757
// blob is Anubis or Osiris record
5858
THullDbInsert PrepareInsert(const TBlobStorageGroupInfo::TTopology *top,
Lines changed: 165 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,189 @@
11
#include "balancing_actor.h"
22
#include "defs.h"
3+
#include "utils.h"
4+
5+
#include <ydb/core/blobstorage/vdisk/common/vdisk_queues.h>
6+
#include <ydb/core/blobstorage/vdisk/repl/blobstorage_replbroker.h>
37

48

59
namespace NKikimr {
610

711
class TBalancingActor : public TActorBootstrapped<TBalancingActor> {
812
private:
913
std::shared_ptr<TBalancingCtx> Ctx;
10-
public:
11-
void Bootstrap() {
12-
Become(&TThis::StateFunc);
14+
TLogoBlobsSnapshot::TForwardIterator It;
15+
TQueueActorMapPtr QueueActorMapPtr;
16+
TActiveActors ActiveActors;
17+
THashSet<TVDiskID> ConnectedVDisks;
18+
19+
TActorId SenderId;
20+
TActorId DeleterId;
21+
22+
struct TStats {
23+
bool SendCompleted = false;
24+
bool DeleteCompleted = false;
25+
};
26+
27+
TStats Stats;
28+
29+
void CreateVDisksQueues() {
30+
QueueActorMapPtr = std::make_shared<TQueueActorMap>();
31+
auto interconnectChannel = TInterconnectChannels::EInterconnectChannels::IC_BLOBSTORAGE_ASYNC_DATA;
32+
const TBlobStorageGroupInfo::TTopology& topology = Ctx->GInfo->GetTopology();
33+
NBackpressure::TQueueClientId queueClientId(
34+
NBackpressure::EQueueClientType::Balancing, topology.GetOrderNumber(Ctx->VCtx->ShortSelfVDisk));
35+
36+
CreateQueuesForVDisks(*QueueActorMapPtr, SelfId(), Ctx->GInfo, Ctx->VCtx,
37+
Ctx->GInfo->GetVDisks(), Ctx->MonGroup.GetGroup(),
38+
queueClientId, NKikimrBlobStorage::EVDiskQueueId::GetAsyncRead,
39+
"DisksBalancing", interconnectChannel);
40+
}
41+
42+
// Walks the logoblobs snapshot from the first key to the end and sorts this
// VDisk's parts into two work queues:
//   * first  -- parts reported by PartsToSendOnMain(), together with the part data
//               collected by the merger;
//   * second -- parts reported by PartsToDelete(), key and ingress only.
// A part that PartsToSendOnMain() names but the merger has no data for is logged
// at WARN level and skipped.
//
// The queues are moved into the returned pair; they can hold part data for every
// key in the snapshot, so copying them on return would be wasteful.
std::pair<TQueue<TPartInfo>, TQueue<TPartInfo>> CollectKeys(const TActorContext &ctx) {
    TQueue<TPartInfo> sendOnMainParts, tryDeleteParts;

    // Loop invariants: the snapshot scan does not change the topology or our own id.
    const TBlobStorageGroupInfo::TTopology& topology = Ctx->GInfo->GetTopology();
    const auto& selfVDisk = Ctx->VCtx->ShortSelfVDisk;
    const auto selfOrderNumber = topology.GetOrderNumber(selfVDisk);

    for (It.SeekToFirst(); It.Valid(); It.Next()) {
        const TLogoBlobID blobId = It.GetCurKey().LogoBlobID();

        TPartsCollectorMerger merger(topology.GType);
        It.PutToMerger(&merger);

        for (ui8 partIdx: PartsToSendOnMain(topology, selfVDisk, blobId, merger.Ingress)) {
            // Keys carry partIdx as is, while merger.Parts is addressed with partIdx - 1.
            const auto& partData = merger.Parts[partIdx - 1];
            if (!partData.has_value()) {
                LOG_WARN_S(ctx, NKikimrServices::BS_VDISK_BALANCING,
                    selfOrderNumber << "$ "
                    << "not found part " << static_cast<ui32>(partIdx) << " for " << blobId.ToString());
                continue; // something strange
            }
            sendOnMainParts.push(TPartInfo{
                .Key=TLogoBlobID(blobId, partIdx),
                .Ingress=merger.Ingress,
                .PartData=*partData
            });
            LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_BALANCING,
                selfOrderNumber << "$ "
                << "Send on main: " << sendOnMainParts.back().Key.ToString() << " " << sendOnMainParts.back().Ingress.ToString(&topology, selfVDisk, sendOnMainParts.back().Key));
        }

        for (ui8 partIdx: PartsToDelete(topology, selfVDisk, blobId, merger.Ingress)) {
            tryDeleteParts.push(TPartInfo{
                .Key=TLogoBlobID(blobId, partIdx),
                .Ingress=merger.Ingress,
            });
            LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_BALANCING,
                selfOrderNumber << "$ "
                << "Delete: " << tryDeleteParts.back().Key.ToString() << " " << tryDeleteParts.back().Ingress.ToString(&topology, selfVDisk, tryDeleteParts.back().Key));
        }
        merger.Clear();
    }

    // Move, do not copy: a braced return of lvalues would deep-copy both queues.
    return {std::move(sendOnMainParts), std::move(tryDeleteParts)};
}
79+
80+
void StartBalancing(const TActorContext &ctx) {
81+
LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_BALANCING,
82+
Ctx->GInfo->GetTopology().GetOrderNumber(Ctx->VCtx->ShortSelfVDisk) << "$ " << "Ask repl token");
83+
if (!Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(Ctx->VDiskCfg->BaseInfo.PDiskId))) {
84+
HandleReplToken(ctx);
85+
}
86+
}
87+
88+
void HandleReplToken(const TActorContext &ctx) {
89+
LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_BALANCING,
90+
Ctx->GInfo->GetTopology().GetOrderNumber(Ctx->VCtx->ShortSelfVDisk) << "$ " << "Repl token acquired");
91+
DoJobQuant(ctx);
92+
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
93+
Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup());
94+
}
95+
96+
// Records which helper (sender or deleter) reported completion; any other id aborts.
// Once both have reported, notifies the skeleton with TEvStartBalancing and
// poisons this actor.
void Handle(NActors::TEvents::TEvCompleted::TPtr ev, const TActorContext &ctx) {
    switch (ev->Get()->Id) {
        case SENDER_ID:
            Stats.SendCompleted = true;
            break;
        case DELETER_ID:
            Stats.DeleteCompleted = true;
            break;
        default:
            Y_ABORT("Unexpected id");
    }

    const bool everythingDone = Stats.SendCompleted && Stats.DeleteCompleted;
    if (!everythingDone) {
        return; // still waiting for the other helper
    }

    LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_BALANCING,
        Ctx->GInfo->GetTopology().GetOrderNumber(Ctx->VCtx->ShortSelfVDisk) << "$ " << "Balancing completed");
    Send(Ctx->SkeletonId, new TEvStartBalancing());
    Send(SelfId(), new NActors::TEvents::TEvPoison);
}
116+
117+
// Keeps ConnectedVDisks in sync with queue state notifications: the reported
// VDisk is added when the event says it is connected and removed otherwise.
void Handle(TEvProxyQueueState::TPtr ev) {
    const auto* state = ev->Get();
    if (!state->IsConnected) {
        ConnectedVDisks.erase(state->VDiskId);
        return;
    }
    ConnectedVDisks.insert(state->VDiskId);
}
125+
126+
// Reacts to a group generation change: forwards a clone of the event to every
// queue actor in QueueActorMapPtr and then switches Ctx->GInfo to the new
// group info carried by the event.
void Handle(TEvVGenerationChange::TPtr ev) {
    // Report through the component logger like the rest of this actor; a raw
    // Cerr print would bypass log levels and write to the process stderr.
    LOG_DEBUG_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::BS_VDISK_BALANCING,
        Ctx->GInfo->GetTopology().GetOrderNumber(Ctx->VCtx->ShortSelfVDisk) << "$ " << "TEvVGenerationChange");

    // forward message to queue actors
    TEvVGenerationChange *msg = ev->Get();
    for (const auto& kv : *QueueActorMapPtr) {
        Send(kv.second, msg->Clone());
    }
    Ctx->GInfo = msg->NewInfo;
}
135+
136+
void DoJobQuant(const TActorContext& ctx) {
137+
LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_BALANCING,
138+
Ctx->GInfo->GetTopology().GetOrderNumber(Ctx->VCtx->ShortSelfVDisk) << "$ " << "Connected vdisks " << ConnectedVDisks.size() << "/" << Ctx->GInfo->GetTotalVDisksNum() - 1);
139+
Send(SenderId, new NActors::TEvents::TEvWakeup());
140+
Send(DeleterId, new NActors::TEvents::TEvWakeup());
141+
}
142+
143+
void PassAway() override {
144+
for (const auto& kv : *QueueActorMapPtr) {
145+
Send(kv.second, new TEvents::TEvPoison);
146+
}
147+
TActorBootstrapped::PassAway();
13148
}
14149

15150
STRICT_STFUNC(StateFunc,
16-
CFunc(NActors::TEvents::TEvPoison::EventType, Die)
151+
CFunc(TEvReplToken::EventType, HandleReplToken)
152+
HFunc(NActors::TEvents::TEvCompleted, Handle)
153+
hFunc(TEvProxyQueueState, Handle)
154+
hFunc(TEvVGenerationChange, Handle)
155+
CFunc(NActors::TEvents::TEvWakeup::EventType, StartBalancing)
156+
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway)
17157
);
18158

159+
public:
19160
TBalancingActor(std::shared_ptr<TBalancingCtx> &ctx)
20161
: TActorBootstrapped<TBalancingActor>()
21162
, Ctx(ctx)
163+
, It(Ctx->Snap.HullCtx, &Ctx->Snap.LogoBlobsSnap)
22164
{
23165
}
166+
167+
void Bootstrap(const TActorContext &ctx) {
168+
CreateVDisksQueues();
169+
auto [sendOnMainParts, tryDeleteParts] = CollectKeys(ctx);
170+
LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_BALANCING,
171+
Ctx->GInfo->GetTopology().GetOrderNumber(Ctx->VCtx->ShortSelfVDisk) << "$ "
172+
<< ctx.Now().MilliSeconds() << " Bootstrap" << ": "
173+
<< "sendOnMainParts size = " << sendOnMainParts.size() << "; tryDeleteParts size = " << tryDeleteParts.size());
174+
175+
SenderId = ctx.Register(CreateSenderActor(SelfId(), std::move(sendOnMainParts), QueueActorMapPtr, Ctx));
176+
ActiveActors.Insert(SenderId, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
177+
178+
DeleterId = ctx.Register(CreateDeleterActor(SelfId(), std::move(tryDeleteParts), QueueActorMapPtr, Ctx));
179+
ActiveActors.Insert(DeleterId, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
180+
181+
Become(&TThis::StateFunc);
182+
Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup());
183+
}
24184
};
25185

26186
IActor* CreateBalancingActor(std::shared_ptr<TBalancingCtx> ctx) {
27187
return new TBalancingActor(ctx);
28188
}
29-
}
189+
} // NKikimr

0 commit comments

Comments
 (0)