Skip to content

Commit 43f2cb5

Browse files
authored
Merge 9276550 into 198242d
2 parents 198242d + 9276550 commit 43f2cb5

File tree

3 files changed

+18
-6
lines changed

3 files changed

+18
-6
lines changed

ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -460,18 +460,22 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskSt
460460
if (status == NYql::NDqProto::TEvSaveTaskStateResult::OK) {
461461
checkpoint.Acknowledge(ev->Sender, proto.GetStateSizeBytes());
462462
CC_LOG_D("[" << checkpointId << "] Task state saved, need " << checkpoint.NotYetAcknowledgedCount() << " more acks");
463-
if (checkpoint.GotAllAcknowledges()) {
463+
} else {
464+
checkpoint.Abort(ev->Sender);
465+
CC_LOG_E("[" << checkpointId << "] StorageError: can't save node state, aborting checkpoint");
466+
++*Metrics.StorageError;
467+
}
468+
if (checkpoint.GotAllAcknowledges()) {
469+
if (checkpoint.GetStats().Aborted) {
470+
CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
471+
Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
472+
} else {
464473
CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'");
465474
Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId, checkpoint.GetStats().StateSize), IEventHandle::FlagTrackDelivery);
466475
if (InitingZeroCheckpoint) {
467476
Send(RunActorId, new TEvCheckpointCoordinator::TEvZeroCheckpointDone());
468477
}
469478
}
470-
} else {
471-
CC_LOG_E("[" << checkpointId << "] StorageError: can't save node state, aborting checkpoint");
472-
++*Metrics.StorageError;
473-
CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot.
474-
Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery);
475479
}
476480
}
477481

ydb/core/fq/libs/checkpointing/pending_checkpoint.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ void TPendingCheckpoint::Acknowledge(const NActors::TActorId& actorId, ui64 stat
2020
Stats.StateSize += stateSize;
2121
}
2222

23+
// Records that the task identified by actorId failed to save its state for
// this checkpoint: the actor is passed to Acknowledge(actorId) and the
// checkpoint is flagged via Stats.Aborted.
void TPendingCheckpoint::Abort(const NActors::TActorId& actorId) {
24+
// NOTE(review): the single-argument Acknowledge overload is not visible in this diff;
// presumably it removes actorId from the not-yet-acknowledged set without adding state size. Confirm.
Acknowledge(actorId);
25+
// Checked by the coordinator through GetStats().Aborted once all acknowledges have arrived.
Stats.Aborted = true;
26+
}
27+
2328
// Returns true when NotYetAcknowledged is empty, i.e. no acknowledges are still awaited.
bool TPendingCheckpoint::GotAllAcknowledges() const {
2429
return NotYetAcknowledged.empty();
2530
}

ydb/core/fq/libs/checkpointing/pending_checkpoint.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace NFq {
1010
// Statistics kept for a single pending checkpoint.
struct TPendingCheckpointStats {
1111
// Initialized to the current time when the stats object is constructed.
const TInstant CreatedAt = TInstant::Now();
1212
// Running total; TPendingCheckpoint::Acknowledge(actorId, stateSize) adds stateSize to it.
ui64 StateSize = 0;
13+
// Set to true by TPendingCheckpoint::Abort; stays false otherwise.
bool Aborted = false;
1314
};
1415

1516
class TPendingCheckpoint {
@@ -27,6 +28,8 @@ class TPendingCheckpoint {
2728

2829
void Acknowledge(const NActors::TActorId& actorId, ui64 stateSize);
2930

31+
void Abort(const NActors::TActorId& actorId);
32+
3033
[[nodiscard]]
3134
bool GotAllAcknowledges() const;
3235

0 commit comments

Comments
 (0)