Skip to content

Commit 55bc8e2

Browse files
authored
Put log chunks from killed owners on quarantine, if owner is still reading log (ydb-platform#12857)
Cherry-picked from eafe9a1
1 parent ed94265 commit 55bc8e2

File tree

7 files changed

+269
-29
lines changed

7 files changed

+269
-29
lines changed

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2129,16 +2129,20 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven
21292129
}
21302130

21312131
TryTrimChunk(false, 0, NWilson::TSpan{});
2132+
bool readingLog = OwnerData[owner].ReadingLog();
21322133
ui64 lastSeenLsn = 0;
21332134
auto it = LogChunks.begin();
21342135
while (it != LogChunks.end()) {
21352136
if (it->OwnerLsnRange.size() > owner && it->OwnerLsnRange[owner].IsPresent) {
2136-
Y_ABORT_UNLESS(it->CurrentUserCount > 0);
2137-
it->CurrentUserCount--;
2138-
it->OwnerLsnRange[owner].IsPresent = false;
2139-
it->OwnerLsnRange[owner].FirstLsn = 0;
21402137
lastSeenLsn = Max(it->OwnerLsnRange[owner].LastLsn, lastSeenLsn);
2141-
it->OwnerLsnRange[owner].LastLsn = 0;
2138+
2139+
if (!readingLog) {
2140+
Y_ABORT_UNLESS(it->CurrentUserCount > 0);
2141+
it->CurrentUserCount--;
2142+
it->OwnerLsnRange[owner].IsPresent = false;
2143+
it->OwnerLsnRange[owner].FirstLsn = 0;
2144+
it->OwnerLsnRange[owner].LastLsn = 0;
2145+
}
21422146
}
21432147
++it;
21442148
}
@@ -2376,22 +2380,50 @@ void TPDisk::ClearQuarantineChunks() {
23762380
*Mon.QuarantineChunks = QuarantineChunks.size();
23772381
}
23782382

2383+
bool haveChunksToRelease = false;
2384+
23792385
{
23802386
const auto it = std::partition(QuarantineOwners.begin(), QuarantineOwners.end(), [&] (TOwner i) {
23812387
return Keeper.GetOwnerUsed(i) || OwnerData[i].HaveRequestsInFlight();
23822388
});
23832389
for (auto delIt = it; delIt != QuarantineOwners.end(); ++delIt) {
2384-
ADD_RECORD_WITH_TIMESTAMP_TO_OPERATION_LOG(OwnerData[*delIt].OperationLog, "Remove owner from quarantine, OwnerId# " << *delIt);
2385-
TOwnerRound ownerRound = OwnerData[*delIt].OwnerRound;
2386-
OwnerData[*delIt].Reset(false);
2387-
OwnerData[*delIt].OwnerRound = ownerRound;
2388-
Keeper.RemoveOwner(*delIt);
2390+
TOwner owner = *delIt;
2391+
ADD_RECORD_WITH_TIMESTAMP_TO_OPERATION_LOG(OwnerData[owner].OperationLog, "Remove owner from quarantine, OwnerId# " << owner);
2392+
TOwnerRound ownerRound = OwnerData[owner].OwnerRound;
2393+
OwnerData[owner].Reset(false);
2394+
OwnerData[owner].OwnerRound = ownerRound;
2395+
Keeper.RemoveOwner(owner);
2396+
2397+
ui64 lastSeenLsn = 0;
2398+
auto it = LogChunks.begin();
2399+
while (it != LogChunks.end()) {
2400+
if (it->OwnerLsnRange.size() > owner && it->OwnerLsnRange[owner].IsPresent) {
2401+
Y_ABORT_UNLESS(it->CurrentUserCount > 0);
2402+
ui32 userCount = --it->CurrentUserCount;
2403+
it->OwnerLsnRange[owner].IsPresent = false;
2404+
it->OwnerLsnRange[owner].FirstLsn = 0;
2405+
lastSeenLsn = Max(it->OwnerLsnRange[owner].LastLsn, lastSeenLsn);
2406+
it->OwnerLsnRange[owner].LastLsn = 0;
2407+
2408+
if (userCount == 0) {
2409+
haveChunksToRelease = true;
2410+
}
2411+
}
2412+
++it;
2413+
}
23892414
LOG_NOTICE_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId
2390-
<< " removed ownerId# " << *delIt << " from chunks Keeper through QuarantineOwners");
2415+
<< " removed ownerId# " << (ui32)owner << " lastSeenLsn#" << lastSeenLsn << " from chunks Keeper through QuarantineOwners" << (haveChunksToRelease ? " along with log chunks" : ""));
23912416
}
23922417
QuarantineOwners.erase(it, QuarantineOwners.end());
23932418
*Mon.QuarantineOwners = QuarantineOwners.size();
23942419
}
2420+
2421+
if (haveChunksToRelease) {
2422+
THolder<TCompletionEventSender> completion(new TCompletionEventSender(this));
2423+
if (ReleaseUnusedLogChunks(completion.Get())) {
2424+
WriteSysLogRestorePoint(completion.Release(), TReqId(TReqId::KillOwnerSysLog, 0), {});
2425+
}
2426+
}
23952427
}
23962428

23972429
// Should be called to initiate TRIM (on chunk delete or prev trim done)

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class TPDisk : public IPDisk {
142142
ui64 InsaneLogChunks = 0; // Set when pdisk sees insanely large log, to give vdisks a chance to cut it
143143
ui32 FirstLogChunkToParseCommits = 0;
144144

145-
// Chunks that is owned by killed owner, but has operations InFlight
145+
// Chunks that are owned by killed owner, but have operations InFlight
146146
TVector<TChunkIdx> QuarantineChunks;
147147
TVector<TOwner> QuarantineOwners;
148148

ydb/core/blobstorage/pdisk/blobstorage_pdisk_logreader.cpp

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -808,19 +808,41 @@ bool TLogReader::ProcessSectorSet(TSectorData *sector) {
808808
<< " LastGoodToWriteLogPosition# " << LastGoodToWriteLogPosition
809809
<< " Marker# LR018");
810810
} else {
811-
Y_VERIFY_S(ChunkIdx == LogEndChunkIdx && SectorIdx >= LogEndSectorIdx, SelfInfo()
812-
<< " File# " << __FILE__
813-
<< " Line# " << __LINE__
814-
<< " LogEndChunkIdx# " << LogEndChunkIdx
815-
<< " LogEndSectorIdx# " << LogEndSectorIdx);
816-
if (!(ChunkIdx == LogEndChunkIdx && SectorIdx >= LogEndSectorIdx)) {
817-
LOG_WARN_S(*PDisk->ActorSystem, NKikimrServices::BS_PDISK, SelfInfo()
818-
<< " In ProcessSectorSet got !restorator.GoodSectorFlags outside the LogEndSector."
811+
bool outsideLogEnd = ChunkIdx == LogEndChunkIdx && SectorIdx >= LogEndSectorIdx;
812+
813+
if (!outsideLogEnd) {
814+
// If invalid data was read from the log (but still within this owner's log bounds), check whether the owner is on quarantine.
815+
TGuard<TMutex> guard(PDisk->StateMutex);
816+
TOwnerData &ownerData = PDisk->OwnerData[Owner];
817+
818+
if (ownerData.OnQuarantine) {
819+
LOG_WARN_S(*PDisk->ActorSystem, NKikimrServices::BS_PDISK, SelfInfo()
820+
<< " In ProcessSectorSet got !restorator.GoodSectorFlags with owner on quarantine"
819821
<< " File# " << __FILE__
820822
<< " Line# " << __LINE__
821823
<< " LogEndChunkIdx# " << LogEndChunkIdx
822824
<< " LogEndSectorIdx# " << LogEndSectorIdx
823-
<< " Marker# LR004");
825+
<< " Marker# LR019");
826+
ReplyOk();
827+
return true;
828+
}
829+
}
830+
831+
Y_VERIFY_S(outsideLogEnd, SelfInfo()
832+
<< " File# " << __FILE__
833+
<< " Line# " << __LINE__
834+
<< " LogEndChunkIdx# " << LogEndChunkIdx
835+
<< " LogEndSectorIdx# " << LogEndSectorIdx);
836+
837+
if (outsideLogEnd) {
838+
// It's ok: the bad sectors are at or past the log end position, so only log a warning.
839+
LOG_WARN_S(*PDisk->ActorSystem, NKikimrServices::BS_PDISK, SelfInfo()
840+
<< " In ProcessSectorSet got !restorator.GoodSectorFlags outside the LogEndSector"
841+
<< " File# " << __FILE__
842+
<< " Line# " << __LINE__
843+
<< " LogEndChunkIdx# " << LogEndChunkIdx
844+
<< " LogEndSectorIdx# " << LogEndSectorIdx
845+
<< " Marker# LR004");
824846
}
825847
}
826848

ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ enum class EInitPhase {
2323
};
2424

2525
enum EOwner {
26-
OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, mens "for dynamic" in requests
26+
OwnerSystem = 0, // Chunk0, SysLog chunks and CommonLog + just common log tracking, means "for dynamic" in requests
2727
OwnerUnallocated = 1, // Unallocated chunks, Trim scheduling, Slay commands
2828
OwnerBeginUser = 2,
2929
OwnerEndUser = 241,
@@ -156,6 +156,10 @@ struct TOwnerData {
156156
return LogReader || InFlight->ChunkWrites.load() || InFlight->ChunkReads.load() || InFlight->LogWrites.load();
157157
}
158158

159+
bool ReadingLog() const {
160+
return bool(LogReader);
161+
}
162+
159163
TString ToString() const {
160164
TStringStream str;
161165
str << "TOwnerData {";

ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ struct TActorTestContext {
2626
ui32 ChunkSize = 128 * (1 << 20);
2727
bool SmallDisk = false;
2828
bool SuppressCompatibilityCheck = false;
29+
TAutoPtr<TLogBackend> LogBackend = nullptr;
2930
};
3031

3132
private:
@@ -71,7 +72,11 @@ struct TActorTestContext {
7172
IoContext = std::make_shared<NPDisk::TIoContextFactoryOSS>();
7273
appData->IoContextFactory = IoContext.get();
7374

74-
Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend());
75+
if (Settings.LogBackend) {
76+
Runtime->SetLogBackend(Settings.LogBackend);
77+
} else {
78+
Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend());
79+
}
7580
Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}, {}});
7681
Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE);
7782
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE);
@@ -130,7 +135,7 @@ struct TActorTestContext {
130135
}
131136
return PDisk;
132137
}
133-
138+
134139
void GracefulPDiskRestart(bool waitForRestart = true) {
135140
ui32 pdiskId = GetPDisk()->PDiskId;
136141

ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_races.cpp

Lines changed: 169 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,11 +254,11 @@ Y_UNIT_TEST_SUITE(TPDiskRaces) {
254254
}
255255

256256
Y_UNIT_TEST(KillOwnerWhileDecommittingWithInflight) {
257-
TestKillOwnerWhileDecommitting(false, 20, 0, 10, 100);
257+
TestKillOwnerWhileDecommitting(false, 20, 50, 10, 100);
258258
}
259259

260260
Y_UNIT_TEST(KillOwnerWhileDecommittingWithInflightMock) {
261-
TestKillOwnerWhileDecommitting(true, 20, 0, 10, 100);
261+
TestKillOwnerWhileDecommitting(true, 20, 50, 10, 100);
262262
}
263263

264264
void OwnerRecreationRaces(bool usePDiskMock, ui32 timeLimit, ui32 vdisksNum) {
@@ -325,6 +325,173 @@ Y_UNIT_TEST_SUITE(TPDiskRaces) {
325325
Y_UNIT_TEST(OwnerRecreationRaces) {
326326
OwnerRecreationRaces(false, 20, 1);
327327
}
328+
329+
void TestKillOwnerWhileReadingLog(ui32 timeLimit) {
330+
// This test is not deterministic, so we run it multiple times to increase the chance of catching the bug.
331+
// We expect to see quarantined log chunks in the log at least once (however locally it was seen every time).
332+
// The original bug crashed the server, so this test also checks that no crash occurs; that is why it does not
333+
// break out of the loop upon encountering quarantined log chunks.
334+
bool capturedQuarantinedLogChunks = false;
335+
THPTimer timer;
336+
while (timer.Passed() < timeLimit) {
337+
TStringStream ss;
338+
339+
TActorTestContext testCtx({
340+
.IsBad = false,
341+
.UsePDiskMock = false,
342+
.LogBackend = new TStreamLogBackend(&ss),
343+
});
344+
const TString data = PrepareData(10_MB);
345+
346+
auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
347+
TString dataCopy = data;
348+
auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, TRcBuf(dataCopy),
349+
mock.GetLsnSeg(), nullptr);
350+
evLog->Signature.SetCommitRecord();
351+
evLog->CommitRecord = std::move(rec);
352+
testCtx.Send(evLog.Release());
353+
};
354+
355+
TVDiskMock mock(&testCtx);
356+
mock.Init();
357+
358+
for (ui32 i = 0; i < 20; ++i) {
359+
NPDisk::TCommitRecord rec;
360+
logNoTest(mock, rec);
361+
testCtx.Recv<NPDisk::TEvLogResult>();
362+
}
363+
364+
testCtx.RestartPDiskSync();
365+
366+
mock.Init();
367+
368+
NPDisk::TLogPosition position{0, 0};
369+
370+
bool readCallbackCalled = false;
371+
372+
testCtx.TestCtx.SectorMap->SetReadCallback([&]() {
373+
if (!readCallbackCalled) {
374+
readCallbackCalled = true;
375+
376+
testCtx.Send(new NPDisk::TEvHarakiri(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound));
377+
}
378+
});
379+
380+
testCtx.Send(new NPDisk::TEvReadLog(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, position));
381+
382+
testCtx.Recv<NPDisk::TEvHarakiriResult>();
383+
384+
{
385+
TVDiskMock mock(&testCtx);
386+
mock.Init();
387+
388+
for (ui32 i = 0; i < 13; ++i) {
389+
NPDisk::TCommitRecord rec;
390+
logNoTest(mock, rec);
391+
testCtx.Recv<NPDisk::TEvLogResult>();
392+
}
393+
}
394+
395+
if (!capturedQuarantinedLogChunks) {
396+
TString log = ss.Str();
397+
capturedQuarantinedLogChunks = log.Contains("along with log chunks");
398+
}
399+
}
400+
401+
UNIT_ASSERT(capturedQuarantinedLogChunks);
402+
}
403+
404+
Y_UNIT_TEST(OwnerKilledWhileReadingLog) {
405+
TestKillOwnerWhileReadingLog(20);
406+
}
407+
408+
void TestKillOwnerWhileReadingLogAndThenKillLastOwner(ui32 timeLimit) {
409+
bool capturedQuarantinedLogChunks = false;
410+
THPTimer timer;
411+
while (timer.Passed() < timeLimit) {
412+
TStringStream ss;
413+
414+
TActorTestContext testCtx({
415+
.IsBad = false,
416+
.UsePDiskMock = false,
417+
.LogBackend = new TStreamLogBackend(&ss),
418+
});
419+
const TString data = PrepareData(10_MB);
420+
421+
auto logNoTest = [&](TVDiskMock& mock, NPDisk::TCommitRecord rec) {
422+
TString dataCopy = data;
423+
auto evLog = MakeHolder<NPDisk::TEvLog>(mock.PDiskParams->Owner, mock.PDiskParams->OwnerRound, 0, TRcBuf(dataCopy),
424+
mock.GetLsnSeg(), nullptr);
425+
evLog->Signature.SetCommitRecord();
426+
evLog->CommitRecord = std::move(rec);
427+
testCtx.Send(evLog.Release());
428+
};
429+
430+
TVDiskMock mock1(&testCtx);
431+
mock1.Init();
432+
433+
TVDiskMock mock2(&testCtx);
434+
mock2.Init();
435+
436+
for (ui32 i = 0; i < 20; ++i) {
437+
{
438+
NPDisk::TCommitRecord rec;
439+
logNoTest(mock1, rec);
440+
testCtx.Recv<NPDisk::TEvLogResult>();
441+
}
442+
{
443+
NPDisk::TCommitRecord rec;
444+
logNoTest(mock2, rec);
445+
testCtx.Recv<NPDisk::TEvLogResult>();
446+
}
447+
}
448+
449+
testCtx.RestartPDiskSync();
450+
451+
mock1.Init();
452+
mock2.InitFull();
453+
454+
NPDisk::TLogPosition position{0, 0};
455+
456+
bool readCallbackCalled = false;
457+
458+
testCtx.TestCtx.SectorMap->SetReadCallback([&]() {
459+
if (!readCallbackCalled) {
460+
readCallbackCalled = true;
461+
462+
testCtx.Send(new NPDisk::TEvHarakiri(mock1.PDiskParams->Owner, mock1.PDiskParams->OwnerRound));
463+
testCtx.Send(new NPDisk::TEvHarakiri(mock2.PDiskParams->Owner, mock2.PDiskParams->OwnerRound));
464+
}
465+
});
466+
467+
testCtx.Send(new NPDisk::TEvReadLog(mock1.PDiskParams->Owner, mock1.PDiskParams->OwnerRound, position));
468+
469+
testCtx.Recv<NPDisk::TEvHarakiriResult>();
470+
testCtx.Recv<NPDisk::TEvHarakiriResult>();
471+
472+
{
473+
TVDiskMock mock(&testCtx);
474+
mock.Init();
475+
476+
for (ui32 i = 0; i < 30; ++i) {
477+
NPDisk::TCommitRecord rec;
478+
logNoTest(mock, rec);
479+
testCtx.Recv<NPDisk::TEvLogResult>();
480+
}
481+
}
482+
483+
if (!capturedQuarantinedLogChunks) {
484+
TString log = ss.Str();
485+
capturedQuarantinedLogChunks = log.Contains("along with log chunks");
486+
}
487+
}
488+
489+
UNIT_ASSERT(capturedQuarantinedLogChunks);
490+
}
491+
492+
Y_UNIT_TEST(OwnerKilledWhileReadingLogAndThenKillLastOwner) {
493+
TestKillOwnerWhileReadingLogAndThenKillLastOwner(20);
494+
}
328495
}
329496

330497
}

0 commit comments

Comments
 (0)