Skip to content

Commit ec75229

Browse files
authored
checkpointing vs abort: wait for completion before aborting (#11896)
1 parent a925034 commit ec75229

File tree

4 files changed

+20
-6
lines changed

4 files changed

+20
-6
lines changed

ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -460,18 +460,23 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskSt
460460
if (status == NYql::NDqProto::TEvSaveTaskStateResult::OK) {
461461
checkpoint.Acknowledge(ev->Sender, proto.GetStateSizeBytes());
462462
CC_LOG_D("[" << checkpointId << "] Task state saved, need " << checkpoint.NotYetAcknowledgedCount() << " more acks");
463-
if (checkpoint.GotAllAcknowledges()) {
463+
} else {
464+
checkpoint.Abort(ev->Sender);
465+
CC_LOG_E("[" << checkpointId << "] StorageError: can't save node state, aborting checkpoint");
466+
++*Metrics.StorageError;
467+
}
468+
if (checkpoint.GotAllAcknowledges()) {
469+
if (checkpoint.GetStats().Aborted) {
470+
CC_LOG_E("[" << checkpointId << "] Got all acks for aborted checkpoint, aborting in storage");
471+
CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
472+
Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
473+
} else {
464474
CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'");
465475
Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId, checkpoint.GetStats().StateSize), IEventHandle::FlagTrackDelivery);
466476
if (InitingZeroCheckpoint) {
467477
Send(RunActorId, new TEvCheckpointCoordinator::TEvZeroCheckpointDone());
468478
}
469479
}
470-
} else {
471-
CC_LOG_E("[" << checkpointId << "] StorageError: can't save node state, aborting checkpoint");
472-
++*Metrics.StorageError;
473-
CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
474-
Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
475480
}
476481
}
477482

ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId, ui64 stat
2020
Stats.StateSize += stateSize;
2121
}
2222

23+
void TPendingCheckpoint::Abort(const NActors::TActorId& actorId) {
24+
Acknowledge(actorId);
25+
Stats.Aborted = true;
26+
}
27+
2328
bool TPendingCheckpoint::GotAllAcknowledges() const {
2429
return NotYetAcknowledged.empty();
2530
}

ydb/core/fq/libs/checkpointing/pending_checkpoint.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace NFq {
1010
struct TPendingCheckpointStats {
1111
const TInstant CreatedAt = TInstant::Now();
1212
ui64 StateSize = 0;
13+
bool Aborted = false;
1314
};
1415

1516
class TPendingCheckpoint {
@@ -27,6 +28,8 @@ class TPendingCheckpoint {
2728

2829
void Acknowledge(const NActors::TActorId& actorId, ui64 stateSize);
2930

31+
void Abort(const NActors::TActorId& actorId);
32+
3033
[[nodiscard]]
3134
bool GotAllAcknowledges() const;
3235

ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
417417
void SaveFailed(TCheckpointId checkpointId) {
418418
MockNodeStateSavedEvent(checkpointId, IngressActor);
419419
MockNodeStateSaveFailedEvent(checkpointId, EgressActor);
420+
MockNodeStateSaveFailedEvent(checkpointId, MapActor);
420421

421422
Cerr << "Waiting for TEvAbortCheckpointRequest (storage)" << Endl;
422423
ExpectEvent(StorageProxy,

0 commit comments

Comments
 (0)