Skip to content

Commit

Permalink
HDFS-16446. Consider ioutils of disk when choosing volume
Browse files Browse the repository at this point in the history
Co-authored-by: liuhongtong <hongtongliu@126.com>
Co-authored-by: hfutatzhanghb <1036798979@qq.com>
  • Loading branch information
3 people committed Mar 31, 2022
1 parent ab8c360 commit 41cfc18
Show file tree
Hide file tree
Showing 15 changed files with 640 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1132,4 +1132,6 @@ public static void copyFileUnbuffered(File src, File dst) throws IOException {

private static native void copyFileUnbuffered0(String src, String dst)
throws NativeIOException;

/**
 * Resolves the name of the block device that holds {@code path}.
 *
 * <p>The native implementation stats {@code path}, takes the major/minor
 * numbers of its device and looks for the matching entry in
 * {@code /proc/diskstats}. It is only compiled for UNIX builds and relies
 * on Linux-specific facilities.
 *
 * @param path file system path to inspect; must not be {@code null}, the
 *             native code does not guard against a null string
 * @return the device name of the matching {@code /proc/diskstats} entry,
 *         or an empty string when no entry matches
 * @throws IOException if {@code path} cannot be stat'ed or
 *                     {@code /proc/diskstats} cannot be opened
 */
public static native String getDiskName(String path) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
#include "org_apache_hadoop_io_nativeio_NativeIO.h"
#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h"
#include "exception.h"
/*
 * Throw a java.io.IOException, generating the message from errno.
 * NB. this is also used from windows_secure_container_executor.c
 */
extern void throw_ioe(JNIEnv* env, int errnum);

#ifdef UNIX
#include <assert.h>
Expand Down Expand Up @@ -47,6 +52,84 @@
#include <sys/types.h>
#include <unistd.h>
#include "config.h"
#include <signal.h>
#include <linux/kdev_t.h>
#include <string.h>
#include <time.h>
#include <ctype.h>
#include <dirent.h>
#include <sys/utsname.h>
#define MAX_NAME_LEN 128
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

/*
 * Scan a diskstats-formatted file (normally /proc/diskstats) for the entry
 * whose device numbers equal (pmajor, pminor) and copy its device name into
 * diskName.
 *
 * env       - JNI environment, used only to raise an IOException on failure.
 * diskstats - path of the file to scan.
 * pmajor    - major device number to look for.
 * pminor    - minor device number to look for.
 * diskName  - output buffer of at least MAX_NAME_LEN bytes. It is left
 *             untouched when no entry matches, so callers should pre-zero it.
 *
 * On failure to open the file, or to close it after an unsuccessful scan, a
 * Java IOException is left pending via throw_ioe(); the caller must check for
 * it before making further JNI calls.
 */
void read_diskstats_stat_work(JNIEnv *env, char *diskstats, unsigned int pmajor, unsigned int pminor, char *diskName)
{
  FILE *fp;
  char line[256], dev_name[MAX_NAME_LEN];
  unsigned int major, minor;

  if ((fp = fopen(diskstats, "r")) == NULL) {
    throw_ioe(env, errno);
    return;
  }

  while (fgets(line, sizeof(line), fp) != NULL) {
    /*
     * Each line starts with "major minor name" followed by I/O counters that
     * are irrelevant here, so only the first three fields are parsed.
     * The %127s width (MAX_NAME_LEN - 1) keeps an over-long name from
     * overflowing dev_name.
     */
    if (sscanf(line, "%u %u %127s", &major, &minor, dev_name) != 3) {
      /* Malformed line: major/minor would be stale or uninitialised. */
      continue;
    }
    if (pmajor == major && pminor == minor) {
      strncpy(diskName, dev_name, MAX_NAME_LEN - 1);
      diskName[MAX_NAME_LEN - 1] = '\0';
      /* The stream was only read; a close error would not affect the result. */
      fclose(fp);
      return;
    }
  }

  if (fclose(fp) != 0) {
    throw_ioe(env, errno);
  }
}

/*
 * JNI entry point for NativeIO.getDiskName(String path).
 *
 * Stats the given path, splits the device id of the containing file system
 * into major/minor numbers and looks the pair up in /proc/diskstats.
 *
 * Returns the matching device name as a Java string, or an empty string when
 * /proc/diskstats has no matching entry. Returns NULL with a Java exception
 * pending when the path cannot be converted or stat'ed, or when reading
 * /proc/diskstats fails.
 *
 * j_str must not be NULL; that check is expected to be done by the Java
 * caller.
 */
JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_getDiskName
(JNIEnv *env, jclass class, jstring j_str) {
  struct stat buf;
  const char *c_str = NULL;
  char diskName[MAX_NAME_LEN] = {0};
  jstring result = NULL;

  c_str = (*env)->GetStringUTFChars(env, j_str, NULL);
  if (c_str == NULL) {
    /* GetStringUTFChars has already raised OutOfMemoryError. */
    return NULL;
  }

  if (unlikely(stat(c_str, &buf) == -1)) {
    throw_ioe(env, errno);
    goto cleanup;
  }

  /*
   * NOTE(review): MAJOR/MINOR come from <linux/kdev_t.h>; confirm they decode
   * the dev_t returned by stat() correctly for large device numbers (glibc
   * offers major()/minor() in <sys/sysmacros.h> for userspace dev_t values).
   */
  read_diskstats_stat_work(env, "/proc/diskstats",
                           MAJOR(buf.st_dev), MINOR(buf.st_dev), diskName);

  /*
   * The helper reports failures by raising a Java exception. Creating a new
   * string while an exception is pending is not permitted by JNI, so bail
   * out and let the exception propagate.
   */
  if ((*env)->ExceptionCheck(env)) {
    goto cleanup;
  }

  result = (*env)->NewStringUTF(env, diskName);

cleanup:
  (*env)->ReleaseStringUTFChars(env, j_str, c_str);
  return result;
}
#endif

#ifdef WINDOWS
Expand Down Expand Up @@ -92,12 +175,6 @@ static jclass pmem_region_clazz = NULL;
static jmethodID pmem_region_ctor = NULL;
#endif

/*
* Throw a java.IO.IOException, generating the message from errno.
* NB. this is also used form windows_secure_container_executor.c
*/
extern void throw_ioe(JNIEnv* env, int errnum);

// Internal functions
#ifdef UNIX
static ssize_t get_pw_buflen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2007,5 +2007,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_LEASE_HARDLIMIT_DEFAULT =
HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT;

// Configuration key and default (1) for the DataNode disk stat interval;
// the unit is seconds according to the key name.
// NOTE(review): presumably the period at which disk IO statistics are
// sampled -- confirm against the consumer of this key.
public static final String DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_KEY =
"dfs.datanode.disk.stat.interval.seconds";
public static final long DFS_DATANODE_DISK_STAT_INTERVAL_SECONDS_DEFAULT = 1L;

// Configuration key and default (disabled) for the IO-util preference of the
// available-space volume choosing policy.
// NOTE(review): presumably makes the policy take disk IO utilization into
// account when picking a volume -- confirm against the policy implementation.
public static final String
DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_KEY =
"dfs.datanode.available-space-volume-choosing-policy.io.util.preference.enable";
public static final boolean
DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_IO_UTIL_PREFERENCE_ENABLE_DEFAULT = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ public static InetSocketAddress createSocketAddr(String target) {
private static final double CONGESTION_RATIO = 1.5;
private DiskBalancer diskBalancer;
private DataSetLockManager dataSetLockManager;
private DiskIOUtilManager diskIOUtilManager;

private final ExecutorService xferService;

Expand Down Expand Up @@ -490,6 +491,11 @@ private static Tracer createTracer(Configuration conf) {
volumeChecker = new DatasetVolumeChecker(conf, new Timer());
this.xferService =
HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
if (Shell.LINUX) {
this.diskIOUtilManager = new DiskIOUtilManager(conf);
} else {
LOG.info("Disk io util manager does not start, only Linux OS release support!");
}
}

/**
Expand Down Expand Up @@ -1190,6 +1196,9 @@ public IOException call() {
conf.set(DFS_DATANODE_DATA_DIR_KEY,
Joiner.on(",").join(effectiveVolumes));
dataDirs = getStorageLocations(conf);
if (diskIOUtilManager != null) {
diskIOUtilManager.setStorageLocations(dataDirs);
}
}
}
}
Expand Down Expand Up @@ -1709,6 +1718,10 @@ void startDataNode(List<StorageLocation> dataDirectories,
synchronized (this) {
this.dataDirs = dataDirectories;
}
if (diskIOUtilManager != null) {
this.diskIOUtilManager.setStorageLocations(dataDirectories);
this.diskIOUtilManager.start();
}
this.dnConf = new DNConf(this);
checkSecureConfig(dnConf, getConf(), resources);

Expand Down Expand Up @@ -2505,6 +2518,10 @@ public void shutdown() {
dataNodeInfoBeanName = null;
}
if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
if (diskIOUtilManager != null) {
diskIOUtilManager.stop();
diskIOUtilManager = null;
}
LOG.info("Shutdown complete.");
synchronized(this) {
// it is already false, but setting it again to avoid a findbug warning.
Expand Down Expand Up @@ -4158,4 +4175,9 @@ boolean isSlownode() {
public BlockPoolManager getBlockPoolManager() {
return blockPoolManager;
}

/**
 * Returns the disk IO utilization that the disk IO util manager reports for
 * the given storage location.
 *
 * @param location storage location to query
 * @return the value reported by the manager, or {@code 0} when no manager is
 *         available (it is only created on Linux and is cleared by
 *         {@link #shutdown()})
 */
public int getStorageLocationDiskUtil(StorageLocation location) {
  // Read the field once: shutdown() sets it to null, so checking and then
  // dereferencing the field itself could fail with a NullPointerException
  // if shutdown runs in between.
  final DiskIOUtilManager manager = diskIOUtilManager;
  return manager != null ? manager.getStorageLocationDiskIOUtil(location) : 0;
}
}
Loading

0 comments on commit 41cfc18

Please sign in to comment.