34
34
import org .apache .hadoop .util .Time ;
35
35
import org .apache .ratis .proto .RaftProtos .RaftPeerRole ;
36
36
import org .apache .ratis .protocol .RaftGroupId ;
37
+ import org .apache .ratis .protocol .StateMachineException ;
37
38
import org .apache .ratis .server .RaftServer ;
38
39
import org .apache .ratis .server .impl .RaftServerProxy ;
39
40
import org .apache .ratis .server .protocol .TermIndex ;
83
84
import java .util .concurrent .Semaphore ;
84
85
import java .util .concurrent .TimeUnit ;
85
86
import java .util .concurrent .ExecutionException ;
87
+ import java .util .concurrent .atomic .AtomicBoolean ;
86
88
import java .util .stream .Collectors ;
87
89
import java .util .Set ;
88
90
import java .util .concurrent .ConcurrentSkipListSet ;
@@ -147,6 +149,7 @@ public class ContainerStateMachine extends BaseStateMachine {
147
149
private final Cache <Long , ByteString > stateMachineDataCache ;
148
150
private final boolean isBlockTokenEnabled ;
149
151
private final TokenVerifier tokenVerifier ;
152
+ private final AtomicBoolean isStateMachineHealthy ;
150
153
151
154
private final Semaphore applyTransactionSemaphore ;
152
155
/**
@@ -184,6 +187,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
184
187
ScmConfigKeys .
185
188
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT );
186
189
applyTransactionSemaphore = new Semaphore (maxPendingApplyTransactions );
190
+ isStateMachineHealthy = new AtomicBoolean (true );
187
191
this .executors = new ExecutorService [numContainerOpExecutors ];
188
192
for (int i = 0 ; i < numContainerOpExecutors ; i ++) {
189
193
final int index = i ;
@@ -265,6 +269,14 @@ public void persistContainerSet(OutputStream out) throws IOException {
265
269
public long takeSnapshot () throws IOException {
266
270
TermIndex ti = getLastAppliedTermIndex ();
267
271
long startTime = Time .monotonicNow ();
272
+ if (!isStateMachineHealthy .get ()) {
273
+ String msg =
274
+ "Failed to take snapshot " + " for " + gid + " as the stateMachine"
275
+ + " is unhealthy. The last applied index is at " + ti ;
276
+ StateMachineException sme = new StateMachineException (msg );
277
+ LOG .error (msg );
278
+ throw sme ;
279
+ }
268
280
if (ti != null && ti .getIndex () != RaftLog .INVALID_LOG_INDEX ) {
269
281
final File snapshotFile =
270
282
storage .getSnapshotFile (ti .getTerm (), ti .getIndex ());
@@ -275,12 +287,12 @@ public long takeSnapshot() throws IOException {
275
287
// make sure the snapshot file is synced
276
288
fos .getFD ().sync ();
277
289
} catch (IOException ioe ) {
278
- LOG .info ("{}: Failed to write snapshot at:{} file {}" , gid , ti ,
290
+ LOG .error ("{}: Failed to write snapshot at:{} file {}" , gid , ti ,
279
291
snapshotFile );
280
292
throw ioe ;
281
293
}
282
- LOG .info ("{}: Finished taking a snapshot at:{} file:{} time:{}" ,
283
- gid , ti , snapshotFile , (Time .monotonicNow () - startTime ));
294
+ LOG .info ("{}: Finished taking a snapshot at:{} file:{} time:{}" , gid , ti ,
295
+ snapshotFile , (Time .monotonicNow () - startTime ));
284
296
return ti .getIndex ();
285
297
}
286
298
return -1 ;
@@ -385,17 +397,12 @@ private ContainerCommandResponseProto dispatchCommand(
385
397
return response ;
386
398
}
387
399
388
- private ContainerCommandResponseProto runCommandGetResponse (
400
+ private ContainerCommandResponseProto runCommand (
389
401
ContainerCommandRequestProto requestProto ,
390
402
DispatcherContext context ) {
391
403
return dispatchCommand (requestProto , context );
392
404
}
393
405
394
- private Message runCommand (ContainerCommandRequestProto requestProto ,
395
- DispatcherContext context ) {
396
- return runCommandGetResponse (requestProto , context )::toByteString ;
397
- }
398
-
399
406
private ExecutorService getCommandExecutor (
400
407
ContainerCommandRequestProto requestProto ) {
401
408
int executorId = (int )(requestProto .getContainerID () % executors .length );
@@ -425,7 +432,7 @@ private CompletableFuture<Message> handleWriteChunk(
425
432
// thread.
426
433
CompletableFuture <ContainerCommandResponseProto > writeChunkFuture =
427
434
CompletableFuture .supplyAsync (() ->
428
- runCommandGetResponse (requestProto , context ), chunkExecutor );
435
+ runCommand (requestProto , context ), chunkExecutor );
429
436
430
437
CompletableFuture <Message > raftFuture = new CompletableFuture <>();
431
438
@@ -502,7 +509,8 @@ public CompletableFuture<Message> query(Message request) {
502
509
metrics .incNumQueryStateMachineOps ();
503
510
final ContainerCommandRequestProto requestProto =
504
511
getContainerCommandRequestProto (request .getContent ());
505
- return CompletableFuture .completedFuture (runCommand (requestProto , null ));
512
+ return CompletableFuture
513
+ .completedFuture (runCommand (requestProto , null )::toByteString );
506
514
} catch (IOException e ) {
507
515
metrics .incNumQueryStateMachineFails ();
508
516
return completeExceptionally (e );
@@ -674,30 +682,58 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
674
682
if (cmdType == Type .WriteChunk || cmdType ==Type .PutSmallFile ) {
675
683
builder .setCreateContainerSet (createContainerSet );
676
684
}
685
+ CompletableFuture <Message > applyTransactionFuture =
686
+ new CompletableFuture <>();
677
687
// Ensure the command gets executed in a separate thread than
678
688
// stateMachineUpdater thread which is calling applyTransaction here.
679
- CompletableFuture <Message > future = CompletableFuture
680
- .supplyAsync (() -> runCommand (requestProto , builder .build ()),
689
+ CompletableFuture <ContainerCommandResponseProto > future =
690
+ CompletableFuture .supplyAsync (
691
+ () -> runCommand (requestProto , builder .build ()),
681
692
getCommandExecutor (requestProto ));
682
-
683
- future .thenAccept (m -> {
693
+ future .thenApply (r -> {
684
694
if (trx .getServerRole () == RaftPeerRole .LEADER ) {
685
695
long startTime = (long ) trx .getStateMachineContext ();
686
696
metrics .incPipelineLatency (cmdType ,
687
697
Time .monotonicNowNanos () - startTime );
688
698
}
689
-
690
- final Long previous =
691
- applyTransactionCompletionMap
699
+ if (r .getResult () != ContainerProtos .Result .SUCCESS ) {
700
+ StorageContainerException sce =
701
+ new StorageContainerException (r .getMessage (), r .getResult ());
702
+ LOG .error (
703
+ "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
704
+ + "{} Container Result: {}" , gid , r .getCmdType (), index ,
705
+ r .getMessage (), r .getResult ());
706
+ metrics .incNumApplyTransactionsFails ();
707
+ // Since the applyTransaction now is completed exceptionally,
708
+ // before any further snapshot is taken , the exception will be
709
+ // caught in stateMachineUpdater in Ratis and ratis server will
710
+ // shutdown.
711
+ applyTransactionFuture .completeExceptionally (sce );
712
+ isStateMachineHealthy .compareAndSet (true , false );
713
+ ratisServer .handleApplyTransactionFailure (gid , trx .getServerRole ());
714
+ } else {
715
+ LOG .debug (
716
+ "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
717
+ + "{} Container Result: {}" , gid , r .getCmdType (), index ,
718
+ r .getMessage (), r .getResult ());
719
+ applyTransactionFuture .complete (r ::toByteString );
720
+ if (cmdType == Type .WriteChunk || cmdType == Type .PutSmallFile ) {
721
+ metrics .incNumBytesCommittedCount (
722
+ requestProto .getWriteChunk ().getChunkData ().getLen ());
723
+ }
724
+ // add the entry to the applyTransactionCompletionMap only if the
725
+ // stateMachine is healthy i.e, there has been no applyTransaction
726
+ // failures before.
727
+ if (isStateMachineHealthy .get ()) {
728
+ final Long previous = applyTransactionCompletionMap
692
729
.put (index , trx .getLogEntry ().getTerm ());
693
- Preconditions .checkState (previous == null );
694
- if (cmdType == Type .WriteChunk || cmdType == Type .PutSmallFile ) {
695
- metrics .incNumBytesCommittedCount (
696
- requestProto .getWriteChunk ().getChunkData ().getLen ());
730
+ Preconditions .checkState (previous == null );
731
+ updateLastApplied ();
732
+ }
697
733
}
698
- updateLastApplied () ;
734
+ return applyTransactionFuture ;
699
735
}).whenComplete ((r , t ) -> applyTransactionSemaphore .release ());
700
- return future ;
736
+ return applyTransactionFuture ;
701
737
} catch (IOException | InterruptedException e ) {
702
738
metrics .incNumApplyTransactionsFails ();
703
739
return completeExceptionally (e );
0 commit comments