Skip to content

Commit 03a54f8

Browse files
authored
checkpoint coordinator: handle failure on saving zero checkpoint (backport #13917) (#13963)
1 parent eced81b commit 03a54f8

File tree

3 files changed

+13
-3
lines changed

3 files changed

+13
-3
lines changed

ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,11 +372,12 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvScheduleC
372372
CC_LOG_D("Got TEvScheduleCheckpointing");
373373
ScheduleNextCheckpoint();
374374
const auto checkpointsInFly = PendingCheckpoints.size() + PendingCommitCheckpoints.size();
375-
if (checkpointsInFly >= Settings.GetMaxInflight() || InitingZeroCheckpoint) {
375+
if (checkpointsInFly >= Settings.GetMaxInflight() || (InitingZeroCheckpoint && !FailedZeroCheckpoint)) {
376376
CC_LOG_W("Skip schedule checkpoint event since inflight checkpoint limit exceeded: current: " << checkpointsInFly << ", limit: " << Settings.GetMaxInflight());
377377
Metrics.SkippedDueToInFlightLimit->Inc();
378378
return;
379379
}
380+
FailedZeroCheckpoint = false;
380381
Metrics.SkippedDueToInFlightLimit->Set(0);
381382
InitCheckpoint();
382383
}
@@ -389,6 +390,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpo
389390
if (issues) {
390391
CC_LOG_E("[" << checkpointId << "] StorageError: can't create checkpoint: " << issues.ToOneLineString());
391392
PendingCheckpoints.erase(checkpointId);
393+
FailedZeroCheckpoint = InitingZeroCheckpoint;
392394
UpdateInProgressMetric();
393395
++*Metrics.FailedToCreate;
394396
++*Metrics.StorageError;
@@ -470,6 +472,7 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskSt
470472
CC_LOG_E("[" << checkpointId << "] Got all acks for aborted checkpoint, aborting in storage");
471473
CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
472474
Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
475+
FailedZeroCheckpoint = InitingZeroCheckpoint;
473476
} else {
474477
CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'");
475478
Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId, checkpoint.GetStats().StateSize), IEventHandle::FlagTrackDelivery);
@@ -494,6 +497,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvSetCheckpoint
494497
CC_LOG_E("[" << checkpointId << "] StorageError: can't change checkpoint status to 'PendingCommit': " << issues.ToString());
495498
++*Metrics.StorageError;
496499
PendingCheckpoints.erase(it);
500+
FailedZeroCheckpoint = InitingZeroCheckpoint;
497501
return;
498502
}
499503

@@ -571,6 +575,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvAbortCheckpoi
571575
++*Metrics.Aborted;
572576
}
573577
PendingCheckpoints.erase(checkpointId);
578+
FailedZeroCheckpoint = InitingZeroCheckpoint;
574579
PendingCommitCheckpoints.erase(checkpointId);
575580
UpdateInProgressMetric();
576581
}
@@ -616,6 +621,8 @@ void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
616621
}
617622

618623
void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&) {
624+
Y_DEBUG_ABORT_UNLESS(InitingZeroCheckpoint);
625+
Y_DEBUG_ABORT_UNLESS(!FailedZeroCheckpoint);
619626
InitingZeroCheckpoint = false;
620627
// TODO: run graph only now, not before zero checkpoint inited
621628
}

ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ class TCheckpointCoordinator : public NYql::TTaskControllerImpl<TCheckpointCoord
193193
std::unique_ptr<TPendingInitCoordinator> PendingInit;
194194
bool GraphIsRunning = false;
195195
bool InitingZeroCheckpoint = false;
196+
bool FailedZeroCheckpoint = false;
196197
bool RestoringFromForeignCheckpoint = false;
197198

198199
TCheckpointCoordinatorMetrics Metrics;

ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,6 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
411411
TEvCheckpointStorage::TEvCompleteCheckpointRequest(CoordinatorId, checkpointId, 300, type));
412412

413413
MockCompleteCheckpointResponse(checkpointId);
414-
MockRunGraph();
415414
}
416415

417416
void SaveFailed(TCheckpointId checkpointId) {
@@ -423,7 +422,6 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
423422
ExpectEvent(StorageProxy,
424423
TEvCheckpointStorage::TEvAbortCheckpointRequest( CoordinatorId, checkpointId, "Can't save node state"));
425424
MockAbortCheckpointResponse(checkpointId);
426-
MockRunGraph();
427425
}
428426

429427
void ScheduleCheckpointing() {
@@ -436,20 +434,23 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
436434
test.RegisterCoordinator();
437435
test.InjectCheckpoint(test.CheckpointId1);
438436
test.AllSavedAndCommited(test.CheckpointId1);
437+
test.MockRunGraph();
439438
}
440439

441440
Y_UNIT_TEST(ShouldTriggerCheckpointWithSourcesAndWithChannel) {
442441
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask, 0);
443442
test.RegisterCoordinator();
444443
test.InjectCheckpoint(test.CheckpointId1);
445444
test.AllSavedAndCommited(test.CheckpointId1);
445+
test.MockRunGraph();
446446
}
447447

448448
Y_UNIT_TEST(ShouldAllSnapshots) {
449449
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0);
450450
test.RegisterCoordinator();
451451
test.InjectCheckpoint(test.CheckpointId1);
452452
test.AllSavedAndCommited(test.CheckpointId1);
453+
test.MockRunGraph();
453454

454455
test.ScheduleCheckpointing();
455456
test.InjectCheckpoint(test.CheckpointId2, test.GraphDescId, NYql::NDqProto::CHECKPOINT_TYPE_SNAPSHOT);
@@ -461,6 +462,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
461462
test.RegisterCoordinator();
462463
test.InjectCheckpoint(test.CheckpointId1);
463464
test.AllSavedAndCommited(test.CheckpointId1);
465+
test.MockRunGraph();
464466

465467
test.ScheduleCheckpointing();
466468
test.InjectCheckpoint(test.CheckpointId2, test.GraphDescId, NYql::NDqProto::CHECKPOINT_TYPE_INCREMENT_OR_SNAPSHOT);

0 commit comments

Comments
 (0)