DataNode.java:
@@ -21,8 +21,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
@@ -67,7 +65,6 @@
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.security.PrivilegedExceptionAction;
@@ -92,6 +89,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nullable;
import javax.management.ObjectName;
import javax.net.SocketFactory;

@@ -103,16 +101,14 @@
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -203,14 +199,14 @@
import org.apache.hadoop.tracing.TraceUtils;
import org.apache.hadoop.tracing.TracerConfigurationManager;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.InvalidChecksumSizeException;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
import org.apache.hadoop.util.VersionInfo;
import org.apache.htrace.core.Tracer;
import org.eclipse.jetty.util.ajax.JSON;
@@ -389,6 +385,9 @@ public static InetSocketAddress createSocketAddr(String target) {
private static final double CONGESTION_RATIO = 1.5;
private DiskBalancer diskBalancer;

@Nullable
private final StorageLocationChecker storageLocationChecker;


private final SocketFactory socketFactory;

@@ -423,6 +422,7 @@ private static Tracer createTracer(Configuration conf) {
ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
initOOBTimeout();
storageLocationChecker = null;
}

/**
@@ -431,6 +431,7 @@ private static Tracer createTracer(Configuration conf) {
*/
DataNode(final Configuration conf,
final List<StorageLocation> dataDirs,
final StorageLocationChecker storageLocationChecker,
final SecureResources resources) throws IOException {
super(conf);
this.tracer = createTracer(conf);
@@ -506,6 +507,7 @@ public Map<String, Long> load(String key) throws Exception {
});

initOOBTimeout();
this.storageLocationChecker = storageLocationChecker;
}

@Override // ReconfigurableBase
@@ -1935,6 +1937,10 @@ public void shutdown() {
}
}

if (storageLocationChecker != null) {
storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
}

if (pauseMonitor != null) {
pauseMonitor.stop();
}
@@ -2620,21 +2626,6 @@ void join() {
}
}

// Small wrapper around the DiskChecker class that provides means to mock
// DiskChecker static methods and unittest DataNode#getDataDirsFromURIs.
static class DataNodeDiskChecker {
private final FsPermission expectedPermission;

public DataNodeDiskChecker(FsPermission expectedPermission) {
this.expectedPermission = expectedPermission;
}

public void checkDir(LocalFileSystem localFS, Path path)
throws DiskErrorException, IOException {
DiskChecker.checkDir(localFS, path, expectedPermission);
}
}

/**
* Make an instance of DataNode after ensuring that at least one of the
* given data directories (and their parent directories, if necessary)
@@ -2649,44 +2640,18 @@ public void checkDir(LocalFileSystem localFS, Path path)
*/
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
Configuration conf, SecureResources resources) throws IOException {
LocalFileSystem localFS = FileSystem.getLocal(conf);
FsPermission permission = new FsPermission(
conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
DataNodeDiskChecker dataNodeDiskChecker =
new DataNodeDiskChecker(permission);
List<StorageLocation> locations =
checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
List<StorageLocation> locations;
StorageLocationChecker storageLocationChecker =
new StorageLocationChecker(conf, new Timer());
try {
locations = storageLocationChecker.check(conf, dataDirs);
} catch (InterruptedException ie) {
throw new IOException("Failed to instantiate DataNode", ie);
}
DefaultMetricsSystem.initialize("DataNode");

assert locations.size() > 0 : "number of data directories should be > 0";
return new DataNode(conf, locations, resources);
}

// DataNode ctor expects AbstractList instead of List or Collection...
static List<StorageLocation> checkStorageLocations(
Collection<StorageLocation> dataDirs,
LocalFileSystem localFS, DataNodeDiskChecker dataNodeDiskChecker)
throws IOException {
ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
StringBuilder invalidDirs = new StringBuilder();
for (StorageLocation location : dataDirs) {
final URI uri = location.getUri();
try {
dataNodeDiskChecker.checkDir(localFS, new Path(uri));
locations.add(location);
} catch (IOException ioe) {
LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
+ location + " : ", ioe);
invalidDirs.append("\"").append(uri.getPath()).append("\" ");
}
}
if (locations.size() == 0) {
throw new IOException("All directories in "
+ DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
+ invalidDirs);
}
return locations;
return new DataNode(conf, locations, storageLocationChecker, resources);
}

@Override
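Taken together, the DataNode.java changes drop the serial DataNodeDiskChecker/checkStorageLocations path: makeInstance now builds a StorageLocationChecker, which probes all configured volumes in parallel, and hands it to the constructor so shutdown() can stop its executor later; the @Nullable field lets tests pass null. Below is a minimal sketch of the resulting startup flow. The wrapper class and method are illustrative assumptions, not part of this patch; only the StorageLocationChecker, Timer, and DataNode calls come from the diff above.

package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.util.Timer;

// Hypothetical helper mirroring the new makeInstance() flow; lives in the
// datanode package because the DataNode constructor is package-private.
class CheckedDataNodeStarter {
  static DataNode start(Configuration conf,
      Collection<StorageLocation> dataDirs,
      SecureResources resources) throws IOException {
    // The checker constructor validates its configuration up front,
    // throwing DiskErrorException for a non-positive disk-check timeout
    // or a negative failed-volumes tolerance.
    StorageLocationChecker checker =
        new StorageLocationChecker(conf, new Timer());
    List<StorageLocation> goodLocations;
    try {
      // All volumes are probed in parallel; only the healthy locations
      // come back, in the caller's input order.
      goodLocations = checker.check(conf, dataDirs);
    } catch (InterruptedException ie) {
      throw new IOException("Failed to instantiate DataNode", ie);
    }
    // Hand the checker to the DataNode so shutdown() can later call
    // checker.shutdownAndWait(1, TimeUnit.SECONDS) on its executor.
    return new DataNode(conf, goodLocations, checker, resources);
  }
}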
StorageLocationChecker.java:
@@ -32,14 +32,17 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation.CheckContext;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -80,12 +83,19 @@ public class StorageLocationChecker {
*/
private final int maxVolumeFailuresTolerated;

public StorageLocationChecker(Configuration conf, Timer timer) {
public StorageLocationChecker(Configuration conf, Timer timer)
throws DiskErrorException {
maxAllowedTimeForCheckMs = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);

if (maxAllowedTimeForCheckMs <= 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+ maxAllowedTimeForCheckMs + " (should be > 0)");
}

expectedPermission = new FsPermission(
conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
@@ -94,6 +104,12 @@ public StorageLocationChecker(Configuration conf, Timer timer) {
DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);

if (maxVolumeFailuresTolerated < 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
+ maxVolumeFailuresTolerated + " (should be non-negative)");
}

this.timer = timer;

delegateChecker = new ThrottledAsyncChecker<>(
@@ -113,6 +129,9 @@
* Initiate a check of the supplied storage volumes and return
* a list of healthy volumes.
*
* StorageLocations are returned in the same order as the input
* for compatibility with existing unit tests.
*
* @param conf HDFS configuration.
* @param dataDirs list of volumes to check.
* @return returns the list of healthy volumes. Returns the empty list if
@@ -128,7 +147,8 @@ public List<StorageLocation> check(
final Collection<StorageLocation> dataDirs)
throws InterruptedException, IOException {

final ArrayList<StorageLocation> goodLocations = new ArrayList<>();
final HashMap<StorageLocation, Boolean> goodLocations =
new LinkedHashMap<>();
final Set<StorageLocation> failedLocations = new HashSet<>();
final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures =
Maps.newHashMap();
@@ -137,10 +157,18 @@

// Start parallel disk check operations on all StorageLocations.
for (StorageLocation location : dataDirs) {
goodLocations.put(location, true);
futures.put(location,
delegateChecker.schedule(location, context));
}

if (maxVolumeFailuresTolerated >= dataDirs.size()) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
+ maxVolumeFailuresTolerated + ". Value configured is >= "
+ "to the number of configured volumes (" + dataDirs.size() + ").");
}

final long checkStartTimeMs = timer.monotonicNow();

// Retrieve the results of the disk checks.
@@ -159,24 +187,24 @@ public List<StorageLocation> check(
entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);
switch (result) {
case HEALTHY:
goodLocations.add(entry.getKey());
break;
case DEGRADED:
LOG.warn("StorageLocation {} appears to be degraded.", location);
break;
case FAILED:
LOG.warn("StorageLocation {} detected as failed.", location);
failedLocations.add(location);
goodLocations.remove(location);
break;
default:
LOG.error("Unexpected health check result {} for StorageLocation {}",
result, location);
goodLocations.add(entry.getKey());
}
} catch (ExecutionException|TimeoutException e) {
LOG.warn("Exception checking StorageLocation " + location,
e.getCause());
failedLocations.add(location);
goodLocations.remove(location);
}
}

@@ -193,7 +221,7 @@ public List<StorageLocation> check(
+ failedLocations);
}

return goodLocations;
return new ArrayList<>(goodLocations.keySet());
}

public void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
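In the check() rewrite, every location is optimistically inserted into a LinkedHashMap up front (goodLocations.put(location, true)) and evicted when a FAILED result or a check exception arrives, so the final ArrayList built from the keys preserves the caller's input order, exactly as the javadoc above promises. A standalone toy illustration of that pattern; the paths are made up:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderPreservingFilter {
  public static void main(String[] args) {
    Map<String, Boolean> good = new LinkedHashMap<>();
    for (String dir : Arrays.asList("/data/1", "/data/2", "/data/3")) {
      good.put(dir, true);   // optimistically mark every volume good up front
    }
    good.remove("/data/2");  // a FAILED check result evicts the entry
    List<String> result = new ArrayList<>(good.keySet());
    System.out.println(result);  // [/data/1, /data/3], input order preserved
  }
}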
A DataNode test:
@@ -141,7 +141,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
.nextLong() | 1L));

DataNode dn = new DataNode(conf, locations, null) {
DataNode dn = new DataNode(conf, locations, null, null) {
@Override
DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException {
Another DataNode test:
@@ -224,7 +224,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
null, ThreadLocalRandom.current().nextLong() | 1L));

dn = new DataNode(conf, locations, null) {
dn = new DataNode(conf, locations, null, null) {
@Override
DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException {
TestDataDirs.java:
@@ -28,11 +28,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.DataNode.DataNodeDiskChecker;

public class TestDataDirs {

@@ -96,26 +91,4 @@ public void testDataDirParsing() throws Throwable {
assertThat(locations.get(1).getStorageType(), is(StorageType.DISK));
assertThat(locations.get(1).getUri(), is(dir1.toURI()));
}

@Test(timeout = 30000)
public void testDataDirValidation() throws Throwable {

DataNodeDiskChecker diskChecker = mock(DataNodeDiskChecker.class);
doThrow(new IOException()).doThrow(new IOException()).doNothing()
.when(diskChecker)
.checkDir(any(LocalFileSystem.class), any(Path.class));
LocalFileSystem fs = mock(LocalFileSystem.class);
AbstractList<StorageLocation> locations = new ArrayList<StorageLocation>();

locations.add(StorageLocation.parse("file:/p1/"));
locations.add(StorageLocation.parse("file:/p2/"));
locations.add(StorageLocation.parse("file:/p3/"));

List<StorageLocation> checkedLocations =
DataNode.checkStorageLocations(locations, fs, diskChecker);
assertEquals("number of valid data dirs", 1, checkedLocations.size());
String validDir =
new File(checkedLocations.iterator().next().getUri()).getPath();
assertThat("p3 should be valid", new File("/p3/").getPath(), is(validDir));
}
}
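The deleted testDataDirValidation exercised DataNode.checkStorageLocations, which no longer exists; equivalent coverage now belongs with StorageLocationChecker, whose check() can be driven by mocked StorageLocations. A hedged sketch of such a test follows: the class and test names are illustrative, and the mocking pattern assumes StorageLocation.check(CheckContext) is mockable.

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation.CheckContext;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.util.Timer;
import org.junit.Test;

public class TestStorageLocationFiltering {

  @Test(timeout = 30000)
  public void testFailedLocationsAreFiltered() throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setInt(DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 2);

    // Two mocked locations report FAILED, one reports HEALTHY.
    List<StorageLocation> locations = new ArrayList<>();
    for (VolumeCheckResult result : new VolumeCheckResult[] {
        VolumeCheckResult.FAILED, VolumeCheckResult.FAILED,
        VolumeCheckResult.HEALTHY}) {
      StorageLocation location = mock(StorageLocation.class);
      when(location.check(any(CheckContext.class))).thenReturn(result);
      locations.add(location);
    }

    // Only the healthy location should survive the parallel check.
    List<StorageLocation> checked =
        new StorageLocationChecker(conf, new Timer()).check(conf, locations);
    assertEquals("number of valid data dirs", 1, checked.size());
  }
}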
TestDataNodeUUID.java:
@@ -55,7 +55,7 @@ public void testDatanodeUuid() throws Exception {
"hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
ArrayList<StorageLocation> locations = new ArrayList<>();

DataNode dn = new DataNode(conf, locations, null);
DataNode dn = new DataNode(conf, locations, null, null);

// Assert that the DataNode ID is null
String nullString = null;