Skip to content

checkoint coordinator: handle failure on saving zero checkpoint #13917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,12 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvScheduleC
CC_LOG_D("Got TEvScheduleCheckpointing");
ScheduleNextCheckpoint();
const auto checkpointsInFly = PendingCheckpoints.size() + PendingCommitCheckpoints.size();
if (checkpointsInFly >= Settings.GetMaxInflight() || InitingZeroCheckpoint) {
if (checkpointsInFly >= Settings.GetMaxInflight() || (InitingZeroCheckpoint && !FailedZeroCheckpoint)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тогда тесты неправильно написаны, есть тест ShouldAbortPreviousCheckpointsIfNodeStateCantBeSaved.
В нем есть MockRunGraph(), которого видимо не должно быть. А когда он должен приходить непонятно

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Он должен приходить после сохранения ZeroCheckpoint (а в тесте есть только симуляция одного -- первого -- сохранения чекпоинта -- неуспешная); никакого влияния ни на что это не оказывает, но правильнее его убрать

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В принципе, ещё можно добавить ассерты в Handle(EvRunGraph).

    Y_ABORT_UNLESS(InitingZeroCheckpoint);
    Y_ABORT_UNLESS(!FailedZeroCheckpoint);

В норме оно вроде должно работать (TEvZeroCheckpointDone->SetLoadFromCheckpointMode->Ping(SetLoadFrom...Cookie)->EvRunGraph), но в тестах что-то налажано и они с этим падают :-|

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Разобрался с тестами, добавил assert для не-релиза

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Так теперь если ZeroCheckpoint фейлится, то TEvZeroCheckpointDone не отправляется и граф не запускается. А чекпойнты начнут идти. Может лучше граф явно зафейлить.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если я правильно понял TODO, то граф работает сразу. Но раньше несохранившийся zero checkpoint ломал чекпоинты вообще, а теперь первый же сохранившийся просто играет роль zero checkpoint.
Если ошибка транзиентная, и просто повторение поможет -- всё отлично.
Если ошибка не транзиентная, и чекпоинты вообще не сохраняются, никакой разницы с тем, что это начало происходить после zero checkpoint: нужны алерты за failure rate.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Как всё запутано) Если граф сразу запускается, тогда InitingZeroCheckpoint означает просто признак необходимости отравить TEvZeroCheckpointDone при первом же успешном чекпойнте. И в условии checkpointsInFly >= Settings.GetMaxInflight() все эти флаги не нужны.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Там ещё отдельная ветка с восстановлением из checkpoint, там тоже выставляется InitingZeroCheckpoint

CC_LOG_W("Skip schedule checkpoint event since inflight checkpoint limit exceeded: current: " << checkpointsInFly << ", limit: " << Settings.GetMaxInflight());
Metrics.SkippedDueToInFlightLimit->Inc();
return;
}
FailedZeroCheckpoint = false;
Metrics.SkippedDueToInFlightLimit->Set(0);
InitCheckpoint();
}
Expand All @@ -389,6 +390,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpo
if (issues) {
CC_LOG_E("[" << checkpointId << "] StorageError: can't create checkpoint: " << issues.ToOneLineString());
PendingCheckpoints.erase(checkpointId);
FailedZeroCheckpoint = InitingZeroCheckpoint;
UpdateInProgressMetric();
++*Metrics.FailedToCreate;
++*Metrics.StorageError;
Expand Down Expand Up @@ -470,6 +472,7 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskSt
CC_LOG_E("[" << checkpointId << "] Got all acks for aborted checkpoint, aborting in storage");
CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
FailedZeroCheckpoint = InitingZeroCheckpoint;
} else {
CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'");
Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId, checkpoint.GetStats().StateSize), IEventHandle::FlagTrackDelivery);
Expand All @@ -494,6 +497,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvSetCheckpoint
CC_LOG_E("[" << checkpointId << "] StorageError: can't change checkpoint status to 'PendingCommit': " << issues.ToString());
++*Metrics.StorageError;
PendingCheckpoints.erase(it);
FailedZeroCheckpoint = InitingZeroCheckpoint;
return;
}

Expand Down Expand Up @@ -571,6 +575,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvAbortCheckpoi
++*Metrics.Aborted;
}
PendingCheckpoints.erase(checkpointId);
FailedZeroCheckpoint = InitingZeroCheckpoint;
PendingCommitCheckpoints.erase(checkpointId);
UpdateInProgressMetric();
}
Expand Down Expand Up @@ -616,6 +621,8 @@ void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
}

void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&) {
Y_DEBUG_ABORT_UNLESS(InitingZeroCheckpoint);
Y_DEBUG_ABORT_UNLESS(!FailedZeroCheckpoint);
InitingZeroCheckpoint = false;
// TODO: run graph only now, not before zero checkpoint inited
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class TCheckpointCoordinator : public NYql::TTaskControllerImpl<TCheckpointCoord
std::unique_ptr<TPendingInitCoordinator> PendingInit;
bool GraphIsRunning = false;
bool InitingZeroCheckpoint = false;
bool FailedZeroCheckpoint = false;
bool RestoringFromForeignCheckpoint = false;

TCheckpointCoordinatorMetrics Metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
TEvCheckpointStorage::TEvCompleteCheckpointRequest(CoordinatorId, checkpointId, 300, type));

MockCompleteCheckpointResponse(checkpointId);
MockRunGraph();
}

void SaveFailed(TCheckpointId checkpointId) {
Expand All @@ -423,7 +422,6 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
ExpectEvent(StorageProxy,
TEvCheckpointStorage::TEvAbortCheckpointRequest( CoordinatorId, checkpointId, "Can't save node state"));
MockAbortCheckpointResponse(checkpointId);
MockRunGraph();
}

void ScheduleCheckpointing() {
Expand All @@ -436,20 +434,23 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();
}

Y_UNIT_TEST(ShouldTriggerCheckpointWithSourcesAndWithChannel) {
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask, 0);
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();
}

Y_UNIT_TEST(ShouldAllSnapshots) {
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, 0);
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();

test.ScheduleCheckpointing();
test.InjectCheckpoint(test.CheckpointId2, test.GraphDescId, NYql::NDqProto::CHECKPOINT_TYPE_SNAPSHOT);
Expand All @@ -461,6 +462,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
test.RegisterCoordinator();
test.InjectCheckpoint(test.CheckpointId1);
test.AllSavedAndCommited(test.CheckpointId1);
test.MockRunGraph();

test.ScheduleCheckpointing();
test.InjectCheckpoint(test.CheckpointId2, test.GraphDescId, NYql::NDqProto::CHECKPOINT_TYPE_INCREMENT_OR_SNAPSHOT);
Expand Down
Loading