Skip to content

Commit 1168abc

Browse files
committed
MAPREDUCE-7403. manifest-committer dynamic partitioning support. (#4728)
Declares its compatibility with Spark's dynamic output partitioning by having the stream capability "mapreduce.job.committer.dynamic.partitioning". Requires a Spark release with SPARK-40034, which does the probing before deciding whether to accept or reject instantiation with dynamic partition overwrite set. This feature can be declared as supported by any other PathOutputCommitter implementation whose algorithm and destination filesystem are compatible. None of the S3A committers are compatible. The classic FileOutputCommitter is, but it does not declare itself as such out of our fear of changing that code. The Spark-side code will automatically infer compatibility if the created committer is of that class or a subclass. Contributed by Steve Loughran.
1 parent 98dd2b5 commit 1168abc

File tree

5 files changed

+135
-3
lines changed

5 files changed

+135
-3
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
import org.apache.hadoop.classification.InterfaceAudience;
2424
import org.apache.hadoop.classification.InterfaceStability;
2525
import org.apache.hadoop.fs.Path;
26+
import org.apache.hadoop.fs.StreamCapabilities;
27+
import org.apache.hadoop.fs.statistics.IOStatistics;
28+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
29+
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
2630
import org.apache.hadoop.mapreduce.JobContext;
2731
import org.apache.hadoop.mapreduce.JobStatus;
2832
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -60,7 +64,8 @@
6064
*/
6165
@InterfaceAudience.Public
6266
@InterfaceStability.Unstable
63-
public class BindingPathOutputCommitter extends PathOutputCommitter {
67+
public class BindingPathOutputCommitter extends PathOutputCommitter
68+
implements IOStatisticsSource, StreamCapabilities {
6469

6570
/**
6671
* The classname for use in configurations.
@@ -181,4 +186,22 @@ public String toString() {
181186
public PathOutputCommitter getCommitter() {
182187
return committer;
183188
}
189+
190+
/**
191+
* Pass through if the inner committer supports StreamCapabilities.
192+
* {@inheritDoc}
193+
*/
194+
@Override
195+
public boolean hasCapability(final String capability) {
196+
if (committer instanceof StreamCapabilities) {
197+
return ((StreamCapabilities) committer).hasCapability(capability);
198+
} else {
199+
return false;
200+
}
201+
}
202+
203+
@Override
204+
public IOStatistics getIOStatistics() {
205+
return IOStatisticsSupport.retrieveIOStatistics(committer);
206+
}
184207
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.hadoop.fs.FileStatus;
3232
import org.apache.hadoop.fs.FileSystem;
3333
import org.apache.hadoop.fs.Path;
34+
import org.apache.hadoop.fs.StreamCapabilities;
3435
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
3536
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
3637
import org.apache.hadoop.mapreduce.JobContext;
@@ -55,6 +56,7 @@
5556

5657
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
5758
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
59+
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING;
5860
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
5961
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
6062
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT;
@@ -84,7 +86,7 @@
8486
@InterfaceAudience.Public
8587
@InterfaceStability.Stable
8688
public class ManifestCommitter extends PathOutputCommitter implements
87-
IOStatisticsSource, StageEventCallbacks {
89+
IOStatisticsSource, StageEventCallbacks, StreamCapabilities {
8890

8991
public static final Logger LOG = LoggerFactory.getLogger(
9092
ManifestCommitter.class);
@@ -758,4 +760,15 @@ private static Path maybeSaveSummary(
758760
public IOStatisticsStore getIOStatistics() {
759761
return iostatistics;
760762
}
763+
764+
/**
765+
* The committer is compatible with spark's dynamic partitioning
766+
* algorithm.
767+
* @param capability string to query the stream support for.
768+
* @return true if the requested capability is supported.
769+
*/
770+
@Override
771+
public boolean hasCapability(final String capability) {
772+
return CAPABILITY_DYNAMIC_PARTITIONING.equals(capability);
773+
}
761774
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,12 @@ public final class ManifestCommitterConstants {
234234
*/
235235
public static final String CONTEXT_ATTR_TASK_ATTEMPT_ID = "ta";
236236

237+
/**
238+
* Stream Capabilities probe for spark dynamic partitioning compatibility.
239+
*/
240+
public static final String CAPABILITY_DYNAMIC_PARTITIONING =
241+
"mapreduce.job.committer.dynamic.partitioning";
242+
237243
private ManifestCommitterConstants() {
238244
}
239245

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,76 @@ appending data are creating and writing into new partitions.
269269
job to create unique files. This is foundational for
270270
any job to generate correct data.
271271

272+
# <a name="dynamic"></a> Spark Dynamic Partition overwriting
273+
274+
Spark has a feature called "Dynamic Partition Overwrites".
275+
276+
This can be initiated in SQL:
277+
```SQL
278+
INSERT OVERWRITE TABLE ...
279+
```
280+
Or through DataSet writes where the mode is `overwrite` and the partitioning matches
281+
that of the existing table:
282+
```scala
283+
sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
284+
// followed by an overwrite of a Dataset into an existing partitioned table.
285+
eventData2
286+
.write
287+
.mode("overwrite")
288+
.partitionBy("year", "month")
289+
.format("parquet")
290+
.save(existingDir)
291+
```
292+
293+
This feature is implemented in Spark, which
294+
1. Directs the job to write its new data to a temporary directory
295+
1. After job commit completes, scans the output to identify the leaf directories "partitions" into which data was written.
296+
1. Deletes the content of those directories in the destination table
297+
1. Renames the new files into the partitions.
298+
299+
This is all done in spark, which takes over the tasks of scanning
300+
the intermediate output tree, deleting partitions and of
301+
renaming the new files.
302+
303+
This feature also adds the ability for a job to write data entirely outside
304+
the destination table, which is done by
305+
1. writing new files into the working directory
306+
1. spark moving them to the final destination in job commit
307+
308+
309+
The manifest committer is compatible with dynamic partition overwrites
310+
on Azure and Google cloud storage as together they meet the core requirements of
311+
the extension:
312+
1. The working directory returned in `getWorkPath()` is in the same filesystem
313+
as the final output.
314+
2. `rename()` is an `O(1)` operation which is safe and fast to use when committing a job.
315+
316+
None of the S3A committers support this. Condition (1) is not met by
317+
the staging committers, while (2) is not met by S3 itself.
318+
319+
To use the manifest committer with dynamic partition overwrites, the
320+
spark version must contain
321+
[SPARK-40034](https://issues.apache.org/jira/browse/SPARK-40034)
322+
_PathOutputCommitters to work with dynamic partition overwrite_.
323+
324+
Be aware that the rename phase of the operation will be slow
325+
if many files are renamed, as this is done sequentially.
326+
Parallel renaming would speed this up, *but could trigger the abfs overload
327+
problems the manifest committer is designed to both minimize the risk
328+
of and support recovery from*.
329+
330+
The spark side of the commit operation will be listing/treewalking
331+
the temporary output directory (some overhead), followed by
332+
the file promotion, done with a classic filesystem `rename()`
333+
call. There will be no explicit rate limiting here.
334+
335+
*What does this mean?*
336+
337+
It means that _dynamic partitioning should not be used on Azure Storage
338+
for SQL queries/Spark DataSet operations where many thousands of files are created._
339+
The fact that these will suffer from performance problems before
340+
throttling scale issues surface should be considered a warning.
341+
272342
# <a name="SUCCESS"></a> Job Summaries in `_SUCCESS` files
273343

274344
The original hadoop committer creates a zero byte `_SUCCESS` file in the root of the output directory
@@ -585,7 +655,7 @@ There is no need to alter these values, except when writing new implementations
585655
something which is only needed if the store provides extra integration support for the
586656
committer.
587657

588-
## <a name="concurrent"></a> Support for concurrent test runs.
658+
## <a name="concurrent"></a> Support for concurrent jobs to the same directory
589659

590660
It *may* be possible to run multiple jobs targeting the same directory tree.
591661

@@ -600,6 +670,8 @@ For this to work, a number of conditions must be met:
600670
`mapreduce.fileoutputcommitter.cleanup.skipped` to `true`.
601671
* All jobs/tasks must create files with unique filenames.
602672
* All jobs must create output with the same directory partition structure.
673+
* The job/queries MUST NOT be using Spark Dynamic Partitioning "INSERT OVERWRITE TABLE"; data may be lost.
674+
This holds for *all* committers, not just the manifest committer.
603675
* Remember to delete the `_temporary` directory later!
604676

605677
This has *NOT BEEN TESTED*

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestManifestCommitProtocol.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.hadoop.mapreduce.RecordWriter;
6262
import org.apache.hadoop.mapreduce.TaskAttemptContext;
6363
import org.apache.hadoop.mapreduce.TaskAttemptID;
64+
import org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter;
6465
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
6566
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
6667
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
@@ -1549,6 +1550,23 @@ public void testOutputFormatIntegration() throws Throwable {
15491550
ManifestCommitter committer = (ManifestCommitter)
15501551
outputFormat.getOutputCommitter(tContext);
15511552

1553+
// check stream capabilities directly
1554+
Assertions.assertThat(committer.hasCapability(
1555+
ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING))
1556+
.describedAs("dynamic partitioning capability in committer %s",
1557+
committer)
1558+
.isTrue();
1559+
// and through a binding committer -passthrough is critical
1560+
// for the spark binding.
1561+
BindingPathOutputCommitter bindingCommitter =
1562+
new BindingPathOutputCommitter(outputDir, tContext);
1563+
Assertions.assertThat(bindingCommitter.hasCapability(
1564+
ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING))
1565+
.describedAs("dynamic partitioning capability in committer %s",
1566+
bindingCommitter)
1567+
.isTrue();
1568+
1569+
15521570
// setup
15531571
JobData jobData = new JobData(job, jContext, tContext, committer);
15541572
setupJob(jobData);

0 commit comments

Comments
 (0)