HADOOP-16202. Enhanced openFile(): mapreduce and YARN changes. (#2584/2)
These changes ensure that sequential files are opened with the
right read policy, and split start/end is passed in.

As well as offering opportunities for filesystem clients to
choose fetch/cache/seek policies, the settings ensure that
text files on an S3 bucket whose default policy is "random"
are still processed efficiently.

This commit depends on the associated hadoop-common patch,
which must be committed first.

Contributed by Steve Loughran.

Change-Id: Ic6713fd752441cf42ebe8739d05c2293a5db9f94
steveloughran committed Apr 27, 2022
1 parent 75950e4 commit e123de9
Showing 10 changed files with 96 additions and 29 deletions.
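
Every hunk below applies the same pattern: replace a bare fs.open(path) or fc.open(path) with the openFile() builder, declare a read policy, and, for the record readers, pass the split bounds as hints. The sketch below is for orientation only and assumes the hadoop-common side of HADOOP-16202 is on the classpath; the class name, method name and split arguments are illustrative, not part of this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;

public class OpenFileSketch {

  /** Open one split of a file for sequential reading. */
  public static FSDataInputStream openSplit(Configuration conf, Path file,
      long splitStart, long splitEnd) throws IOException {
    FileSystem fs = file.getFileSystem(conf);
    // opt() requests are best-effort hints: stores that understand the
    // keys (such as S3A) can choose a fetch/seek strategy from them;
    // other filesystems simply ignore them.
    FutureDataInputStreamBuilder builder = fs.openFile(file)
        .opt(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
        .opt(FS_OPTION_OPENFILE_SPLIT_START, splitStart)
        .opt(FS_OPTION_OPENFILE_SPLIT_END, splitEnd);
    // build() is asynchronous; awaitFuture() blocks and rethrows any
    // IOException raised while opening the file.
    return FutureIO.awaitFuture(builder.build());
  }
}

Using .opt() rather than .must() keeps the options advisory, which is why the same reader code runs unchanged against HDFS, local and object-store filesystems.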
@@ -35,6 +35,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/**
* Reads in history events from the JobHistoryFile and sends them out again
* to be recorded.
@@ -118,7 +122,11 @@ public static FSDataInputStream getPreviousJobHistoryFileStream(
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
jobId, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
in = awaitFuture(
fc.openFile(historyFile)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.build());
return in;
}

@@ -28,7 +28,6 @@
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
@@ -41,9 +40,13 @@
import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;
import org.apache.hadoop.util.functional.FutureIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;

/**
* Treats keys as offset in file and value as line.
*/
@@ -109,10 +112,14 @@ public LineRecordReader(Configuration job, FileSplit split,
// open the file and seek to the start of the split
final FutureDataInputStreamBuilder builder =
file.getFileSystem(job).openFile(file);
FutureIOSupport.propagateOptions(builder, job,
// the start and end of the split may be used to build
// an input strategy.
builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
FutureIO.propagateOptions(builder, job,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
fileIn = FutureIOSupport.awaitFuture(builder.build());
fileIn = FutureIO.awaitFuture(builder.build());
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
@@ -28,7 +28,6 @@
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CodecPool;
@@ -40,6 +39,8 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.functional.FutureIO;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -94,10 +95,10 @@ public void initialize(Configuration job, long splitStart, long splitLength,
// open the file
final FutureDataInputStreamBuilder builder =
file.getFileSystem(job).openFile(file);
FutureIOSupport.propagateOptions(builder, job,
FutureIO.propagateOptions(builder, job,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
fileIn = FutureIOSupport.awaitFuture(builder.build());
fileIn = FutureIO.awaitFuture(builder.build());

CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null != codec) {
@@ -27,7 +27,6 @@
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
@@ -40,9 +39,14 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.functional.FutureIO;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;

/**
* Treats keys as offset in file and value as line.
*/
@@ -86,10 +90,14 @@ public void initialize(InputSplit genericSplit,
// open the file and seek to the start of the split
final FutureDataInputStreamBuilder builder =
file.getFileSystem(job).openFile(file);
FutureIOSupport.propagateOptions(builder, job,
// the start and end of the split may be used to build
// an input strategy.
builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start);
builder.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
FutureIO.propagateOptions(builder, job,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
fileIn = FutureIOSupport.awaitFuture(builder.build());
fileIn = FutureIO.awaitFuture(builder.build());

CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
@@ -29,7 +29,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -39,6 +38,7 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.functional.FutureIO;

/**
* NLineInputFormat which splits N lines of input as one split.
@@ -99,10 +99,10 @@ public static List<FileSplit> getSplitsForFile(FileStatus status,
try {
final FutureDataInputStreamBuilder builder =
fileName.getFileSystem(conf).openFile(fileName);
FutureIOSupport.propagateOptions(builder, conf,
FutureIO.propagateOptions(builder, conf,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
FSDataInputStream in = FutureIOSupport.awaitFuture(builder.build());
FSDataInputStream in = FutureIO.awaitFuture(builder.build());
lr = new LineReader(in, conf);
Text line = new Text();
int numLines = 0;
@@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,6 +42,12 @@
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;

/**
* An input format that reads the first 10 characters of each line as the key
@@ -224,12 +231,17 @@ public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
Path p = ((FileSplit)split).getPath();
FileSystem fs = p.getFileSystem(context.getConfiguration());
in = fs.open(p);
long start = ((FileSplit)split).getStart();
// find the offset to start at a record boundary
offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
in.seek(start + offset);
length = ((FileSplit)split).getLength();
final FutureDataInputStreamBuilder builder = fs.openFile(p)
.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
.opt(FS_OPTION_OPENFILE_SPLIT_END, start + length)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
in = FutureIO.awaitFuture(builder.build());
in.seek(start + offset);
}

public void close() throws IOException {
@@ -45,7 +45,11 @@
import org.apache.hadoop.tools.util.RetriableCommand;
import org.apache.hadoop.tools.util.ThrottledInputStream;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.VisibleForTesting;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/**
* This class extends RetriableCommand to implement the copy of files,
@@ -328,7 +332,11 @@ private static ThrottledInputStream getInputStream(Path path,
FileSystem fs = path.getFileSystem(conf);
float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
DistCpConstants.DEFAULT_BANDWIDTH_MB);
FSDataInputStream in = fs.open(path);
// open with sequential read, but not whole-file
FSDataInputStream in = awaitFuture(fs.openFile(path)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
.build());
return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);
}
catch (IOException e) {
@@ -26,7 +26,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -35,6 +34,7 @@
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.streaming.StreamUtil;
import org.apache.hadoop.util.functional.FutureIO;

/**
* An input format that selects a RecordReader based on a JobConf property. This
@@ -66,10 +66,10 @@ public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
FileSystem fs = path.getFileSystem(conf);
// open the file
final FutureDataInputStreamBuilder builder = fs.openFile(path);
FutureIOSupport.propagateOptions(builder, conf,
FutureIO.propagateOptions(builder, conf,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
FSDataInputStream in = FutureIOSupport.awaitFuture(builder.build());
FSDataInputStream in = FutureIO.awaitFuture(builder.build());

// Factory dispatch based on available params..
Class readerClass;
@@ -56,6 +56,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -77,6 +78,11 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

@Public
@Evolving
public class AggregatedLogFormat {
@@ -576,9 +582,16 @@ public LogReader(Configuration conf, Path remoteAppLogFile)
try {
FileContext fileContext =
FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
FileStatus status = fileContext.getFileStatus(remoteAppLogFile);
this.fsDataIStream = awaitFuture(
fileContext.openFile(remoteAppLogFile)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
.opt(FS_OPTION_OPENFILE_LENGTH,
status.getLen()) // file length hint for object stores
.build());
reader = new TFile.Reader(this.fsDataIStream,
fileContext.getFileStatus(remoteAppLogFile).getLen(), conf);
status.getLen(), conf);
this.scanner = reader.createScanner();
} catch (IOException ioe) {
close();
@@ -60,7 +60,11 @@
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.yarn.exceptions.YarnException;

/**
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/**
* Download a single URL to the local disk.
*
*/
@@ -285,23 +289,25 @@ private void verifyAndCopy(Path destination)
}
}

downloadAndUnpack(sCopy, destination);
downloadAndUnpack(sCopy, sStat, destination);
}

/**
* Copy source path to destination with localization rules.
* @param source source path to copy. Typically HDFS
* @param source source path to copy. Typically HDFS or an object store.
* @param sourceStatus status of source
* @param destination destination path. Typically local filesystem
* @exception YarnException Any error has occurred
*/
private void downloadAndUnpack(Path source, Path destination)
private void downloadAndUnpack(Path source,
FileStatus sourceStatus, Path destination)
throws YarnException {
try {
FileSystem sourceFileSystem = source.getFileSystem(conf);
FileSystem destinationFileSystem = destination.getFileSystem(conf);
if (sourceFileSystem.getFileStatus(source).isDirectory()) {
if (sourceStatus.isDirectory()) {
FileUtil.copy(
sourceFileSystem, source,
sourceFileSystem, sourceStatus,
destinationFileSystem, destination, false,
true, conf);
} else {
@@ -329,7 +335,11 @@ private void unpack(Path source, Path destination,
FileSystem sourceFileSystem,
FileSystem destinationFileSystem)
throws IOException, InterruptedException, ExecutionException {
try (InputStream inputStream = sourceFileSystem.open(source)) {
try (InputStream inputStream = awaitFuture(
sourceFileSystem.openFile(source)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.build())) {
File dst = new File(destination.toUri());
String lowerDst = StringUtils.toLowerCase(dst.getName());
switch (resource.getType()) {
