36
36
import java .util .TreeSet ;
37
37
import java .util .concurrent .BlockingQueue ;
38
38
import java .util .concurrent .CountDownLatch ;
39
- import java .util .concurrent .LinkedBlockingQueue ;
39
+ import java .util .concurrent .LinkedBlockingDeque ;
40
40
import java .util .concurrent .ThreadLocalRandom ;
41
41
import java .util .concurrent .atomic .AtomicBoolean ;
42
42
import java .util .concurrent .atomic .AtomicLong ;
62
62
import org .apache .hadoop .hdfs .server .protocol .DisallowedDatanodeException ;
63
63
import org .apache .hadoop .hdfs .server .protocol .HeartbeatResponse ;
64
64
import org .apache .hadoop .hdfs .server .protocol .InvalidBlockReportLeaseException ;
65
+ import org .apache .hadoop .hdfs .server .protocol .KeyUpdateCommand ;
65
66
import org .apache .hadoop .hdfs .server .protocol .NamespaceInfo ;
66
67
import org .apache .hadoop .hdfs .server .protocol .SlowDiskReports ;
67
68
import org .apache .hadoop .hdfs .server .protocol .SlowPeerReports ;
@@ -738,7 +739,19 @@ private void offerService() throws Exception {
738
739
if (state == HAServiceState .ACTIVE ) {
739
740
handleRollingUpgradeStatus (resp );
740
741
}
741
- commandProcessingThread .enqueue (resp .getCommands ());
742
+ // Note: effect only when the KeyUpdateCommand was the last
743
+ // or penultimate command in DatanodeCommand[].
744
+ DatanodeCommand [] cmds = resp .getCommands ();
745
+ boolean isContaisHighPriorityCmd = false ;
746
+ if (cmds != null ) {
747
+ int length = cmds .length ;
748
+ for (int iter = length - 1 ; iter >= 0 && iter >= length - 2 ; iter --) {
749
+ isContaisHighPriorityCmd = isContaisHighPriorityCmd ||
750
+ cmds [iter ] instanceof KeyUpdateCommand ;
751
+ }
752
+ }
753
+ commandProcessingThread .enqueue (cmds ,
754
+ isContaisHighPriorityCmd );
742
755
isSlownode = resp .getIsSlownode ();
743
756
}
744
757
}
@@ -1389,7 +1402,7 @@ class CommandProcessingThread extends Thread {
1389
1402
CommandProcessingThread (BPServiceActor actor ) {
1390
1403
super ("Command processor" );
1391
1404
this .actor = actor ;
1392
- this .queue = new LinkedBlockingQueue <>();
1405
+ this .queue = new LinkedBlockingDeque <>();
1393
1406
setDaemon (true );
1394
1407
}
1395
1408
@@ -1468,6 +1481,11 @@ private boolean processCommand(DatanodeCommand[] cmds) {
1468
1481
return true ;
1469
1482
}
1470
1483
1484
+ /**
1485
+ * Used for cacheReport.
1486
+ * @param cmd
1487
+ * @throws InterruptedException
1488
+ */
1471
1489
void enqueue (DatanodeCommand cmd ) throws InterruptedException {
1472
1490
if (cmd == null ) {
1473
1491
return ;
@@ -1476,6 +1494,11 @@ void enqueue(DatanodeCommand cmd) throws InterruptedException {
1476
1494
dn .getMetrics ().incrActorCmdQueueLength (1 );
1477
1495
}
1478
1496
1497
+ /**
1498
+ * Used for full block report.
1499
+ * @param cmds
1500
+ * @throws InterruptedException
1501
+ */
1479
1502
void enqueue (List <DatanodeCommand > cmds ) throws InterruptedException {
1480
1503
if (cmds == null ) {
1481
1504
return ;
@@ -1485,8 +1508,18 @@ void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
1485
1508
dn .getMetrics ().incrActorCmdQueueLength (1 );
1486
1509
}
1487
1510
1488
- void enqueue (DatanodeCommand [] cmds ) throws InterruptedException {
1489
- queue .put (() -> processCommand (cmds ));
1511
+ /**
1512
+ * Used for regular heartbeat.
1513
+ * @param cmds
1514
+ * @throws InterruptedException
1515
+ */
1516
+ void enqueue (DatanodeCommand [] cmds ,
1517
+ boolean containsHighPriorityCmds ) throws InterruptedException {
1518
+ if (containsHighPriorityCmds ) {
1519
+ ((LinkedBlockingDeque <Runnable >) queue ).putFirst (() -> processCommand (cmds ));
1520
+ } else {
1521
+ queue .put (() -> processCommand (cmds ));
1522
+ }
1490
1523
dn .getMetrics ().incrActorCmdQueueLength (1 );
1491
1524
}
1492
1525
}
0 commit comments