|
32 | 32 | import java.util.ArrayDeque; |
33 | 33 | import java.util.Arrays; |
34 | 34 | import java.util.Queue; |
| 35 | +import java.util.concurrent.Semaphore; |
35 | 36 | import java.util.concurrent.atomic.AtomicLong; |
36 | 37 | import java.util.zip.Checksum; |
37 | 38 |
|
@@ -532,7 +533,7 @@ private boolean shouldVerifyChecksum() { |
532 | 533 | * Receives and processes a packet. It can contain many chunks. |
533 | 534 | * returns the number of data bytes that the packet has. |
534 | 535 | */ |
535 | | - private int receivePacket() throws IOException { |
| 536 | + private int receivePacket(final Semaphore ackSema) throws IOException { |
536 | 537 | // read the next packet |
537 | 538 | packetReceiver.receiveNextPacket(in); |
538 | 539 |
|
@@ -616,6 +617,9 @@ private int receivePacket() throws IOException { |
616 | 617 | handleMirrorOutError(e); |
617 | 618 | } |
618 | 619 | } |
| 620 | + if (ackSema != null) { |
| 621 | + ackSema.release(); |
| 622 | + } |
619 | 623 |
|
620 | 624 | ByteBuffer dataBuf = packetReceiver.getDataSlice(); |
621 | 625 | ByteBuffer checksumBuf = packetReceiver.getChecksumSlice(); |
@@ -984,13 +988,15 @@ void receiveBlock( |
984 | 988 | this.isReplaceBlock = isReplaceBlock; |
985 | 989 |
|
986 | 990 | try { |
| 991 | + Semaphore ackSema = null; |
987 | 992 | if (isClient && !isTransfer) { |
| 993 | + ackSema = new Semaphore(0); |
988 | 994 | responder = new Daemon(datanode.threadGroup, |
989 | | - new PacketResponder(replyOut, mirrIn, downstreams)); |
| 995 | + new PacketResponder(replyOut, mirrIn, downstreams, ackSema)); |
990 | 996 | responder.start(); // start thread to processes responses |
991 | 997 | } |
992 | 998 |
|
993 | | - while (receivePacket() >= 0) { /* Receive until the last packet */ } |
| 999 | + while (receivePacket(ackSema) >= 0) { /* Receive until the last packet */ } |
994 | 1000 |
|
995 | 1001 | // wait for all outstanding packet responses. And then |
996 | 1002 | // indicate responder to gracefully shutdown. |
@@ -1246,16 +1252,20 @@ class PacketResponder implements Runnable, Closeable { |
1246 | 1252 | /** for log and error messages */ |
1247 | 1253 | private final String myString; |
1248 | 1254 | private boolean sending = false; |
| 1255 | + /** for synchronization with BlockReceiver */ |
| 1256 | + private final Semaphore ackSema; |
1249 | 1257 |
|
1250 | 1258 | @Override |
1251 | 1259 | public String toString() { |
1252 | 1260 | return myString; |
1253 | 1261 | } |
1254 | 1262 |
|
1255 | 1263 | PacketResponder(final DataOutputStream upstreamOut, |
1256 | | - final DataInputStream downstreamIn, final DatanodeInfo[] downstreams) { |
| 1264 | + final DataInputStream downstreamIn, final DatanodeInfo[] downstreams, |
| 1265 | + final Semaphore ackSema) { |
1257 | 1266 | this.downstreamIn = downstreamIn; |
1258 | 1267 | this.upstreamOut = upstreamOut; |
| 1268 | + this.ackSema = ackSema; |
1259 | 1269 |
|
1260 | 1270 | this.type = downstreams == null? PacketResponderType.NON_PIPELINE |
1261 | 1271 | : downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE |
@@ -1395,6 +1405,9 @@ public void run() { |
1395 | 1405 | long seqno = PipelineAck.UNKOWN_SEQNO; |
1396 | 1406 | long ackRecvNanoTime = 0; |
1397 | 1407 | try { |
| 1408 | + if (ackSema != null) { |
| 1409 | + ackSema.acquire(); |
| 1410 | + } |
1398 | 1411 | if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) { |
1399 | 1412 | DataNodeFaultInjector.get().failPipeline(replicaInfo, mirrorAddr); |
1400 | 1413 | // read an ack from downstream datanode |
|
0 commit comments