From 41cfc189c7216473f4454fbe82012bb8dda52de5 Mon Sep 17 00:00:00 2001 From: tom lee Date: Thu, 24 Mar 2022 11:56:09 +0800 Subject: [PATCH] HDFS-16446. Consider ioutils of disk when choosing volume Co-authored-by: liuhongtong Co-authored-by: hfutatzhanghb <1036798979@qq.com> --- .../apache/hadoop/io/nativeio/NativeIO.java | 2 + .../org/apache/hadoop/io/nativeio/NativeIO.c | 89 +++++- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 + .../hadoop/hdfs/server/datanode/DataNode.java | 22 ++ .../server/datanode/DiskIOUtilManager.java | 278 ++++++++++++++++++ .../AvailableSpaceVolumeChoosingPolicy.java | 179 ++++++++++- .../datanode/fsdataset/FsDatasetSpi.java | 8 + .../datanode/fsdataset/FsVolumeSpi.java | 8 + .../fsdataset/impl/FsDatasetImpl.java | 5 + .../datanode/fsdataset/impl/FsVolumeImpl.java | 5 + .../src/main/resources/hdfs-default.xml | 19 ++ .../server/datanode/SimulatedFSDataset.java | 10 + .../server/datanode/TestDirectoryScanner.java | 5 + .../extdataset/ExternalDatasetImpl.java | 5 + .../extdataset/ExternalVolumeImpl.java | 5 + 15 files changed, 640 insertions(+), 8 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskIOUtilManager.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java index ebe7f213ceeb1..0aeec5547b97b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java @@ -1132,4 +1132,6 @@ public static void copyFileUnbuffered(File src, File dst) throws IOException { private static native void copyFileUnbuffered0(String src, String dst) throws NativeIOException; + + public static native String getDiskName(String path) throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c index 1d7c508d85c76..7560375b4b2c2 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c @@ -20,6 +20,11 @@ #include "org_apache_hadoop_io_nativeio_NativeIO.h" #include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h" #include "exception.h" +/* + * Throw a java.IO.IOException, generating the message from errno. + * NB. 
this is also used form windows_secure_container_executor.c + */ +extern void throw_ioe(JNIEnv* env, int errnum); #ifdef UNIX #include @@ -47,6 +52,84 @@ #include #include #include "config.h" +#include +#include +#include +#include +#include +#include +#include +#define MAX_NAME_LEN 128 +#define likely(x) __builtin_expect(!!(x), 1) +#define unlikely(x) __builtin_expect(!!(x), 0) + +void read_diskstats_stat_work(JNIEnv *env, char *diskstats, unsigned int pmajor, unsigned int pminor, char *diskName) +{ + FILE *fp; + char line[256], dev_name[MAX_NAME_LEN]; + int i; + unsigned int ios_pgr, tot_ticks, rq_ticks, wr_ticks, dc_ticks, fl_ticks; + unsigned long rd_ios, rd_merges_or_rd_sec, rd_ticks_or_wr_sec, wr_ios; + unsigned long wr_merges, rd_sec_or_wr_ios, wr_sec; + unsigned long dc_ios, dc_merges, dc_sec, fl_ios; + unsigned int major, minor; + + if ((fp = fopen(diskstats, "r")) == NULL) { + throw_ioe(env, errno); + goto cleanup; + } + + while (fgets(line, sizeof(line), fp) != NULL) { + /* major minor name rio rmerge rsect ruse wio wmerge wsect wuse running use aveq dcio dcmerge dcsect dcuse flio fltm */ + i = sscanf(line, "%u %u %s %lu %lu %lu %lu %lu %lu %lu %u %u %u %u %lu %lu %lu %u %lu %u", + &major, &minor, dev_name, + &rd_ios, &rd_merges_or_rd_sec, &rd_sec_or_wr_ios, &rd_ticks_or_wr_sec, + &wr_ios, &wr_merges, &wr_sec, &wr_ticks, &ios_pgr, &tot_ticks, &rq_ticks, + &dc_ios, &dc_merges, &dc_sec, &dc_ticks, + &fl_ios, &fl_ticks); + if (pmajor == major && pminor == minor) { + strncpy(diskName, dev_name, MAX_NAME_LEN); + fclose(fp); + return; + } + } + if(fclose(fp) != 0) { + throw_ioe(env, errno); + goto cleanup; + } + +cleanup: + return; +} + +JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_getDiskName + (JNIEnv *env, jclass class, jstring j_str) { + struct stat buf; + const char *c_str = NULL; + char diskName[MAX_NAME_LEN] = {0}; + jboolean isCopy; + c_str = (*env)->GetStringUTFChars(env, j_str, &isCopy); + // judge whether this str parameter is null or not should in java code. + if(c_str == NULL) { + return NULL; + } + if (unlikely(stat(c_str, &buf) == -1)) { + throw_ioe(env, errno); + goto cleanup; + } else { + unsigned int major = MAJOR(buf.st_dev); + unsigned int minor = MINOR(buf.st_dev); + read_diskstats_stat_work(env, "/proc/diskstats", major, minor, diskName); + (*env)->ReleaseStringUTFChars(env, j_str, c_str); + return (*env)->NewStringUTF(env, diskName); + } + +cleanup: + if (c_str != NULL) { + (*env)->ReleaseStringUTFChars(env, j_str, c_str); + } + return NULL; +} #endif #ifdef WINDOWS @@ -92,12 +175,6 @@ static jclass pmem_region_clazz = NULL; static jmethodID pmem_region_ctor = NULL; #endif -/* - * Throw a java.IO.IOException, generating the message from errno. - * NB. 
this is also used form windows_secure_container_executor.c - */ -extern void throw_ioe(JNIEnv* env, int errnum); - // Internal functions #ifdef UNIX static ssize_t get_pw_buflen(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 6216f6e7a1ded..94358834a73e5 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -2007,5 +2007,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_LEASE_HARDLIMIT_DEFAULT = HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT; + public static final String DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY = + "dfs.datanode.disk.stat.interval.seconds"; + public static final long DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT = 1L; + public static final String + DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_KEY = + "dfs.datanode.available-space-volume-choosing-policy.io.util.preference.enable"; + public static final boolean + DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_DEFAULT = false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c3b1aa1c67203..12f16d6f77897 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -443,6 +443,7 @@ public static InetSocketAddress createSocketAddr(String target) { private static final double CONGESTION_RATIO = 1.5; private DiskBalancer diskBalancer; private DataSetLockManager dataSetLockManager; + private DiskIOUtilManager diskIOUtilManager; private final ExecutorService xferService; @@ -490,6 +491,11 @@ private static Tracer createTracer(Configuration conf) { volumeChecker = new DatasetVolumeChecker(conf, new Timer()); this.xferService = HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); + if (Shell.LINUX) { + this.diskIOUtilManager = new DiskIOUtilManager(conf); + } else { + LOG.info("Disk io util manager does not start, only Linux OS release support!"); + } } /** @@ -1190,6 +1196,9 @@ public IOException call() { conf.set(DFS_DATANODE_DATA_DIR_KEY, Joiner.on(",").join(effectiveVolumes)); dataDirs = getStorageLocations(conf); + if (diskIOUtilManager != null) { + diskIOUtilManager.setStorageLocations(dataDirs); + } } } } @@ -1709,6 +1718,10 @@ void startDataNode(List dataDirectories, synchronized (this) { this.dataDirs = dataDirectories; } + if (diskIOUtilManager != null) { + this.diskIOUtilManager.setStorageLocations(dataDirectories); + this.diskIOUtilManager.start(); + } this.dnConf = new DNConf(this); checkSecureConfig(dnConf, getConf(), resources); @@ -2505,6 +2518,10 @@ public void shutdown() { dataNodeInfoBeanName = null; } if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown(); + if (diskIOUtilManager != null) { + diskIOUtilManager.stop(); + diskIOUtilManager = null; + } LOG.info("Shutdown complete."); synchronized(this) { // it is already false, but setting it again to avoid a findbug warning. 
@@ -4158,4 +4175,9 @@ boolean isSlownode() { public BlockPoolManager getBlockPoolManager() { return blockPoolManager; } + + public int getStorageLocationDiskUtil(StorageLocation location) { + return diskIOUtilManager != null ? + diskIOUtilManager.getStorageLocationDiskIOUtil(location) : 0; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskIOUtilManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskIOUtilManager.java new file mode 100644 index 0000000000000..fc231310af225 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskIOUtilManager.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.util.NativeCodeLoader; +import org.apache.hadoop.util.Shell; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY; + +public class DiskIOUtilManager implements Runnable { + public static final Logger LOG = + LoggerFactory.getLogger(DiskIOUtilManager.class); + private volatile long intervalMs; + private volatile boolean shouldStop = false; + private Thread diskStatThread; + private Map locationToDisks = new HashMap<>(); + private Map ioStats = new HashMap<>(); + + private static class DiskLocation { + private final String diskName; + private final StorageLocation location; + DiskLocation(StorageLocation location) throws IOException { + this.location = location; + FileStore fs = Files.getFileStore(Paths.get(location.getUri().getPath())); + Path path = Paths.get(fs.name()); + if (path == null) { + throw new IOException("Storage location is invalid, path is null"); + } + Path diskName = path.getFileName(); + if (diskName == null) { + throw new IOException("Storage location is invalid, diskName is null"); + } + String diskNamePlace = null; 
+      if (NativeCodeLoader.isNativeCodeLoaded() && Shell.LINUX) {
+        try {
+          diskNamePlace = NativeIO.getDiskName(location.getUri().getPath());
+          LOG.info("location is {}, disk name is {}", location.getUri().getPath(), diskNamePlace);
+        } catch (IOException e) {
+          LOG.error("Failed to get disk name by NativeIO, falling back to the file store name.", e);
+          diskNamePlace = diskName.toString();
+        } finally {
+          this.diskName = diskNamePlace;
+        }
+      } else {
+        this.diskName = diskName.toString();
+      }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof DiskLocation)) {
+        return false;
+      }
+      DiskLocation o = (DiskLocation) other;
+      return diskName.equals(o.diskName) && location.equals(o.location);
+    }
+
+    @Override
+    public int hashCode() {
+      return location.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return location.toString() + " disk: " + diskName;
+    }
+  }
+
+  private static class IOStat {
+    private long lastTotalTicks;
+    private int util;
+    IOStat(long lastTotalTicks) {
+      this.lastTotalTicks = lastTotalTicks;
+    }
+
+    public int getUtil() {
+      return util;
+    }
+
+    public void setUtil(int util) {
+      // Clamp the value to the [0, 100] range.
+      if (util <= 100 && util >= 0) {
+        this.util = util;
+      } else if (util < 0) {
+        this.util = 0;
+      } else {
+        this.util = 100;
+      }
+    }
+
+    public long getLastTotalTicks() {
+      return lastTotalTicks;
+    }
+
+    public void setLastTotalTicks(long lastTotalTicks) {
+      this.lastTotalTicks = lastTotalTicks;
+    }
+  }
+
+  DiskIOUtilManager(Configuration conf) {
+    this.intervalMs = TimeUnit.SECONDS.toMillis(conf.getLong(
+        DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY,
+        DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT));
+    // Enforce a minimum stat interval of one second.
+    if (this.intervalMs < TimeUnit.SECONDS.toMillis(
+        DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT)) {
+      this.intervalMs = TimeUnit.SECONDS.toMillis(
+          DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT);
+    }
+  }
+
+  @Override
+  public void run() {
+    FsVolumeImpl.LOG.info(this + " starting disk util stat.");
+    while (true) {
+      try {
+        if (shouldStop) {
+          FsVolumeImpl.LOG.info(this + " stopping disk util stat.");
+          break;
+        }
+        if (!Shell.LINUX) {
+          LOG.warn("Disk util stat is only supported on Linux, stopping the stat thread.");
+          break;
+        }
+        Map<String, IOStat> allIOStats = getDiskIoUtils();
+        synchronized (this) {
+          for (Map.Entry<DiskLocation, IOStat> entry : ioStats.entrySet()) {
+            String disk = entry.getKey().diskName;
+            IOStat oldStat = entry.getValue();
+            int util = 0;
+            if (allIOStats.containsKey(disk)) {
+              long oldTotalTicks = oldStat.getLastTotalTicks();
+              long newTotalTicks = allIOStats.get(disk).getLastTotalTicks();
+              if (oldTotalTicks != 0) {
+                // Total ticks are the milliseconds spent doing I/O, so the delta
+                // over the stat interval gives the utilization percentage.
+                util = (int) ((double) (newTotalTicks - oldTotalTicks) * 100 / intervalMs);
+              }
+              oldStat.setLastTotalTicks(newTotalTicks);
+              oldStat.setUtil(util);
+              LOG.debug(disk + " disk io util:" + util);
+            } else {
+              // Maybe this disk has been unmounted.
+ oldStat.setUtil(100); + } + } + } + try { + Thread.sleep(intervalMs); + } catch (InterruptedException e) { + LOG.debug("Thread interrupted while sleep in DiskIOUtilManager", e); + } + } catch (Throwable e) { + LOG.error("DiskIOUtilManager encountered an exception.", e); + } + } + } + + void start() { + if (diskStatThread != null) { + return; + } + shouldStop = false; + diskStatThread = new Thread(this, threadName()); + diskStatThread.setDaemon(true); + diskStatThread.start(); + } + + void stop() { + shouldStop = true; + if (diskStatThread != null) { + diskStatThread.interrupt(); + try { + diskStatThread.join(); + } catch (InterruptedException e) { + LOG.debug("Thread interrupted while stop DiskIOUtilManager", e); + } + } + } + + private String threadName() { + return "DataNode disk io util manager"; + } + + private static final String PROC_DISKSSTATS = "/proc/diskstats"; + private static final Pattern DISK_STAT_FORMAT = + Pattern.compile("[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*(\\S*)" + + "[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*[0-9]*" + + "[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*[0-9]*" + + "[ \t]*[0-9]*[ \t]*[0-9]*[ \t]*[0-9]*" + + "[ \t]*([0-9]*)[ \t].*"); + + private Map getDiskIoUtils() { + Map rets = new HashMap<>(); + try(InputStreamReader fReader = new InputStreamReader( + new FileInputStream(PROC_DISKSSTATS), Charset.forName("UTF-8")); + BufferedReader in = new BufferedReader(fReader)) { + Matcher mat = null; + String str = in.readLine(); + while (str != null) { + mat = DISK_STAT_FORMAT.matcher(str); + if (mat.find()) { + String disk = mat.group(1); + long totalTicks = Long.parseLong(mat.group(2)); + LOG.debug(str + " totalTicks:" + totalTicks); + IOStat stat = new IOStat(totalTicks); + rets.put(disk, stat); + } + str = in.readLine(); + } + } catch (FileNotFoundException f) { + // Shouldn't happen. 
+ return rets; + } catch (IOException e) { + LOG.warn("Get disk ioUtils failed.", e); + } + return rets; + } + + public synchronized void setStorageLocations(List locations) throws IOException { + if (locations == null) { + return; + } + locationToDisks.clear(); + ioStats.clear(); + for (StorageLocation location : locations) { + DiskLocation diskLocation = new DiskLocation(location); + locationToDisks.put(location, diskLocation); + IOStat stat = new IOStat(0); + ioStats.put(diskLocation, stat); + } + } + + public synchronized int getStorageLocationDiskIOUtil(StorageLocation location) { + DiskLocation diskLocation = locationToDisks.get(location); + if (diskLocation == null) { + return 0; + } + if (ioStats.containsKey(diskLocation)) { + return ioStats.get(diskLocation).getUtil(); + } else { + return 0; + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java index 5d12fa72bb165..8175204251762 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java @@ -21,10 +21,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_KEY; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Random; import org.slf4j.Logger; @@ -55,6 +59,8 @@ public class AvailableSpaceVolumeChoosingPolicy private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT; private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT; + private boolean ioUtilPreferenceEnable = + DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_DEFAULT; AvailableSpaceVolumeChoosingPolicy(Random random) { this.random = random; @@ -81,12 +87,17 @@ public void setConf(Configuration conf) { balancedPreferencePercent = conf.getFloat( DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY, DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT); + ioUtilPreferenceEnable = conf.getBoolean( + DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_KEY, + DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_DEFAULT); LOG.info("Available space volume choosing policy initialized: " + 
DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY + " = " + balancedSpaceThreshold + ", " + DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY + - " = " + balancedPreferencePercent); + " = " + balancedPreferencePercent + ", " + + DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_KEY + + " = " + ioUtilPreferenceEnable); if (balancedPreferencePercent > 1.0) { LOG.warn("The value of " + DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY + @@ -135,9 +146,22 @@ private V doChooseVolume(final List volumes, long replicaSize, new AvailableSpaceVolumeList(volumes); if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) { + V volume; + if (ioUtilPreferenceEnable) { + IOUtilVolumeList volumesWithIOUtils = new IOUtilVolumeList(volumes); + volume = volumesWithIOUtils.getFirstVolumesWithLowIOUtilAndHighAvailableSpace(replicaSize); + if (volume != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("All volumes are within the configured free space balance " + + "threshold and prefer low io util. Selecting " + volume + + " for write of block size " + replicaSize); + } + return volume; + } + } // If they're actually not too far out of whack, fall back on pure round // robin. - V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize, + volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize, storageId); if (LOG.isDebugEnabled()) { LOG.debug("All volumes are within the configured free space balance " + @@ -165,6 +189,21 @@ private V doChooseVolume(final List volumes, long replicaSize, preferencePercentScaler; if (mostAvailableAmongLowVolumes < replicaSize || random.nextFloat() < scaledPreferencePercent) { + if (ioUtilPreferenceEnable) { + IOUtilVolumeList volumesWithIOUtils = new IOUtilVolumeList(highAvailableVolumes); + volume = + volumesWithIOUtils.getFirstVolumesWithLowIOUtilAndHighAvailableSpace(replicaSize); + if (volume != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Volumes are imbalanced. Selecting " + volume + + " from high available space and low io util volumes for write of block size " + + replicaSize); + } + return volume; + } else { + volumesWithIOUtils.chooseVolumeByIOUtilFailed(replicaSize); + } + } volume = roundRobinPolicyHighAvailable.chooseVolume( highAvailableVolumes, replicaSize, storageId); if (LOG.isDebugEnabled()) { @@ -173,6 +212,19 @@ private V doChooseVolume(final List volumes, long replicaSize, + replicaSize); } } else { + if (ioUtilPreferenceEnable) { + IOUtilVolumeList volumesWithIOUtils = new IOUtilVolumeList(lowAvailableVolumes); + volume = + volumesWithIOUtils.getFirstVolumesWithLowIOUtilAndHighAvailableSpace(replicaSize); + if (volume != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Volumes are imbalanced. Selecting " + volume + + " from low available space and low io util volumes for write of block size " + + replicaSize); + } + return volume; + } + } volume = roundRobinPolicyLowAvailable.chooseVolume( lowAvailableVolumes, replicaSize, storageId); if (LOG.isDebugEnabled()) { @@ -296,4 +348,127 @@ private List extractVolumesFromPairs(List volumes) return ret; } + /** + * Used to keep track of the list of volumes we're choosing from. 
+   */
+  private class IOUtilVolumeList {
+    private final List<IOUtilVolumePair> volumes;
+    private long avgAvailableSpace;
+
+    IOUtilVolumeList(List<V> volumes) throws IOException {
+      long allAvailableSpace = 0;
+      this.volumes = new ArrayList<>();
+      for (V volume : volumes) {
+        this.volumes.add(new IOUtilVolumePair(volume));
+        allAvailableSpace += volume.getAvailable();
+      }
+      this.avgAvailableSpace = allAvailableSpace / volumes.size();
+      Collections.sort(this.volumes);
+      if (LOG.isDebugEnabled()) {
+        for (IOUtilVolumePair pair : this.volumes) {
+          LOG.debug(String.valueOf(pair));
+        }
+      }
+    }
+
+    public V getFirstVolumesWithLowIOUtilAndHighAvailableSpace(long replicaSize) {
+      // Volumes are sorted by io util in ascending order.
+      for (IOUtilVolumePair pair : volumes) {
+        // Available space is not enough to store the new replica.
+        if (pair.getAvailableSpace() < replicaSize) {
+          continue;
+        }
+        // Volume's available space is too low.
+        if (pair.getAvailableSpace() < balancedSpaceThreshold) {
+          continue;
+        } else {
+          // Got the first volume with low io util and high available space.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Choose volume: " + pair);
+          }
+          return pair.getVolume();
+        }
+      }
+      return null;
+    }
+
+    void chooseVolumeByIOUtilFailed(long replicaSize) {
+      StringBuilder str = new StringBuilder();
+      str.append("Choose volume failed, " +
+          "replica size:" + replicaSize +
+          " average available space:" + avgAvailableSpace + " volumes:");
+      str.append(System.lineSeparator());
+      for (IOUtilVolumePair pair : volumes) {
+        str.append(pair.toString());
+        str.append(System.lineSeparator());
+      }
+      LOG.warn(str.toString());
+    }
+  }
+
+  /**
+   * Used so that we only check the io util and available space of a given
+   * volume once, at the beginning of
+   * {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume}.
+   */
+  private class IOUtilVolumePair implements Comparable<IOUtilVolumePair> {
+    private final V volume;
+    private final int ioUtil;
+    private final long availableSpace;
+
+    IOUtilVolumePair(V volume) throws IOException {
+      this.volume = volume;
+      this.ioUtil = volume.getVolumeIOUtil();
+      this.availableSpace = volume.getAvailable();
+    }
+
+    public V getVolume() {
+      return volume;
+    }
+
+    public int getIoUtil() {
+      return ioUtil;
+    }
+
+    public long getAvailableSpace() {
+      return availableSpace;
+    }
+
+    @Override
+    public String toString() {
+      return volume.toString() + " io util:" + ioUtil + " available space:" + availableSpace;
+    }
+
+    @Override
+    public int compareTo(IOUtilVolumePair o) {
+      // Sort by io util ascending, then by available space descending.
+      if (this.ioUtil != o.ioUtil) {
+        return this.ioUtil - o.ioUtil;
+      } else {
+        if (this.availableSpace < o.availableSpace) {
+          return 1;
+        } else if (this.availableSpace == o.availableSpace) {
+          return 0;
+        } else {
+          return -1;
+        }
+      }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof AvailableSpaceVolumeChoosingPolicy.IOUtilVolumePair)) {
+        return false;
+      }
+      IOUtilVolumePair o = (IOUtilVolumePair) other;
+      return this.volume.equals(o.getVolume()) &&
+          this.ioUtil == o.getIoUtil() &&
+          this.availableSpace == o.getAvailableSpace();
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(this.volume, this.ioUtil, this.availableSpace);
+    }
+
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 8d1d10bccd2fc..6bbf6888b3849 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -685,4 +685,12 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
    * Get the volume list.
    */
   List getVolumeList();
+
+  /**
+   * Get the io utilization of the disk backing the given storage location.
+   *
+   * @param location the storage location to query.
+   * @return The io utilization of the storage location, between 0 and 100.
+   */
+  int getStorageLocationDiskUtil(StorageLocation location);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index 8ae204364f05a..2511dd2da9fc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -448,4 +448,12 @@ class VolumeCheckContext {
   FileIoProvider getFileIoProvider();
   DataNodeVolumeMetrics getMetrics();
+
+  /**
+   * Get the io utilization of the disk backing this volume. The value is
+   * between 0 and 100: 0 means the volume is idle, 100 means it is very busy.
+   *
+   * @return The io utilization of the disk.
+   */
+  int getVolumeIOUtil();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index aaf37aa09c8c9..1d5f21f152e00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -223,6 +223,11 @@ public Set deepCopyReplica(String bpid)
     }
   }
 
+  @Override
+  public int getStorageLocationDiskUtil(StorageLocation location) {
+    return datanode.getStorageLocationDiskUtil(location);
+  }
+
   /**
    * This should be primarily used for testing.
    * @return clone of replica store in datanode memory
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 806afbdb2d115..7e667ce19d522 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -1409,6 +1409,11 @@ public DataNodeVolumeMetrics getMetrics() {
     return metrics;
   }
 
+  @Override
+  public int getVolumeIOUtil() {
+    return dataset.getStorageLocationDiskUtil(storageLocation);
+  }
+
   /**
    * Filter for block file names stored on the file system volumes.
   */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d45f8eb5b7ec5..42445f3f2dd93 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2704,6 +2704,25 @@
+<property>
+  <name>dfs.datanode.available-space-volume-choosing-policy.io.util.preference.enable</name>
+  <value>false</value>
+  <description>
+    Only used when the dfs.datanode.fsdataset.volume.choosing.policy is set to
+    org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy.
+    If set to true, volumes on busy disks will be avoided as much as possible
+    when choosing a volume for a new replica. The default is false.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.disk.stat.interval.seconds</name>
+  <value>1</value>
+  <description>
+    How often, in seconds, the DiskIOUtilManager collects disk io util metrics.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.round-robin-volume-choosing-policy.additional-available-space</name>
   <value>1073741824</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 29eb051cb0210..bc8d0fa8c7619 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -688,6 +688,11 @@ public DataNodeVolumeMetrics getMetrics() {
     return metrics;
   }
 
+  @Override
+  public int getVolumeIOUtil() {
+    return 0;
+  }
+
   @Override
   public VolumeCheckResult check(VolumeCheckContext context)
       throws Exception {
@@ -1611,5 +1616,10 @@ public MountVolumeMap getMountVolumeMap() {
   public List getVolumeList() {
     return null;
   }
+
+  @Override
+  public int getStorageLocationDiskUtil(StorageLocation location) {
+    return 0;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 74c70cec76967..441cbf6a0f731 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -1087,6 +1087,11 @@ public DataNodeVolumeMetrics getMetrics() {
     return null;
   }
 
+  @Override
+  public int getVolumeIOUtil() {
+    return 0;
+  }
+
   @Override
   public VolumeCheckResult check(VolumeCheckContext context)
       throws Exception {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 77e2e2077d1f5..a281d2184cd31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -473,4 +473,9 @@ public MountVolumeMap getMountVolumeMap() {
   public List getVolumeList() {
     return null;
   }
+
+  @Override
+  public int getStorageLocationDiskUtil(StorageLocation location) {
+    return 0;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java index 6c8e828f3689e..6f82e6c3a0807 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java @@ -136,6 +136,11 @@ public DataNodeVolumeMetrics getMetrics() { return null; } + @Override + public int getVolumeIOUtil() { + return 0; + } + @Override public VolumeCheckResult check(VolumeCheckContext context) throws Exception {
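
For reference, a minimal hdfs-site.xml sketch of how the behavior introduced by this patch would be enabled; the policy key and class name are the ones quoted in the hdfs-default.xml description above, and the values shown are illustrative, not recommendations:

<property>
  <name>dfs.datanode.fsdataset.volume.choosing.policy</name>
  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy</value>
</property>
<property>
  <name>dfs.datanode.available-space-volume-choosing-policy.io.util.preference.enable</name>
  <value>true</value>
</property>
<property>
  <name>dfs.datanode.disk.stat.interval.seconds</name>
  <value>1</value>
</property>

With these settings, a DataNode running on Linux starts the DiskIOUtilManager thread, samples /proc/diskstats every dfs.datanode.disk.stat.interval.seconds, and the choosing policy prefers volumes whose backing disks report low io util among those with enough available space.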