
Commit 636d822

HDFS-17811. EC: DFSStripedInputStream supports retrying just like DFSInputStream. (apache#7820). Contributed by hfutatzhanghb.
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
1 parent 3d905f9 commit 636d822
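
This change gives stateful reads on erasure-coded files the same one-retry behavior that DFSInputStream already has for replicated files: if reading a stripe throws an IOException, the stream re-seeks to the current block group and tries the read once more before surfacing the error. The sketch below isolates that retry skeleton; it is a simplified, self-contained illustration under assumed names (ReadAttempt, seekToBlockGroup and readStripeData are hypothetical stand-ins for the real blockSeekTo()/readOneStripe() logic), not the patch itself; the actual diff follows.

// Simplified sketch of the retry loop added to DFSStripedInputStream#readWithStrategy.
// ReadAttempt, seekToBlockGroup() and readStripeData() are hypothetical stand-ins for
// the real blockSeekTo()/readOneStripe() calls; see the actual diff below.
import java.io.IOException;

public class StripedReadRetrySketch {

  interface ReadAttempt {
    void seekToBlockGroup() throws IOException;  // stand-in for blockSeekTo(pos)
    int readStripeData() throws IOException;     // stand-in for the stripe-copy loop
  }

  static int readWithRetry(ReadAttempt attempt) throws IOException {
    int retries = 2;               // at most one retry, matching the patch
    boolean isRetryRead = false;
    while (retries > 0) {
      try {
        if (isRetryRead) {
          attempt.seekToBlockGroup();   // on retry, re-create the stripe readers first
        }
        return attempt.readStripeData();
      } catch (IOException ioe) {
        retries--;
        if (retries > 0) {
          isRetryRead = true;      // second pass forces a fresh seek
        } else {
          throw ioe;               // retries exhausted, surface the failure
        }
      }
    }
    return -1;                     // not reached with retries = 2
  }
}

Note that in the real patch the byte counter (result) is hoisted out of the try block, so bytes copied before the failure are kept and the retry only reads the remainder; len is decremented alongside pos for the same reason.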

File tree: 3 files changed, +105 −24 lines

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java

Lines changed: 3 additions & 0 deletions

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -71,4 +72,6 @@ public void delayWhenRenewLeaseTimeout() {}
 
   public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}
 
   public void failCreateBlockReader() throws InvalidBlockTokenException {}
+
+  public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {};
 }
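
The new failWhenReadWithStrategy(boolean) hook is a no-op in production; tests swap in a subclass that throws only when isRetryRead is false, so the first read attempt fails and the retry path is exercised. The snippet below condenses the pattern used by the test added later in this commit; the helper class name and the CheckedRunnable type are illustrative, not part of the patch.

package org.apache.hadoop.hdfs;

import java.io.IOException;

// Condensed illustration of how a test can drive the new hook; the full test
// (testStatefulReadRetryWhenMoreThanParityFailOnce) appears further down.
public class FaultInjectorUsageSketch {

  interface CheckedRunnable {
    void run() throws Exception;
  }

  /** Runs {@code readAction} while an injector that fails only the first attempt is installed. */
  static void withFirstAttemptFailure(CheckedRunnable readAction) throws Exception {
    DFSClientFaultInjector previous = DFSClientFaultInjector.get();
    DFSClientFaultInjector.set(new DFSClientFaultInjector() {
      @Override
      public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {
        if (!isRetryRead) {
          throw new IOException("Simulated stripe read failure on first attempt.");
        }
      }
    });
    try {
      readAction.run();                     // e.g. read the EC file back and verify its contents
    } finally {
      DFSClientFaultInjector.set(previous); // always restore the previous injector
    }
  }
}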

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

Lines changed: 40 additions & 24 deletions

@@ -395,37 +395,53 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
       throw new IOException("Stream closed");
     }
 
+    // Number of bytes already read into buffer.
+    int result = 0;
     int len = strategy.getTargetLength();
     CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     if (pos < getFileLength()) {
-      try {
-        if (pos > blockEnd) {
-          blockSeekTo(pos);
-        }
-        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
-        synchronized (infoLock) {
-          if (locatedBlocks.isLastBlockComplete()) {
-            realLen = (int) Math.min(realLen,
-                locatedBlocks.getFileLength() - pos);
+      int retries = 2;
+      boolean isRetryRead = false;
+      while (retries > 0) {
+        try {
+          if (pos > blockEnd || isRetryRead) {
+            blockSeekTo(pos);
+          }
+          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+          synchronized (infoLock) {
+            if (locatedBlocks.isLastBlockComplete()) {
+              realLen = (int) Math.min(realLen,
+                  locatedBlocks.getFileLength() - pos);
+            }
           }
-        }
 
-        /** Number of bytes already read into buffer */
-        int result = 0;
-        while (result < realLen) {
-          if (!curStripeRange.include(getOffsetInBlockGroup())) {
-            readOneStripe(corruptedBlocks);
+          while (result < realLen) {
+            if (!curStripeRange.include(getOffsetInBlockGroup())) {
+              DFSClientFaultInjector.get().failWhenReadWithStrategy(isRetryRead);
+              readOneStripe(corruptedBlocks);
+            }
+            int ret = copyToTargetBuf(strategy, realLen - result);
+            result += ret;
+            pos += ret;
+            len -= ret;
+          }
+          return result;
+        } catch (IOException ioe) {
+          retries--;
+          if (retries > 0) {
+            DFSClient.LOG.info(
+                "DFSStripedInputStream read meets exception:{}, will retry again.",
+                ioe.toString());
+            isRetryRead = true;
+          } else {
+            throw ioe;
          }
-          int ret = copyToTargetBuf(strategy, realLen - result);
-          result += ret;
-          pos += ret;
+        } finally {
+          // Check if need to report block replicas corruption either read
+          // was successful or ChecksumException occurred.
+          reportCheckSumFailure(corruptedBlocks, getCurrentBlockLocationsLength(),
+              true);
        }
-        return result;
-      } finally {
-        // Check if need to report block replicas corruption either read
-        // was successful or ChecksumException occurred.
-        reportCheckSumFailure(corruptedBlocks, getCurrentBlockLocationsLength(),
-            true);
       }
     }
     return -1;

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java

Lines changed: 62 additions & 0 deletions

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -50,6 +52,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -735,4 +738,63 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
     assertEquals(rangesExpected, ranges);
   }
 
+  @Test
+  public void testStatefulReadRetryWhenMoreThanParityFailOnce() throws Exception {
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    String testBaseDir = "/testECRead";
+    String testfileName = "testfile";
+    DFSClientFaultInjector old = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(9).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path(testBaseDir);
+      assertTrue(dfs.mkdirs(dir));
+      dfs.enableErasureCodingPolicy("RS-6-3-1024k");
+      dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
+      assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName());
+
+      int writeBufSize = 30 * 1024 * 1024 + 1;
+      byte[] writeBuf = new byte[writeBufSize];
+      try (FSDataOutputStream fsdos = dfs.create(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        Random random = new Random();
+        random.nextBytes(writeBuf);
+        fsdos.write(writeBuf, 0, writeBuf.length);
+        Thread.sleep(1000);
+      }
+      FileStatus fileStatus = dfs.getFileStatus(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName));
+      assertEquals(writeBufSize, fileStatus.getLen());
+
+      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+        @Override
+        public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {
+          if (!isRetryRead) {
+            throw new IOException("Mock more than parity num blocks fail when readOneStripe.");
+          }
+        }
+      });
+
+      // We use unaligned buffer size to trigger some corner cases.
+      byte[] readBuf = new byte[4095];
+      byte[] totalReadBuf = new byte[writeBufSize]; // Buffer to store all read data
+      int ret = 0;
+      int totalReadBytes = 0;
+      try (FSDataInputStream fsdis = dfs.open(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        while ((ret = fsdis.read(readBuf)) > 0) {
+          System.arraycopy(readBuf, 0, totalReadBuf, totalReadBytes, ret);
+          totalReadBytes += ret;
+        }
+
+        // Compare the read data with the original writeBuf.
+        assertEquals(writeBufSize, totalReadBytes, "Total bytes read should match writeBuf size");
+        assertArrayEquals(writeBuf, totalReadBuf, "Read data should match original write data");
+      }
+      assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
+    } finally {
+      DFSClientFaultInjector.set(old);
+    }
+  }
 }
