Skip to content

Commit c0e5898

Browse files
authored
Merge 0087e52 into e756b37
2 parents e756b37 + 0087e52 commit c0e5898

File tree

3 files changed

+7
-2
lines changed

3 files changed

+7
-2
lines changed

ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,11 +372,12 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvScheduleC
372372
CC_LOG_D("Got TEvScheduleCheckpointing");
373373
ScheduleNextCheckpoint();
374374
const auto checkpointsInFly = PendingCheckpoints.size() + PendingCommitCheckpoints.size();
375-
if (checkpointsInFly >= Settings.GetMaxInflight() || InitingZeroCheckpoint) {
375+
if (checkpointsInFly >= Settings.GetMaxInflight() || (InitingZeroCheckpoint && !FailedZeroCheckpoint)) {
376376
CC_LOG_W("Skip schedule checkpoint event since inflight checkpoint limit exceeded: current: " << checkpointsInFly << ", limit: " << Settings.GetMaxInflight());
377377
Metrics.SkippedDueToInFlightLimit->Inc();
378378
return;
379379
}
380+
FailedZeroCheckpoint = false;
380381
Metrics.SkippedDueToInFlightLimit->Set(0);
381382
InitCheckpoint();
382383
}
@@ -389,6 +390,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpo
389390
if (issues) {
390391
CC_LOG_E("[" << checkpointId << "] StorageError: can't create checkpoint: " << issues.ToOneLineString());
391392
PendingCheckpoints.erase(checkpointId);
393+
FailedZeroCheckpoint = InitingZeroCheckpoint;
392394
UpdateInProgressMetric();
393395
++*Metrics.FailedToCreate;
394396
++*Metrics.StorageError;
@@ -470,6 +472,7 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskSt
470472
CC_LOG_E("[" << checkpointId << "] Got all acks for aborted checkpoint, aborting in storage");
471473
CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
472474
Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
475+
FailedZeroCheckpoint = InitingZeroCheckpoint;
473476
} else {
474477
CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'");
475478
Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId, checkpoint.GetStats().StateSize), IEventHandle::FlagTrackDelivery);
@@ -494,6 +497,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvSetCheckpoint
494497
CC_LOG_E("[" << checkpointId << "] StorageError: can't change checkpoint status to 'PendingCommit': " << issues.ToString());
495498
++*Metrics.StorageError;
496499
PendingCheckpoints.erase(it);
500+
FailedZeroCheckpoint = InitingZeroCheckpoint;
497501
return;
498502
}
499503

@@ -571,6 +575,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvAbortCheckpoi
571575
++*Metrics.Aborted;
572576
}
573577
PendingCheckpoints.erase(checkpointId);
578+
FailedZeroCheckpoint = InitingZeroCheckpoint;
574579
PendingCommitCheckpoints.erase(checkpointId);
575580
UpdateInProgressMetric();
576581
}

ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ class TCheckpointCoordinator : public NYql::TTaskControllerImpl<TCheckpointCoord
193193
std::unique_ptr<TPendingInitCoordinator> PendingInit;
194194
bool GraphIsRunning = false;
195195
bool InitingZeroCheckpoint = false;
196+
bool FailedZeroCheckpoint = false;
196197
bool RestoringFromForeignCheckpoint = false;
197198

198199
TCheckpointCoordinatorMetrics Metrics;

ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,6 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
423423
ExpectEvent(StorageProxy,
424424
TEvCheckpointStorage::TEvAbortCheckpointRequest( CoordinatorId, checkpointId, "Can't save node state"));
425425
MockAbortCheckpointResponse(checkpointId);
426-
MockRunGraph();
427426
}
428427

429428
void ScheduleCheckpointing() {

0 commit comments

Comments
 (0)