
Commit 71a3200

sadanand48 (Sadanand Shenoy) authored and committed
HDFS-17381. Distcp of EC files should not be limited to DFS. (#6551)
Contributed by Sadanand Shenoy (cherry picked from commit 49a4958)
1 parent 8d60333 commit 71a3200

File tree: 9 files changed, +271 −36 lines changed


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java

Lines changed: 19 additions & 0 deletions

@@ -2108,4 +2108,23 @@ public static void maybeIgnoreMissingDirectory(FileSystem fs,
     LOG.info("Ignoring missing directory {}", path);
     LOG.debug("Directory missing", e);
   }
+
+  /**
+   * Return true if the FS implements {@link WithErasureCoding} and
+   * supports the EC_POLICY option in {@link Options.OpenFileOptions}.
+   * A message is logged when the filesystem does not support erasure coding.
+   * @param fs filesystem
+   * @param path path
+   * @return true if the filesystem supports EC
+   * @throws IOException if the hasPathCapability call fails
+   */
+  public static boolean checkFSSupportsEC(FileSystem fs, Path path) throws IOException {
+    if (fs instanceof WithErasureCoding &&
+        fs.hasPathCapability(path, Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY)) {
+      return true;
+    }
+    LOG.warn("Filesystem with scheme {} does not support Erasure Coding" +
+        " at path {}", fs.getScheme(), path);
+    return false;
+  }
 }
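For orientation, here is a minimal sketch of how a caller outside this patch might use the new helper. The ofs:// URI and the RS-6-3-1024k policy name are illustrative assumptions, not part of the commit:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.WithErasureCoding;

public class EcCapabilityCheckExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The ofs:// URI and policy name are placeholders; any FileSystem that
    // implements WithErasureCoding and advertises the EC path capability works.
    FileSystem fs = FileSystem.get(URI.create("ofs://service/vol/bucket"), conf);
    Path dir = new Path("/warehouse/ec-data");
    if (FileUtil.checkFSSupportsEC(fs, dir)) {
      // The guard above proved fs is a WithErasureCoding instance,
      // so this cast cannot fail.
      ((WithErasureCoding) fs).setErasureCodingPolicy(dir, "RS-6-3-1024k");
    }
  }
}

Since checkFSSupportsEC only returns true when the filesystem both implements WithErasureCoding and advertises the path capability, the cast inside the guarded block is safe.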

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

Lines changed: 5 additions & 0 deletions

@@ -704,5 +704,10 @@ private OpenFileOptions() {
         FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
         .collect(Collectors.toSet()));

+    /**
+     * EC policy to be set on the file that needs to be created: {@value}.
+     */
+    public static final String FS_OPTION_OPENFILE_EC_POLICY =
+        FS_OPTION_OPENFILE + "ec.policy";
   }
 }
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WithErasureCoding.java (new file; path inferred from the package declaration and the hadoop-common imports elsewhere in this commit)

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+/**
+ * Filesystems that support EC can implement this interface.
+ */
+public interface WithErasureCoding {
+
+  /**
+   * Get the EC policy name of the given file's FileStatus.
+   * If the file is not erasure coded, this shall return null.
+   * Callers must check that the FileStatus belongs to an FS that
+   * implements this interface.
+   * If the call fails due to some error, this shall return null.
+   * @param fileStatus object of the file whose ecPolicy needs to be obtained.
+   * @return the EC policy name.
+   */
+  String getErasureCodingPolicyName(FileStatus fileStatus);
+
+  /**
+   * Set the given ecPolicy on the path.
+   * The path and ecPolicyName should be valid (not null/empty; the
+   * implementing FS shall support the supplied ecPolicy).
+   * Implementations can throw IOException if these conditions are not met.
+   * @param path path on which the EC policy needs to be set.
+   * @param ecPolicyName the EC policy.
+   * @throws IOException if there is an error during the set operation.
+   */
+  void setErasureCodingPolicy(Path path, String ecPolicyName) throws
+      IOException;
+}
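To make the contract concrete, here is a hypothetical implementation sketch; the class and its in-memory policy map are assumptions for illustration, not code from this commit:

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.WithErasureCoding;

/**
 * Hypothetical adapter showing the interface contract: return null when a
 * file is not erasure coded (or on lookup failure), and reject invalid
 * arguments on the set path. A real FS would consult its own metadata.
 */
public class InMemoryErasureCodingStore implements WithErasureCoding {

  private final Map<Path, String> policies = new ConcurrentHashMap<>();

  @Override
  public String getErasureCodingPolicyName(FileStatus fileStatus) {
    if (fileStatus == null || !fileStatus.isErasureCoded()) {
      return null;  // contract: null when not erasure coded
    }
    return policies.get(fileStatus.getPath());  // null on unknown path
  }

  @Override
  public void setErasureCodingPolicy(Path path, String ecPolicyName)
      throws IOException {
    if (path == null || ecPolicyName == null || ecPolicyName.isEmpty()) {
      throw new IOException("Invalid path or EC policy name");
    }
    policies.put(path, ecPolicyName);
  }
}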

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

Lines changed: 15 additions & 1 deletion

@@ -74,6 +74,7 @@
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.WithErasureCoding;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hdfs.client.DfsPathCapabilities;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

@@ -146,7 +147,8 @@
 @InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
 @InterfaceStability.Unstable
 public class DistributedFileSystem extends FileSystem
-    implements KeyProviderTokenIssuer, BatchListingOperations, LeaseRecoverable, SafeMode {
+    implements KeyProviderTokenIssuer, BatchListingOperations, LeaseRecoverable, SafeMode,
+    WithErasureCoding {
   private Path workingDir;
   private URI uri;

@@ -376,6 +378,14 @@ public FSDataInputStream open(PathHandle fd, int bufferSize)
     return dfs.createWrappedInputStream(dfsis);
   }

+  @Override
+  public String getErasureCodingPolicyName(FileStatus fileStatus) {
+    if (!(fileStatus instanceof HdfsFileStatus)) {
+      return null;
+    }
+    return ((HdfsFileStatus) fileStatus).getErasureCodingPolicy().getName();
+  }
+
   /**
    * Create a handle to an HDFS file.
    * @param st HdfsFileStatus instance from NameNode

@@ -3862,6 +3872,10 @@ protected EnumSet<CreateFlag> getFlags() {
    */
   @Override
   public FSDataOutputStream build() throws IOException {
+    String ecPolicy = getOptions().get(Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY, "");
+    if (!ecPolicy.isEmpty()) {
+      ecPolicyName(ecPolicy);
+    }
     if (getFlags().contains(CreateFlag.CREATE) ||
         getFlags().contains(CreateFlag.OVERWRITE)) {
       if (isRecursive()) {
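With this hook, a client can request an EC policy through the generic FileSystem API without compiling against HDFS-specific builder methods. A hedged sketch, assuming an HDFS default filesystem; the path and policy name are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;

public class EcCreateViaBuilder {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path file = new Path("/ec/data.bin");  // illustrative path

    // FS-agnostic create: the EC policy travels as a plain builder option.
    // DistributedFileSystem's build() now maps it onto ecPolicyName();
    // other filesystems simply ignore the optional key.
    FSDataOutputStreamBuilder builder = fs.createFile(file);
    builder.opt(Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY,
        "RS-6-3-1024k");
    try (FSDataOutputStream out = builder.build()) {
      out.write(new byte[] {1, 2, 3});
    }
  }
}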

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/DfsPathCapabilities.java

Lines changed: 2 additions & 0 deletions

@@ -21,6 +21,7 @@
 import java.util.Optional;

 import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

@@ -54,6 +55,7 @@ public static Optional<Boolean> hasPathCapability(final Path path,
     case CommonPathCapabilities.FS_STORAGEPOLICY:
     case CommonPathCapabilities.FS_XATTRS:
     case CommonPathCapabilities.FS_TRUNCATE:
+    case Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY:
       return Optional.of(true);
     case CommonPathCapabilities.FS_SYMLINKS:
       return Optional.of(FileSystem.areSymlinksEnabled());
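A short standalone probe of the capability that checkFSSupportsEC relies on; illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;

public class EcCapabilityProbe {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // HDFS now answers true here; filesystems that never registered the
    // capability keep returning false, which checkFSSupportsEC turns into
    // a logged warning rather than a failure.
    boolean supportsEc = fs.hasPathCapability(new Path("/"),
        Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY);
    System.out.println("EC policy capability: " + supportsEc);
  }
}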

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java

Lines changed: 5 additions & 4 deletions

@@ -205,7 +205,8 @@ public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
     }

     if (sourceCurrStatus.isDirectory()) {
-      createTargetDirsWithRetry(description, target, context, sourceStatus);
+      createTargetDirsWithRetry(description, target, context, sourceStatus,
+          sourceFS);
       return;
     }

@@ -295,10 +296,10 @@ private void copyFileWithRetry(String description,
   }

   private void createTargetDirsWithRetry(String description, Path target,
-      Context context, FileStatus sourceStatus) throws IOException {
+      Context context, FileStatus sourceStatus, FileSystem sourceFS) throws IOException {
     try {
-      new RetriableDirectoryCreateCommand(description).execute(target,
-          context, sourceStatus);
+      new RetriableDirectoryCreateCommand(description).execute(target, context,
+          sourceStatus, sourceFS);
     } catch (Exception e) {
       throw new IOException("mkdir failed for " + target, e);
     }

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableDirectoryCreateCommand.java

Lines changed: 23 additions & 10 deletions

@@ -18,16 +18,20 @@

 package org.apache.hadoop.tools.mapred;

+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.WithErasureCoding;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.util.RetriableCommand;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapreduce.Mapper;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.FileUtil.checkFSSupportsEC;
 import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings;

@@ -36,6 +40,9 @@
  */
 public class RetriableDirectoryCreateCommand extends RetriableCommand {

+  private static final Logger LOG =
+      LoggerFactory.getLogger(RetriableDirectoryCreateCommand.class);
+
   /**
    * Constructor, taking a description of the action.
    * @param description Verbose description of the copy operation.

@@ -53,10 +60,11 @@ public RetriableDirectoryCreateCommand(String description) {
    */
   @Override
   protected Object doExecute(Object... arguments) throws Exception {
-    assert arguments.length == 3 : "Unexpected argument list.";
+    assert arguments.length == 4 : "Unexpected argument list.";
     Path target = (Path)arguments[0];
     Mapper.Context context = (Mapper.Context)arguments[1];
     FileStatus sourceStatus = (FileStatus)arguments[2];
+    FileSystem sourceFs = (FileSystem)arguments[3];

     FileSystem targetFS = target.getFileSystem(context.getConfiguration());
     if(!targetFS.mkdirs(target)) {

@@ -66,11 +74,16 @@ protected Object doExecute(Object... arguments) throws Exception {
     boolean preserveEC = getFileAttributeSettings(context)
         .contains(DistCpOptions.FileAttribute.ERASURECODINGPOLICY);
     if (preserveEC && sourceStatus.isErasureCoded()
-        && targetFS instanceof DistributedFileSystem) {
-      ErasureCodingPolicy ecPolicy =
-          ((HdfsFileStatus) sourceStatus).getErasureCodingPolicy();
-      DistributedFileSystem dfs = (DistributedFileSystem) targetFS;
-      dfs.setErasureCodingPolicy(target, ecPolicy.getName());
+        && checkFSSupportsEC(sourceFs, sourceStatus.getPath())
+        && checkFSSupportsEC(targetFS, target)) {
+      ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByName(
+          ((WithErasureCoding) sourceFs).getErasureCodingPolicyName(
+              sourceStatus));
+      LOG.debug("EC Policy for source path is {}", ecPolicy);
+      WithErasureCoding ecFs = (WithErasureCoding) targetFS;
+      if (ecPolicy != null) {
+        ecFs.setErasureCodingPolicy(target, ecPolicy.getName());
+      }
     }
     return true;
   }
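Note that SystemErasureCodingPolicies.getByName resolves the name against the built-in HDFS policies and returns null for unknown names, which is why the null check above guards the setErasureCodingPolicy call. A small standalone probe (policy names illustrative):

import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;

public class PolicyLookupProbe {
  public static void main(String[] args) {
    // A known system policy resolves to a policy object.
    ErasureCodingPolicy ok = SystemErasureCodingPolicies.getByName("RS-6-3-1024k");
    // An unknown name (for example, a custom policy) returns null, so the
    // directory-create path above skips setErasureCodingPolicy entirely.
    ErasureCodingPolicy missing = SystemErasureCodingPolicies.getByName("no-such-policy");
    System.out.println(ok + " / " + missing);
  }
}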

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java

Lines changed: 22 additions & 21 deletions

@@ -24,9 +24,6 @@
 import java.util.EnumSet;

 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -36,9 +33,11 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.WithErasureCoding;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.tools.CopyListingFileStatus;

@@ -52,8 +51,10 @@
 import org.apache.hadoop.classification.VisibleForTesting;

+import static org.apache.hadoop.fs.FileUtil.checkFSSupportsEC;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY;
 import static org.apache.hadoop.tools.mapred.CopyMapper.getFileAttributeSettings;
 import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

@@ -151,8 +152,8 @@ private long doCopy(CopyListingFileStatus source, Path target,

     long offset = (action == FileAction.APPEND) ?
         targetFS.getFileStatus(target).getLen() : source.getChunkOffset();
-    long bytesRead = copyToFile(targetPath, targetFS, source,
-        offset, context, fileAttributes, sourceChecksum, sourceStatus);
+    long bytesRead = copyToFile(targetPath, targetFS, source, offset, context,
+        fileAttributes, sourceChecksum, sourceStatus, sourceFS);

     if (!source.isSplit()) {
       DistCpUtils.compareFileLengthsAndChecksums(source.getLen(), sourceFS,

@@ -195,7 +196,7 @@ private ChecksumOpt getChecksumOpt(EnumSet<FileAttribute> fileAttributes,
   private long copyToFile(Path targetPath, FileSystem targetFS,
       CopyListingFileStatus source, long sourceOffset, Mapper.Context context,
       EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum,
-      FileStatus sourceStatus)
+      FileStatus sourceStatus, FileSystem sourceFS)
       throws IOException {
     FsPermission permission = FsPermission.getFileDefault().applyUMask(
         FsPermission.getUMask(targetFS.getConf()));

@@ -205,11 +206,11 @@ private long copyToFile(Path targetPath, FileSystem targetFS,
     boolean preserveEC = getFileAttributeSettings(context)
         .contains(DistCpOptions.FileAttribute.ERASURECODINGPOLICY);

-    ErasureCodingPolicy ecPolicy = null;
+    String ecPolicyName = null;
     if (preserveEC && sourceStatus.isErasureCoded()
-        && sourceStatus instanceof HdfsFileStatus
-        && targetFS instanceof DistributedFileSystem) {
-      ecPolicy = ((HdfsFileStatus) sourceStatus).getErasureCodingPolicy();
+        && checkFSSupportsEC(sourceFS, sourceStatus.getPath())
+        && checkFSSupportsEC(targetFS, targetPath)) {
+      ecPolicyName = ((WithErasureCoding) sourceFS).getErasureCodingPolicyName(sourceStatus);
     }
     final OutputStream outStream;
     if (action == FileAction.OVERWRITE) {

@@ -222,21 +223,21 @@
           targetFS, targetPath);
       FSDataOutputStream out;
       ChecksumOpt checksumOpt = getChecksumOpt(fileAttributes, sourceChecksum);
-      if (!preserveEC || ecPolicy == null) {
+      if (!preserveEC || ecPolicyName == null) {
         out = targetFS.create(targetPath, permission,
             EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), copyBufferSize,
             repl, blockSize, context, checksumOpt);
       } else {
-        DistributedFileSystem dfs = (DistributedFileSystem) targetFS;
-        DistributedFileSystem.HdfsDataOutputStreamBuilder builder =
-            dfs.createFile(targetPath).permission(permission).create()
-            .overwrite(true).bufferSize(copyBufferSize).replication(repl)
-            .blockSize(blockSize).progress(context).recursive()
-            .ecPolicyName(ecPolicy.getName());
-        if (checksumOpt != null) {
-          builder.checksumOpt(checksumOpt);
-        }
-        out = builder.build();
+        FSDataOutputStreamBuilder builder = targetFS.createFile(targetPath)
+            .permission(permission)
+            .overwrite(true)
+            .bufferSize(copyBufferSize)
+            .replication(repl)
+            .blockSize(blockSize)
+            .progress(context)
+            .recursive();
+        builder.opt(FS_OPTION_OPENFILE_EC_POLICY, ecPolicyName);
+        out = builder.build();
       }
       outStream = new BufferedOutputStream(out);
     } else {
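One design detail worth noting: the patch passes the policy with builder.opt() rather than builder.must(). Under the FSBuilder contract, an optional key that a filesystem does not recognize is ignored, while an unrecognized mandatory key fails the build; opt() therefore keeps the copy working against stores without EC support. A minimal sketch of the distinction, with an illustrative target path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_EC_POLICY;

public class OptVsMustSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataOutputStreamBuilder builder = fs.createFile(new Path("/tmp/demo"));

    // opt(): a hint. A filesystem that does not recognize the key ignores
    // it, so the create still succeeds on stores without EC support.
    builder.opt(FS_OPTION_OPENFILE_EC_POLICY, "RS-6-3-1024k");

    // must(): a requirement. An unrecognized mandatory key is expected to
    // fail the build instead of being silently dropped.
    // builder.must(FS_OPTION_OPENFILE_EC_POLICY, "RS-6-3-1024k");

    builder.build().close();
  }
}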
