-
Notifications
You must be signed in to change notification settings - Fork 9.1k
HADOOP-19189. ITestS3ACommitterFactory failing #6857
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
steveloughran
merged 4 commits into
apache:trunk
from
steveloughran:s3/HADOOP-19189-ITestS3ACommitterFactory
Jun 7, 2024
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,36 +19,49 @@ | |
package org.apache.hadoop.fs.s3a.commit; | ||
|
||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
|
||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.junit.runners.Parameterized; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.hadoop.fs.Path; | ||
import org.apache.hadoop.fs.PathIOException; | ||
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; | ||
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; | ||
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter; | ||
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; | ||
import org.apache.hadoop.mapred.JobConf; | ||
import org.apache.hadoop.mapreduce.MRJobConfig; | ||
import org.apache.hadoop.mapreduce.TaskAttemptContext; | ||
import org.apache.hadoop.mapreduce.TaskAttemptID; | ||
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; | ||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | ||
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; | ||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; | ||
import org.apache.hadoop.test.LambdaTestUtils; | ||
import org.apache.hadoop.security.UserGroupInformation; | ||
|
||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; | ||
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; | ||
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING; | ||
import static org.apache.hadoop.test.LambdaTestUtils.intercept; | ||
|
||
/** | ||
* Tests for some aspects of the committer factory. | ||
* All tests are grouped into one single test so that only one | ||
* S3A FS client is set up and used for the entire run. | ||
* Saves time and money. | ||
* Tests for the committer factory creation/override process. | ||
*/ | ||
public class ITestS3ACommitterFactory extends AbstractCommitITest { | ||
|
||
|
||
protected static final String INVALID_NAME = "invalid-name"; | ||
@RunWith(Parameterized.class) | ||
public final class ITestS3ACommitterFactory extends AbstractCommitITest { | ||
private static final Logger LOG = LoggerFactory.getLogger( | ||
ITestS3ACommitterFactory.class); | ||
/** | ||
* Name for invalid committer: {@value}. | ||
*/ | ||
private static final String INVALID_NAME = "invalid-name"; | ||
|
||
/** | ||
* Counter to guarantee that even in parallel test runs, no job has the same | ||
|
@@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends AbstractCommitITest { | |
* Parameterized list of bindings of committer name in config file to | ||
* expected class instantiated. | ||
*/ | ||
private static final Object[][] bindings = { | ||
{COMMITTER_NAME_FILE, FileOutputCommitter.class}, | ||
{COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class}, | ||
{COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class}, | ||
{InternalCommitterConstants.COMMITTER_NAME_STAGING, | ||
StagingCommitter.class}, | ||
{COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class} | ||
private static final Object[][] BINDINGS = { | ||
{"", "", FileOutputCommitter.class, "Default Binding"}, | ||
{COMMITTER_NAME_FILE, "", FileOutputCommitter.class, "File committer in FS"}, | ||
{COMMITTER_NAME_PARTITIONED, "", PartitionedStagingCommitter.class, | ||
"partitoned committer in FS"}, | ||
{COMMITTER_NAME_STAGING, "", StagingCommitter.class, "staging committer in FS"}, | ||
{COMMITTER_NAME_MAGIC, "", MagicS3GuardCommitter.class, "magic committer in FS"}, | ||
{COMMITTER_NAME_DIRECTORY, "", DirectoryStagingCommitter.class, "Dir committer in FS"}, | ||
{INVALID_NAME, "", null, "invalid committer in FS"}, | ||
|
||
{"", COMMITTER_NAME_FILE, FileOutputCommitter.class, "File committer in task"}, | ||
{"", COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class, | ||
"partioned committer in task"}, | ||
{"", COMMITTER_NAME_STAGING, StagingCommitter.class, "staging committer in task"}, | ||
{"", COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class, "magic committer in task"}, | ||
{"", COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class, "Dir committer in task"}, | ||
{"", INVALID_NAME, null, "invalid committer in task"}, | ||
}; | ||
|
||
/** | ||
* This is a ref to the FS conf, so changes here are visible | ||
* to callers querying the FS config. | ||
* Test array for parameterized test runs. | ||
* | ||
* @return the committer binding for this run. | ||
*/ | ||
private Configuration filesystemConfRef; | ||
|
||
private Configuration taskConfRef; | ||
@Parameterized.Parameters(name = "{3}-fs=[{0}]-task=[{1}]-[{2}]") | ||
public static Collection<Object[]> params() { | ||
return Arrays.asList(BINDINGS); | ||
} | ||
|
||
@Override | ||
public void setup() throws Exception { | ||
super.setup(); | ||
jobId = randomJobId(); | ||
attempt0 = "attempt_" + jobId + "_m_000000_0"; | ||
taskAttempt0 = TaskAttemptID.forName(attempt0); | ||
/** | ||
* Name of committer to set in filesystem config. If "" do not set one. | ||
*/ | ||
private final String fsCommitterName; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: some java doc, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
outDir = path(getMethodName()); | ||
factory = new S3ACommitterFactory(); | ||
Configuration conf = new Configuration(); | ||
conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString()); | ||
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0); | ||
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); | ||
filesystemConfRef = getFileSystem().getConf(); | ||
tContext = new TaskAttemptContextImpl(conf, taskAttempt0); | ||
taskConfRef = tContext.getConfiguration(); | ||
} | ||
/** | ||
* Name of committer to set in job config. | ||
*/ | ||
private final String jobCommitterName; | ||
|
||
@Test | ||
public void testEverything() throws Throwable { | ||
testImplicitFileBinding(); | ||
testBindingsInTask(); | ||
testBindingsInFSConfig(); | ||
testInvalidFileBinding(); | ||
testInvalidTaskBinding(); | ||
} | ||
/** | ||
* Expected committer class. | ||
* If null, creating the committer is expected to raise an exception. | ||
*/ | ||
private final Class<? extends AbstractS3ACommitter> committerClass; | ||
|
||
/** | ||
* Verify that if all config options are unset, the FileOutputCommitter | ||
* | ||
* is returned. | ||
* Description from parameters, simply for thread names to be more informative. | ||
*/ | ||
public void testImplicitFileBinding() throws Throwable { | ||
taskConfRef.unset(FS_S3A_COMMITTER_NAME); | ||
filesystemConfRef.unset(FS_S3A_COMMITTER_NAME); | ||
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class); | ||
} | ||
private final String description; | ||
|
||
/** | ||
* Verify that task bindings are picked up. | ||
* Create a parameterized instance. | ||
* @param fsCommitterName committer to set in filesystem config | ||
* @param jobCommitterName committer to set in job config | ||
* @param committerClass expected committer class | ||
* @param description debug text for thread names. | ||
*/ | ||
public void testBindingsInTask() throws Throwable { | ||
// set this to an invalid value to be confident it is not | ||
// being checked. | ||
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID"); | ||
taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE); | ||
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class); | ||
for (Object[] binding : bindings) { | ||
taskConfRef.set(FS_S3A_COMMITTER_NAME, | ||
(String) binding[0]); | ||
assertFactoryCreatesExpectedCommitter((Class) binding[1]); | ||
} | ||
public ITestS3ACommitterFactory( | ||
final String fsCommitterName, | ||
final String jobCommitterName, | ||
final Class<? extends AbstractS3ACommitter> committerClass, | ||
final String description) { | ||
this.fsCommitterName = fsCommitterName; | ||
this.jobCommitterName = jobCommitterName; | ||
this.committerClass = committerClass; | ||
this.description = description; | ||
} | ||
|
||
@Override | ||
protected Configuration createConfiguration() { | ||
final Configuration conf = super.createConfiguration(); | ||
// keep the filesystem cache enabled (the disable-cache flag is set to false), | ||
// so that the committer picks up the fs created with the fs-specific configuration | ||
conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false); | ||
removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_NAME); | ||
maybeSetCommitterName(conf, fsCommitterName); | ||
return conf; | ||
} | ||
|
||
/** | ||
* Verify that FS bindings are picked up. | ||
* Set a committer name in a configuration. | ||
* @param conf configuration to patch. | ||
* @param name name. If "" the option is unset. | ||
*/ | ||
public void testBindingsInFSConfig() throws Throwable { | ||
taskConfRef.unset(FS_S3A_COMMITTER_NAME); | ||
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE); | ||
assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class); | ||
for (Object[] binding : bindings) { | ||
taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]); | ||
assertFactoryCreatesExpectedCommitter((Class) binding[1]); | ||
private static void maybeSetCommitterName(final Configuration conf, final String name) { | ||
if (!name.isEmpty()) { | ||
conf.set(FS_S3A_COMMITTER_NAME, name); | ||
} else { | ||
conf.unset(FS_S3A_COMMITTER_NAME); | ||
} | ||
} | ||
|
||
/** | ||
* Create an invalid committer via the FS binding. | ||
*/ | ||
public void testInvalidFileBinding() throws Throwable { | ||
taskConfRef.unset(FS_S3A_COMMITTER_NAME); | ||
filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME); | ||
LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME, | ||
() -> createCommitter()); | ||
@Override | ||
public void setup() throws Exception { | ||
// destroy all filesystems from previous runs. | ||
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser()); | ||
super.setup(); | ||
jobId = randomJobId(); | ||
attempt0 = "attempt_" + jobId + "_m_000000_0"; | ||
taskAttempt0 = TaskAttemptID.forName(attempt0); | ||
|
||
outDir = methodPath(); | ||
factory = new S3ACommitterFactory(); | ||
final Configuration fsConf = getConfiguration(); | ||
JobConf jobConf = new JobConf(fsConf); | ||
jobConf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString()); | ||
jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0); | ||
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); | ||
maybeSetCommitterName(jobConf, jobCommitterName); | ||
tContext = new TaskAttemptContextImpl(jobConf, taskAttempt0); | ||
|
||
LOG.info("{}: Filesystem Committer='{}'; task='{}'", | ||
description, | ||
fsConf.get(FS_S3A_COMMITTER_NAME), | ||
jobConf.get(FS_S3A_COMMITTER_NAME)); | ||
} | ||
|
||
|
||
@Override | ||
protected void deleteTestDirInTeardown() { | ||
// no-op | ||
} | ||
|
||
/** | ||
* Create an invalid committer via the task attempt. | ||
* Verify that the factory creates the committer class expected for this | ||
* parameterized binding, or raises an exception when no class is expected. | ||
*/ | ||
public void testInvalidTaskBinding() throws Throwable { | ||
filesystemConfRef.unset(FS_S3A_COMMITTER_NAME); | ||
taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME); | ||
LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME, | ||
() -> createCommitter()); | ||
@Test | ||
public void testBinding() throws Throwable { | ||
assertFactoryCreatesExpectedCommitter(committerClass); | ||
} | ||
|
||
/** | ||
* Assert that the factory creates the expected committer. | ||
* If a null committer is passed in, a {@link PathIOException} | ||
* is expected. | ||
* @param expected expected committer class. | ||
* @throws IOException IO failure. | ||
* @throws Exception IO or other failure. | ||
*/ | ||
protected void assertFactoryCreatesExpectedCommitter( | ||
private void assertFactoryCreatesExpectedCommitter( | ||
final Class expected) | ||
throws IOException { | ||
assertEquals("Wrong Committer from factory", | ||
expected, | ||
createCommitter().getClass()); | ||
throws Exception { | ||
describe("Creating committer: expected class \"%s\"", expected); | ||
if (expected != null) { | ||
assertEquals("Wrong Committer from factory", | ||
expected, | ||
createCommitter().getClass()); | ||
} else { | ||
intercept(PathCommitException.class, this::createCommitter); | ||
} | ||
} | ||
|
||
/** | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.