Skip to content

HDFS-17728. TestDFSIO can use another name service except the default… #7399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,10 @@ public WriteMapper() {
public Closeable getIOStream(String name) throws IOException {
// create file
Path filePath = new Path(getDataDir(getConf()), name);
OutputStream out = fs.create(filePath, true, bufferSize);
FileSystem ioFS = filePath.getFileSystem(getConf());
OutputStream out = ioFS.create(filePath, true, bufferSize);
if (blockStoragePolicy != null) {
fs.setStoragePolicy(filePath, blockStoragePolicy);
ioFS.setStoragePolicy(filePath, blockStoragePolicy);
}
if(compressionCodec != null)
out = compressionCodec.createOutputStream(out);
Expand Down Expand Up @@ -562,9 +563,10 @@ public AppendMapper() {

@Override // IOMapperBase
public Closeable getIOStream(String name) throws IOException {
Path path = new Path(getDataDir(getConf()), name);
FileSystem ioFS = path.getFileSystem(getConf());
// open file for append
OutputStream out =
fs.append(new Path(getDataDir(getConf()), name), bufferSize);
OutputStream out = ioFS.append(path, bufferSize);
if(compressionCodec != null)
out = compressionCodec.createOutputStream(out);
LOG.info("out = " + out.getClass().getName());
Expand Down Expand Up @@ -609,8 +611,10 @@ public ReadMapper() {

@Override // IOMapperBase
public Closeable getIOStream(String name) throws IOException {
Path filePath = new Path(getDataDir(getConf()), name);
FileSystem ioFS = filePath.getFileSystem(getConf());
// open file
InputStream in = fs.open(new Path(getDataDir(getConf()), name));
InputStream in = ioFS.open(filePath);
if(compressionCodec != null)
in = compressionCodec.createInputStream(in);
LOG.info("in = " + in.getClass().getName());
Expand Down Expand Up @@ -674,8 +678,9 @@ public RandomReadMapper() {
@Override // IOMapperBase
public Closeable getIOStream(String name) throws IOException {
Path filePath = new Path(getDataDir(getConf()), name);
this.fileSize = fs.getFileStatus(filePath).getLen();
InputStream in = fs.open(filePath);
FileSystem ioFS = filePath.getFileSystem(getConf());
this.fileSize = ioFS.getFileStatus(filePath).getLen();
InputStream in = ioFS.open(filePath);
if(compressionCodec != null)
in = new FSDataInputStream(compressionCodec.createInputStream(in));
LOG.info("in = " + in.getClass().getName());
Expand Down Expand Up @@ -742,7 +747,8 @@ public static class TruncateMapper extends IOStatMapper {
@Override // IOMapperBase
public Closeable getIOStream(String name) throws IOException {
filePath = new Path(getDataDir(getConf()), name);
fileSize = fs.getFileStatus(filePath).getLen();
FileSystem ioFS = filePath.getFileSystem(getConf());
fileSize = ioFS.getFileStatus(filePath).getLen();
return null;
}

Expand All @@ -751,14 +757,15 @@ public Long doIO(Reporter reporter,
String name,
long newLength // in bytes
) throws IOException {
boolean isClosed = fs.truncate(filePath, newLength);
FileSystem ioFS = filePath.getFileSystem(getConf());
boolean isClosed = ioFS.truncate(filePath, newLength);
reporter.setStatus("truncating " + name + " to newLength " +
newLength + " ::host = " + hostName);
for(int i = 0; !isClosed; i++) {
try {
Thread.sleep(DELAY);
} catch (InterruptedException ignored) {}
FileStatus status = fs.getFileStatus(filePath);
FileStatus status = ioFS.getFileStatus(filePath);
assert status != null : "status is null";
isClosed = (status.getLen() == newLength);
reporter.setStatus("truncate recover for " + name + " to newLength " +
Expand Down Expand Up @@ -838,6 +845,7 @@ public int run(String[] args) throws IOException {
String storagePolicy = null;
boolean isSequential = false;
String version = TestDFSIO.class.getSimpleName() + ".1.8";
String baseDir = getBaseDir(config);

LOG.info(version);
if (args.length == 0) {
Expand Down Expand Up @@ -904,7 +912,7 @@ public int run(String[] args) throws IOException {
if (skipSize > 0) {
LOG.info("skipSize = " + skipSize);
}
LOG.info("baseDir = " + getBaseDir(config));
LOG.info("baseDir = " + baseDir);

if (compressionClass != null) {
config.set("test.io.compression.class", compressionClass);
Expand All @@ -913,7 +921,7 @@ public int run(String[] args) throws IOException {

config.setInt("test.io.file.buffer.size", bufferSize);
config.setLong("test.io.skip.size", skipSize);
FileSystem fs = FileSystem.get(config);
FileSystem fs = new Path(baseDir).getFileSystem(config);

if (erasureCodePolicyName != null) {
if (!checkErasureCodePolicy(erasureCodePolicyName, fs, testType)) {
Expand Down