19
19
package org .apache .hadoop .fs .s3a .commit ;
20
20
21
21
import java .io .IOException ;
22
+ import java .util .Arrays ;
23
+ import java .util .Collection ;
22
24
23
25
import org .junit .Test ;
26
+ import org .junit .runner .RunWith ;
27
+ import org .junit .runners .Parameterized ;
28
+ import org .slf4j .Logger ;
29
+ import org .slf4j .LoggerFactory ;
24
30
25
31
import org .apache .hadoop .conf .Configuration ;
32
+ import org .apache .hadoop .fs .FileSystem ;
26
33
import org .apache .hadoop .fs .Path ;
34
+ import org .apache .hadoop .fs .PathIOException ;
27
35
import org .apache .hadoop .fs .s3a .commit .magic .MagicS3GuardCommitter ;
28
36
import org .apache .hadoop .fs .s3a .commit .staging .DirectoryStagingCommitter ;
29
37
import org .apache .hadoop .fs .s3a .commit .staging .PartitionedStagingCommitter ;
30
38
import org .apache .hadoop .fs .s3a .commit .staging .StagingCommitter ;
39
+ import org .apache .hadoop .mapred .JobConf ;
31
40
import org .apache .hadoop .mapreduce .MRJobConfig ;
32
41
import org .apache .hadoop .mapreduce .TaskAttemptContext ;
33
42
import org .apache .hadoop .mapreduce .TaskAttemptID ;
34
43
import org .apache .hadoop .mapreduce .lib .output .FileOutputCommitter ;
35
44
import org .apache .hadoop .mapreduce .lib .output .FileOutputFormat ;
36
45
import org .apache .hadoop .mapreduce .lib .output .PathOutputCommitter ;
37
46
import org .apache .hadoop .mapreduce .task .TaskAttemptContextImpl ;
38
- import org .apache .hadoop .test . LambdaTestUtils ;
47
+ import org .apache .hadoop .security . UserGroupInformation ;
39
48
49
+ import static org .apache .hadoop .fs .s3a .S3ATestUtils .removeBaseAndBucketOverrides ;
40
50
import static org .apache .hadoop .fs .s3a .commit .CommitConstants .*;
51
+ import static org .apache .hadoop .fs .s3a .commit .InternalCommitterConstants .COMMITTER_NAME_STAGING ;
52
+ import static org .apache .hadoop .test .LambdaTestUtils .intercept ;
41
53
42
54
/**
43
- * Tests for some aspects of the committer factory.
44
- * All tests are grouped into one single test so that only one
45
- * S3A FS client is set up and used for the entire run.
46
- * Saves time and money.
55
+ * Tests for the committer factory creation/override process.
47
56
*/
48
- public class ITestS3ACommitterFactory extends AbstractCommitITest {
49
-
50
-
51
- protected static final String INVALID_NAME = "invalid-name" ;
57
+ @ RunWith (Parameterized .class )
58
+ public final class ITestS3ACommitterFactory extends AbstractCommitITest {
59
+ private static final Logger LOG = LoggerFactory .getLogger (
60
+ ITestS3ACommitterFactory .class );
61
+ /**
62
+ * Name for invalid committer: {@value}.
63
+ */
64
+ private static final String INVALID_NAME = "invalid-name" ;
52
65
53
66
/**
54
67
* Counter to guarantee that even in parallel test runs, no job has the same
@@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends AbstractCommitITest {
72
85
* Parameterized list of bindings: committer name in the filesystem config,
73
86
* committer name in the task config, expected class (null: failure), description.
74
87
*/
75
- private static final Object [][] bindings = {
76
- {COMMITTER_NAME_FILE , FileOutputCommitter .class },
77
- {COMMITTER_NAME_DIRECTORY , DirectoryStagingCommitter .class },
78
- {COMMITTER_NAME_PARTITIONED , PartitionedStagingCommitter .class },
79
- {InternalCommitterConstants .COMMITTER_NAME_STAGING ,
80
- StagingCommitter .class },
81
- {COMMITTER_NAME_MAGIC , MagicS3GuardCommitter .class }
88
+ private static final Object [][] BINDINGS = {
89
+ {"" , "" , FileOutputCommitter .class , "Default Binding" },
90
+ {COMMITTER_NAME_FILE , "" , FileOutputCommitter .class , "File committer in FS" },
91
+ {COMMITTER_NAME_PARTITIONED , "" , PartitionedStagingCommitter .class ,
92
+ "partitoned committer in FS" },
93
+ {COMMITTER_NAME_STAGING , "" , StagingCommitter .class , "staging committer in FS" },
94
+ {COMMITTER_NAME_MAGIC , "" , MagicS3GuardCommitter .class , "magic committer in FS" },
95
+ {COMMITTER_NAME_DIRECTORY , "" , DirectoryStagingCommitter .class , "Dir committer in FS" },
96
+ {INVALID_NAME , "" , null , "invalid committer in FS" },
97
+
98
+ {"" , COMMITTER_NAME_FILE , FileOutputCommitter .class , "File committer in task" },
99
+ {"" , COMMITTER_NAME_PARTITIONED , PartitionedStagingCommitter .class ,
100
+ "partioned committer in task" },
101
+ {"" , COMMITTER_NAME_STAGING , StagingCommitter .class , "staging committer in task" },
102
+ {"" , COMMITTER_NAME_MAGIC , MagicS3GuardCommitter .class , "magic committer in task" },
103
+ {"" , COMMITTER_NAME_DIRECTORY , DirectoryStagingCommitter .class , "Dir committer in task" },
104
+ {"" , INVALID_NAME , null , "invalid committer in task" },
82
105
};
83
106
84
107
/**
85
- * This is a ref to the FS conf, so changes here are visible
86
- * to callers querying the FS config.
108
+ * Test array for parameterized test runs.
109
+ *
110
+ * @return the full list of committer bindings, one entry per test run.
87
111
*/
88
- private Configuration filesystemConfRef ;
89
-
90
- private Configuration taskConfRef ;
112
+ @ Parameterized .Parameters (name = "{3}-fs=[{0}]-task=[{1}]-[{2}]" )
113
+ public static Collection <Object []> params () {
114
+ return Arrays .asList (BINDINGS );
115
+ }
91
116
92
- @ Override
93
- public void setup () throws Exception {
94
- super .setup ();
95
- jobId = randomJobId ();
96
- attempt0 = "attempt_" + jobId + "_m_000000_0" ;
97
- taskAttempt0 = TaskAttemptID .forName (attempt0 );
117
+ /**
118
+ * Name of committer to set in filesystem config. If "" do not set one.
119
+ */
120
+ private final String fsCommitterName ;
98
121
99
- outDir = path (getMethodName ());
100
- factory = new S3ACommitterFactory ();
101
- Configuration conf = new Configuration ();
102
- conf .set (FileOutputFormat .OUTDIR , outDir .toUri ().toString ());
103
- conf .set (MRJobConfig .TASK_ATTEMPT_ID , attempt0 );
104
- conf .setInt (MRJobConfig .APPLICATION_ATTEMPT_ID , 1 );
105
- filesystemConfRef = getFileSystem ().getConf ();
106
- tContext = new TaskAttemptContextImpl (conf , taskAttempt0 );
107
- taskConfRef = tContext .getConfiguration ();
108
- }
122
+ /**
123
+ * Name of committer to set in job config.
124
+ */
125
+ private final String jobCommitterName ;
109
126
110
- @ Test
111
- public void testEverything () throws Throwable {
112
- testImplicitFileBinding ();
113
- testBindingsInTask ();
114
- testBindingsInFSConfig ();
115
- testInvalidFileBinding ();
116
- testInvalidTaskBinding ();
117
- }
127
+ /**
128
+ * Expected committer class.
129
+ * If null: an exception is expected
130
+ */
131
+ private final Class <? extends AbstractS3ACommitter > committerClass ;
118
132
119
133
/**
120
- * Verify that if all config options are unset, the FileOutputCommitter
121
- *
122
- * is returned.
134
+ * Description from parameters, simply for thread names to be more informative.
123
135
*/
124
- public void testImplicitFileBinding () throws Throwable {
125
- taskConfRef .unset (FS_S3A_COMMITTER_NAME );
126
- filesystemConfRef .unset (FS_S3A_COMMITTER_NAME );
127
- assertFactoryCreatesExpectedCommitter (FileOutputCommitter .class );
128
- }
136
+ private final String description ;
129
137
130
138
/**
131
- * Verify that task bindings are picked up.
139
+ * Create a parameterized instance.
140
+ * @param fsCommitterName committer to set in filesystem config
141
+ * @param jobCommitterName committer to set in job config
142
+ * @param committerClass expected committer class
143
+ * @param description debug text for thread names.
132
144
*/
133
- public void testBindingsInTask () throws Throwable {
134
- // set this to an invalid value to be confident it is not
135
- // being checked.
136
- filesystemConfRef .set (FS_S3A_COMMITTER_NAME , "INVALID" );
137
- taskConfRef .set (FS_S3A_COMMITTER_NAME , COMMITTER_NAME_FILE );
138
- assertFactoryCreatesExpectedCommitter (FileOutputCommitter .class );
139
- for (Object [] binding : bindings ) {
140
- taskConfRef .set (FS_S3A_COMMITTER_NAME ,
141
- (String ) binding [0 ]);
142
- assertFactoryCreatesExpectedCommitter ((Class ) binding [1 ]);
143
- }
145
+ public ITestS3ACommitterFactory (
146
+ final String fsCommitterName ,
147
+ final String jobCommitterName ,
148
+ final Class <? extends AbstractS3ACommitter > committerClass ,
149
+ final String description ) {
150
+ this .fsCommitterName = fsCommitterName ;
151
+ this .jobCommitterName = jobCommitterName ;
152
+ this .committerClass = committerClass ;
153
+ this .description = description ;
154
+ }
155
+
156
+ @ Override
157
+ protected Configuration createConfiguration () {
158
+ final Configuration conf = super .createConfiguration ();
159
+ // leave FS caching enabled (disable-cache=false), presumably so the committer
160
+ // resolves the cached test fs with its fs-specific configuration - TODO confirm
161
+ conf .setBoolean (FS_S3A_IMPL_DISABLE_CACHE , false );
162
+ removeBaseAndBucketOverrides (conf , FS_S3A_COMMITTER_NAME );
163
+ maybeSetCommitterName (conf , fsCommitterName );
164
+ return conf ;
144
165
}
145
166
146
167
/**
147
- * Verify that FS bindings are picked up.
168
+ * Set a committer name in a configuration.
169
+ * @param conf configuration to patch.
170
+ * @param name name. If "" the option is unset.
148
171
*/
149
- public void testBindingsInFSConfig () throws Throwable {
150
- taskConfRef .unset (FS_S3A_COMMITTER_NAME );
151
- filesystemConfRef .set (FS_S3A_COMMITTER_NAME , COMMITTER_NAME_FILE );
152
- assertFactoryCreatesExpectedCommitter (FileOutputCommitter .class );
153
- for (Object [] binding : bindings ) {
154
- taskConfRef .set (FS_S3A_COMMITTER_NAME , (String ) binding [0 ]);
155
- assertFactoryCreatesExpectedCommitter ((Class ) binding [1 ]);
172
+ private static void maybeSetCommitterName (final Configuration conf , final String name ) {
173
+ if (!name .isEmpty ()) {
174
+ conf .set (FS_S3A_COMMITTER_NAME , name );
175
+ } else {
176
+ conf .unset (FS_S3A_COMMITTER_NAME );
156
177
}
157
178
}
158
179
159
- /**
160
- * Create an invalid committer via the FS binding.
161
- */
162
- public void testInvalidFileBinding () throws Throwable {
163
- taskConfRef .unset (FS_S3A_COMMITTER_NAME );
164
- filesystemConfRef .set (FS_S3A_COMMITTER_NAME , INVALID_NAME );
165
- LambdaTestUtils .intercept (PathCommitException .class , INVALID_NAME ,
166
- () -> createCommitter ());
180
+ @ Override
181
+ public void setup () throws Exception {
182
+ // destroy all filesystems from previous runs.
183
+ FileSystem .closeAllForUGI (UserGroupInformation .getCurrentUser ());
184
+ super .setup ();
185
+ jobId = randomJobId ();
186
+ attempt0 = "attempt_" + jobId + "_m_000000_0" ;
187
+ taskAttempt0 = TaskAttemptID .forName (attempt0 );
188
+
189
+ outDir = methodPath ();
190
+ factory = new S3ACommitterFactory ();
191
+ final Configuration fsConf = getConfiguration ();
192
+ JobConf jobConf = new JobConf (fsConf );
193
+ jobConf .set (FileOutputFormat .OUTDIR , outDir .toUri ().toString ());
194
+ jobConf .set (MRJobConfig .TASK_ATTEMPT_ID , attempt0 );
195
+ jobConf .setInt (MRJobConfig .APPLICATION_ATTEMPT_ID , 1 );
196
+ maybeSetCommitterName (jobConf , jobCommitterName );
197
+ tContext = new TaskAttemptContextImpl (jobConf , taskAttempt0 );
198
+
199
+ LOG .info ("{}: Filesystem Committer='{}'; task='{}'" ,
200
+ description ,
201
+ fsConf .get (FS_S3A_COMMITTER_NAME ),
202
+ jobConf .get (FS_S3A_COMMITTER_NAME ));
203
+ }
204
+
205
+
206
+ @ Override
207
+ protected void deleteTestDirInTeardown () {
208
+ // no-op
167
209
}
168
210
169
211
/**
170
- * Create an invalid committer via the task attempt.
212
+ * Verify that the factory creates the committer expected for this
213
+ * parameterized binding, or fails when no committer class is expected.
171
214
*/
172
- public void testInvalidTaskBinding () throws Throwable {
173
- filesystemConfRef .unset (FS_S3A_COMMITTER_NAME );
174
- taskConfRef .set (FS_S3A_COMMITTER_NAME , INVALID_NAME );
175
- LambdaTestUtils .intercept (PathCommitException .class , INVALID_NAME ,
176
- () -> createCommitter ());
215
+ @ Test
216
+ public void testBinding () throws Throwable {
217
+ assertFactoryCreatesExpectedCommitter (committerClass );
177
218
}
178
219
179
220
/**
180
221
* Assert that the factory creates the expected committer.
222
+ * If a null class is passed in, committer creation is expected to fail
223
+ * with a {@link PathCommitException}, a kind of {@link PathIOException}.
181
224
* @param expected expected committer class.
182
- * @throws IOException IO failure.
225
+ * @throws Exception IO failure.
183
226
*/
184
- protected void assertFactoryCreatesExpectedCommitter (
227
+ private void assertFactoryCreatesExpectedCommitter (
185
228
final Class expected )
186
- throws IOException {
187
- assertEquals ("Wrong Committer from factory" ,
188
- expected ,
189
- createCommitter ().getClass ());
229
+ throws Exception {
230
+ describe ("Creating committer: expected class \" %s\" " , expected );
231
+ if (expected != null ) {
232
+ assertEquals ("Wrong Committer from factory" ,
233
+ expected ,
234
+ createCommitter ().getClass ());
235
+ } else {
236
+ intercept (PathCommitException .class , this ::createCommitter );
237
+ }
190
238
}
191
239
192
240
/**
0 commit comments