Commit fd63060

Merge branch 'apache:trunk' into YARN-7707-V2
2 parents: 378c25a + 82c8070

File tree: 56 files changed (+2327, -969 lines)


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java

Lines changed: 9 additions & 9 deletions
@@ -271,12 +271,12 @@ public void get(int blockNumber, ByteBuffer buffer) throws IOException {
   protected int readFile(Path path, ByteBuffer buffer) throws IOException {
     int numBytesRead = 0;
     int numBytes;
-    FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
-    while ((numBytes = channel.read(buffer)) > 0) {
-      numBytesRead += numBytes;
+    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
+      while ((numBytes = channel.read(buffer)) > 0) {
+        numBytesRead += numBytes;
+      }
+      buffer.limit(buffer.position());
     }
-    buffer.limit(buffer.position());
-    channel.close();
     return numBytesRead;
   }
 
@@ -460,11 +460,11 @@ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
 
   protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
     buffer.rewind();
-    WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS);
-    while (buffer.hasRemaining()) {
-      writeChannel.write(buffer);
+    try (WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS)) {
+      while (buffer.hasRemaining()) {
+        writeChannel.write(buffer);
+      }
     }
-    writeChannel.close();
   }
 
   /**
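The change adopts the standard try-with-resources idiom: the previous code leaked the channel whenever read() or write() threw before the explicit close() was reached. A minimal self-contained sketch of the same idiom (the file path and buffer size are illustrative, not taken from the patch):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class ReadFileSketch {
      // Reads a file into the buffer; the channel is closed even if read() throws.
      static int readFile(Path path, ByteBuffer buffer) throws IOException {
        int numBytesRead = 0;
        int numBytes;
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
          while ((numBytes = channel.read(buffer)) > 0) {
            numBytesRead += numBytes;
          }
          // Cap the limit at what was actually read so callers can consume it.
          buffer.limit(buffer.position());
        } // channel.close() happens here, on both success and failure paths
        return numBytesRead;
      }

      public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8 * 1024); // illustrative size
        int n = readFile(Paths.get(args[0]), buf);
        System.out.println("read " + n + " bytes");
      }
    }

The runtime closes the channel at the end of the try block on every exit path, which is exactly what the old explicit close() call could not guarantee.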

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/LogExactlyOnce.java

Lines changed: 11 additions & 0 deletions
@@ -49,4 +49,15 @@ public void error(String format, Object...args) {
       log.error(format, args);
     }
   }
+
+  /**
+   * Log at DEBUG if nothing has been logged yet.
+   * @param format format string
+   * @param args arguments
+   */
+  public void debug(String format, Object...args) {
+    if (!logged.getAndSet(true)) {
+      log.debug(format, args);
+    }
+  }
 }
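Judging from the getAndSet call in the diff, LogExactlyOnce guards all its level-specific methods with one shared AtomicBoolean, so whichever level fires first wins and every later call is silent. A simplified analogue under that assumption (a sketch, not the Hadoop class itself):

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Simplified analogue of org.apache.hadoop.fs.store.LogExactlyOnce:
    // whichever level is invoked first wins; all later calls are silent.
    class LogOnceSketch {
      private final AtomicBoolean logged = new AtomicBoolean(false);
      private final Logger log;

      LogOnceSketch(Logger log) {
        this.log = log;
      }

      void debug(String format, Object... args) {
        if (!logged.getAndSet(true)) {
          log.debug(format, args);
        }
      }

      void error(String format, Object... args) {
        if (!logged.getAndSet(true)) {
          log.error(format, args);
        }
      }

      public static void main(String[] args) {
        LogOnceSketch once = new LogOnceSketch(LoggerFactory.getLogger("demo"));
        once.debug("first call logs at DEBUG: {}", 42); // logged
        once.error("second call is silent, even at ERROR"); // suppressed
      }
    }

This lets a hot path emit a one-time DEBUG diagnostic without flooding the log on every invocation.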

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

Lines changed: 10 additions & 3 deletions
@@ -139,13 +139,20 @@ public static SecureRandom getSecureRandom() {
   public static class ServiceComparator implements Comparator<DatanodeInfo> {
     @Override
     public int compare(DatanodeInfo a, DatanodeInfo b) {
-      // Decommissioned nodes will still be moved to the end of the list
+      // Decommissioned nodes will be moved to the end of the list.
       if (a.isDecommissioned()) {
         return b.isDecommissioned() ? 0 : 1;
       } else if (b.isDecommissioned()) {
         return -1;
       }
 
+      // Decommissioning nodes will be placed before decommissioned nodes.
+      if (a.isDecommissionInProgress()) {
+        return b.isDecommissionInProgress() ? 0 : 1;
+      } else if (b.isDecommissionInProgress()) {
+        return -1;
+      }
+
       // ENTERING_MAINTENANCE nodes should be after live nodes.
       if (a.isEnteringMaintenance()) {
         return b.isEnteringMaintenance() ? 0 : 1;
@@ -159,9 +166,9 @@ public int compare(DatanodeInfo a, DatanodeInfo b) {
 
   /**
    * Comparator for sorting DataNodeInfo[] based on
-   * slow, stale, entering_maintenance and decommissioned states.
+   * slow, stale, entering_maintenance, decommissioning and decommissioned states.
    * Order: live {@literal ->} slow {@literal ->} stale {@literal ->}
-   * entering_maintenance {@literal ->} decommissioned
+   * entering_maintenance {@literal ->} decommissioning {@literal ->} decommissioned
    */
   @InterfaceAudience.Private
   public static class StaleAndSlowComparator extends ServiceComparator {
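The resulting order for ServiceComparator is live, then entering-maintenance, then decommissioning, then decommissioned; StaleAndSlowComparator layers slow/stale handling on top. An illustrative, self-contained analogue of the comparison chain using a plain enum instead of DatanodeInfo (not the HDFS API):

    import java.util.Arrays;
    import java.util.Comparator;

    // Illustrative analogue: encode the precedence chain
    // live -> entering_maintenance -> decommissioning -> decommissioned
    // the same way ServiceComparator does, as a sequence of pairwise checks.
    enum NodeState { LIVE, ENTERING_MAINTENANCE, DECOMMISSIONING, DECOMMISSIONED }

    class NodeStateComparatorDemo {
      static final Comparator<NodeState> BY_SERVICE = (a, b) -> {
        // Decommissioned nodes go to the end of the list.
        if (a == NodeState.DECOMMISSIONED) {
          return b == NodeState.DECOMMISSIONED ? 0 : 1;
        } else if (b == NodeState.DECOMMISSIONED) {
          return -1;
        }
        // Decommissioning nodes come just before decommissioned ones.
        if (a == NodeState.DECOMMISSIONING) {
          return b == NodeState.DECOMMISSIONING ? 0 : 1;
        } else if (b == NodeState.DECOMMISSIONING) {
          return -1;
        }
        // Entering-maintenance nodes come after live nodes.
        if (a == NodeState.ENTERING_MAINTENANCE) {
          return b == NodeState.ENTERING_MAINTENANCE ? 0 : 1;
        } else if (b == NodeState.ENTERING_MAINTENANCE) {
          return -1;
        }
        return 0; // both live
      };

      public static void main(String[] args) {
        NodeState[] nodes = {
            NodeState.DECOMMISSIONED, NodeState.LIVE,
            NodeState.DECOMMISSIONING, NodeState.ENTERING_MAINTENANCE};
        Arrays.sort(nodes, BY_SERVICE);
        // Prints: [LIVE, ENTERING_MAINTENANCE, DECOMMISSIONING, DECOMMISSIONED]
        System.out.println(Arrays.toString(nodes));
      }
    }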

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java

Lines changed: 8 additions & 0 deletions
@@ -147,6 +147,14 @@ public byte[] getBlockIndices() {
     return indices;
   }
 
+  public byte[] getBlockIndicesForSpecifiedStorages(List<Integer> storageIdx) {
+    byte[] indices = new byte[storageIdx.size()];
+    for (int i = 0; i < indices.length; i++) {
+      indices[i] = BlockIdManager.getBlockIndex(replicas[storageIdx.get(i)]);
+    }
+    return indices;
+  }
+
   public int getNumExpectedLocations() {
     return replicas.length;
   }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

Lines changed: 17 additions & 11 deletions
@@ -511,7 +511,9 @@ public DatanodeStatistics getDatanodeStatistics() {
   }
 
   private boolean isInactive(DatanodeInfo datanode) {
-    return datanode.isDecommissioned() || datanode.isEnteringMaintenance() ||
+    return datanode.isDecommissioned() ||
+        datanode.isDecommissionInProgress() ||
+        datanode.isEnteringMaintenance() ||
         (avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
   }
 
@@ -540,7 +542,7 @@ public int getMaxSlowpeerCollectNodes() {
   /**
    * Sort the non-striped located blocks by the distance to the target host.
    *
-   * For striped blocks, it will only move decommissioned/stale/slow
+   * For striped blocks, it will only move decommissioned/decommissioning/stale/slow
    * nodes to the bottom. For example, assume we have storage list:
    * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9
    * mapping to block indices:
@@ -570,7 +572,7 @@ public void sortLocatedBlocks(final String targetHost,
   }
 
   /**
-   * Move decommissioned/entering_maintenance/stale/slow
+   * Move decommissioned/decommissioning/entering_maintenance/stale/slow
    * datanodes to the bottom. After sorting it will
    * update block indices and block tokens respectively.
    *
@@ -588,7 +590,8 @@ private void sortLocatedStripedBlock(final LocatedBlock lb,
       locToIndex.put(di[i], lsb.getBlockIndices()[i]);
       locToToken.put(di[i], lsb.getBlockTokens()[i]);
     }
-    // Move decommissioned/stale datanodes to the bottom
+    // Arrange the order of datanodes as follows:
+    // live(in-service) -> stale -> entering_maintenance -> decommissioning -> decommissioned
     Arrays.sort(di, comparator);
 
     // must update cache since we modified locations array
@@ -602,7 +605,7 @@ private void sortLocatedStripedBlock(final LocatedBlock lb,
   }
 
   /**
-   * Move decommissioned/entering_maintenance/stale/slow
+   * Move decommissioned/decommissioning/entering_maintenance/stale/slow
    * datanodes to the bottom. Also, sort nodes by network
    * distance.
    *
@@ -634,8 +637,8 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
     }
 
     DatanodeInfoWithStorage[] di = lb.getLocations();
-    // Move decommissioned/entering_maintenance/stale/slow
-    // datanodes to the bottom
+    // Arrange the order of datanodes as follows:
+    // live(in-service) -> stale -> entering_maintenance -> decommissioning -> decommissioned
    Arrays.sort(di, comparator);
 
     // Sort nodes by network distance only for located blocks
@@ -1717,9 +1720,11 @@ private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
     // Skip stale nodes during recovery
     final List<DatanodeStorageInfo> recoveryLocations =
         new ArrayList<>(storages.length);
-    for (DatanodeStorageInfo storage : storages) {
-      if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
-        recoveryLocations.add(storage);
+    final List<Integer> storageIdx = new ArrayList<>(storages.length);
+    for (int i = 0; i < storages.length; ++i) {
+      if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
+        recoveryLocations.add(storages[i]);
+        storageIdx.add(i);
       }
     }
     // If we are performing a truncate recovery than set recovery fields
@@ -1752,7 +1757,8 @@ private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
       rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
           uc.getBlockRecoveryId());
       if (b.isStriped()) {
-        rBlock = new RecoveringStripedBlock(rBlock, uc.getBlockIndices(),
+        rBlock = new RecoveringStripedBlock(rBlock,
+            uc.getBlockIndicesForSpecifiedStorages(storageIdx),
             ((BlockInfoStriped) b).getErasureCodingPolicy());
       }
     }
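The substantive fix in getBlockRecoveryCommand is an alignment invariant: once stale nodes are filtered out of the recovery locations, the striped block indices passed to RecoveringStripedBlock must be filtered the same way, whereas uc.getBlockIndices() previously still described every replica. A standalone sketch of that bookkeeping, with plain arrays standing in for DatanodeStorageInfo and the replica array (the names and values here are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch (not HDFS types): filter stale storages while recording
    // their original positions, then pick block indices by those same positions.
    class RecoveryAlignmentSketch {
      public static void main(String[] args) {
        String[] storages = {"d0", "d1", "d2", "d3"};
        boolean[] stale = {false, true, false, false}; // d1 is stale
        byte[] replicaBlockIndices = {0, 1, 2, 3};     // index i belongs to storages[i]

        List<String> recoveryLocations = new ArrayList<>();
        List<Integer> storageIdx = new ArrayList<>();
        for (int i = 0; i < storages.length; i++) {
          if (!stale[i]) {
            recoveryLocations.add(storages[i]);
            storageIdx.add(i); // remember where this location came from
          }
        }

        // Mirrors getBlockIndicesForSpecifiedStorages: select by original position.
        byte[] indices = new byte[storageIdx.size()];
        for (int i = 0; i < indices.length; i++) {
          indices[i] = replicaBlockIndices[storageIdx.get(i)];
        }

        // recoveryLocations = [d0, d2, d3], indices = [0, 2, 3]: still aligned.
        System.out.println(recoveryLocations + " -> "
            + java.util.Arrays.toString(indices));
      }
    }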

hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md

Lines changed: 6 additions & 2 deletions
@@ -452,6 +452,7 @@ Usage:
 
     hdfs dfsrouteradmin
       [-add <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
+      [-addAll <source1> <nameservice1,nameservice2,...> <destination1> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] -owner <owner1> -group <group1> -mode <mode1> , <source2> <nameservice1,nameservice2,...> <destination2> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] -owner <owner2> -group <group2> -mode <mode2> , ...]
       [-update <source> [<nameservice1, nameservice2, ...> <destination>] [-readonly true|false] [-faulttolerant true|false] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
       [-rm <source>]
       [-ls [-d] <path>]
@@ -460,6 +461,7 @@ Usage:
       [-setStorageTypeQuota <path> -storageType <storage type> <quota in bytes or quota size string>]
       [-clrQuota <path>]
       [-clrStorageTypeQuota <path>]
+      [-dumpState]
       [-safemode enter | leave | get]
       [-nameservice disable | enable <nameservice>]
       [-getDisabledNameservices]
@@ -517,7 +519,7 @@ Runs the diskbalancer CLI. See [HDFS Diskbalancer](./HDFSDiskbalancer.html) for
 Usage:
 
     hdfs ec [generic options]
-      [-setPolicy -policy <policyName> -path <path>]
+      [-setPolicy -policy <policyName> -path <path> [-replicate]]
       [-getPolicy -path <path>]
       [-unsetPolicy -path <path>]
       [-listPolicies]
@@ -718,11 +720,13 @@ Recover the lease on the specified path. The path must reside on an HDFS file sy
 
 ### `verifyEC`
 
-Usage: `hdfs debug verifyEC -file <file>`
+Usage: `hdfs debug verifyEC -file <file> [-blockId <blk_Id>] [-skipFailureBlocks]`
 
 | COMMAND\_OPTION | Description |
 |:---- |:---- |
 | [`-file` *EC-file*] | HDFS EC file to be verified. |
+| [`-blockId` *blk_Id*] | Verify only the block group of the file identified by this block ID. |
+| [`-skipFailureBlocks`] | Skip block groups that fail verification and continue verifying the remaining block groups of the file; by default, verification stops at the first failure. |
 
 Verify the correctness of erasure coding on an erasure coded file.
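For example, the new options could be invoked as follows (the file path and block ID are placeholders, not values from the patch):

    # Verify a single block group of an EC file
    hdfs debug verifyEC -file /user/foo/ec-file -blockId blk_-9223372036854775776

    # Verify every block group, continuing past any block groups that fail
    hdfs debug verifyEC -file /user/foo/ec-file -skipFailureBlocks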

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java

Lines changed: 60 additions & 0 deletions
@@ -30,6 +30,10 @@
 import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -188,6 +192,62 @@ public void testLeaseRecovery() throws Exception {
     }
   }
 
+  /**
+   * Test lease recovery for an EC policy when one internal block is located
+   * on a stale datanode.
+   */
+  @Test
+  public void testLeaseRecoveryWithStaleDataNode() {
+    LOG.info("blockLengthsSuite: " +
+        Arrays.toString(blockLengthsSuite));
+    long staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
+
+    for (int i = 0; i < blockLengthsSuite.length; i++) {
+      BlockLengths blockLengths = blockLengthsSuite[i];
+      try {
+        writePartialBlocks(blockLengths.getBlockLengths());
+
+        // Get block info for the last block and mark corresponding datanode
+        // as stale.
+        LocatedBlock locatedblock =
+            TestInterDatanodeProtocol.getLastLocatedBlock(
+                dfs.dfs.getNamenode(), p.toString());
+        DatanodeInfo firstDataNode = locatedblock.getLocations()[0];
+        DatanodeDescriptor dnDes = cluster.getNameNode().getNamesystem()
+            .getBlockManager().getDatanodeManager()
+            .getDatanode(firstDataNode);
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(
+            cluster.getDataNode(dnDes.getIpcPort()), true);
+        DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));
+
+        long[] longArray = new long[blockLengths.getBlockLengths().length - 1];
+        for (int j = 0; j < longArray.length; ++j) {
+          longArray[j] = blockLengths.getBlockLengths()[j + 1];
+        }
+        int safeLength = (int) StripedBlockUtil.getSafeLength(ecPolicy,
+            longArray);
+        int checkDataLength = Math.min(testFileLength, safeLength);
+        recoverLease();
+        List<Long> oldGS = new ArrayList<>();
+        oldGS.add(1001L);
+        StripedFileTestUtil.checkData(dfs, p, checkDataLength,
+            new ArrayList<>(), oldGS, blockGroupSize);
+
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(
+            cluster.getDataNode(dnDes.getIpcPort()), false);
+        DFSTestUtil.resetLastUpdatesWithOffset(dnDes, 0);
+
+      } catch (Throwable e) {
+        String msg = "failed testCase at i=" + i + ", blockLengths="
+            + blockLengths + "\n"
+            + StringUtils.stringifyException(e);
+        Assert.fail(msg);
+      }
+    }
+  }
+
   @Test
   public void testSafeLength() {
     checkSafeLength(0, 0); // Length of: 0
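The test derives its expected data length from StripedBlockUtil.getSafeLength over the remaining internal block lengths. As a rough sketch of the underlying idea (a simplification under RS-style assumptions, not the upstream implementation): a stripe counts as safe once at least dataBlkNum internal blocks cover it, so the safe length is bounded by the dataBlkNum-th longest internal block, rounded down to whole cells:

    import java.util.Arrays;

    // Simplified sketch of the safe-length idea for a striped (EC) block group.
    // This approximates StripedBlockUtil.getSafeLength; it is not a copy of it.
    class SafeLengthSketch {
      static long safeLength(long[] blockLens, int dataBlkNum, int cellSize) {
        long[] cpy = Arrays.copyOf(blockLens, blockLens.length);
        Arrays.sort(cpy);
        // A stripe is recoverable once at least dataBlkNum internal blocks
        // reach it, i.e. it is bounded by the dataBlkNum-th longest block.
        long fullStripes = cpy[cpy.length - dataBlkNum] / cellSize;
        return fullStripes * dataBlkNum * (long) cellSize;
      }

      public static void main(String[] args) {
        // Illustrative RS(6,3) layout with 64KB cells and uneven block lengths.
        int dataBlkNum = 6, cellSize = 64 * 1024;
        long[] lens = {3 * 65536, 2 * 65536, 3 * 65536, 65536, 3 * 65536,
            2 * 65536, 3 * 65536, 3 * 65536, 2 * 65536};
        // The 6th-longest block has 2 full cells -> safe length = 2 stripes.
        System.out.println(safeLength(lens, dataBlkNum, cellSize)); // 786432
      }
    }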
