[SPARK-40034][SQL] PathOutputCommitters to support dynamic partitions
### What changes were proposed in this pull request?

Uses the StreamCapabilities probe in MAPREDUCE-7403 to identify when a
PathOutputCommitter is compatible with dynamic partition overwrite.

This patch has unit tests but not integration tests; it really needs
to test the SQL commands through the manifest committer into GCS/ABFS,
or at least the local FS. That would be possible once Hadoop 3.3.5 is out.

### Why are the changes needed?

Hadoop 3.3.5 adds a new committer in mapreduce-core which works fast and correctly on Azure and GCS. (It would also work on HDFS, but it is optimised for the cloud stores.)

The stores and the committer do meet the requirements of Spark SQL Dynamic Partition Overwrite, so it is OK for Spark to work through it.

Spark does not know this; MAPREDUCE-7403 adds a way for any PathOutputCommitter to declare that it is compatible; the IntermediateManifestCommitter will do so.
(apache/hadoop#4728)

### Does this PR introduce _any_ user-facing change?

No.

There is documentation on the feature in the hadoop [manifest committer](https://github.com/apache/hadoop/blob/82372d0d22e696643ad97490bc902fb6d17a6382/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md) docs.

### How was this patch tested?

1. Unit tests in hadoop-cloud which work with Hadoop versions with and without the matching change.
2. New integration tests in https://github.com/hortonworks-spark/cloud-integration which require Spark to be built against a Hadoop version with the manifest committer declaring compatibility.

Those new integration tests include:

* Spark SQL test derived from Spark's own [CloudRelationBasicSuite.scala#L212](https://github.com/hortonworks-spark/cloud-integration/blob/master/cloud-examples/src/test/scala/org/apache/spark/sql/sources/CloudRelationBasicSuite.scala#L212)
* Dataset tests extended to verify support for/rejection of dynamic partition overwrite [AbstractCommitDataframeSuite.scala#L151](https://github.com/hortonworks-spark/cloud-integration/blob/master/cloud-examples/src/test/scala/com/cloudera/spark/cloud/committers/AbstractCommitDataframeSuite.scala#L151)

Tested against Azure Cardiff with the manifest committer, and S3 London (the S3A committers reject dynamic partition overwrite).

Closes apache#37468 from steveloughran/SPARK-40034-MAPREDUCE-7403-manifest-committer-partitioning.

Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
steveloughran authored and dongjoon-hyun committed Sep 9, 2022
1 parent e83aedd commit 5a599de
Showing 5 changed files with 374 additions and 47 deletions.
112 changes: 107 additions & 5 deletions docs/cloud-integration.md
@@ -248,8 +248,13 @@ exhibits eventual consistency (example: S3), and often slower than classic
filesystem renames.

Some object store connectors provide custom committers to commit tasks and
jobs without using rename.

### Hadoop S3A committers

In versions of Spark built with Hadoop 3.1 or later,
the hadoop-aws JAR contains committers safe to use for S3 storage
accessed via the s3a connector.

Instead of writing data to a temporary directory on the store for renaming,
these committers write the files to the final destination, but do not issue
@@ -272,26 +277,123 @@ It has been tested with the most common formats supported by Spark.
```
mydataframe.write.format("parquet").save("s3a://bucket/destination")
```

More details on these committers can be found in
[the latest Hadoop documentation](https://hadoop.apache.org/docs/current/)
with S3A committer detail covered in
[Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html).
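
As a quick reference, the following sketch shows one way a Spark application
can switch to these committers; treat it as a minimal example and take the
authoritative option names and values from the Hadoop S3A committer
documentation. The two Spark classes referenced below ship in Spark's
`hadoop-cloud` module.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: route DataFrame/SQL commits through the S3A committers.
// "directory" is one of the committer names documented by Hadoop.
val spark = SparkSession.builder()
  .appName("s3a-committer-sketch")
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()

spark.range(10).write.format("parquet").save("s3a://bucket/destination")
```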

Note: depending upon the committer used, in-progress statistics may be
under-reported with Hadoop versions before 3.3.1.

### Amazon EMR: the EMRFS S3-optimized committer

Amazon EMR has its own S3-aware committers for Parquet data.
For instructions on use, see
[the EMRFS S3-optimized committer](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html).

For implementation and performance details, see
["Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer"](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/).


### Azure and Google Cloud Storage: the MapReduce Intermediate Manifest Committer

Versions of the hadoop-mapreduce-client-core JAR shipped after September 2022 (3.3.5 and later)
contain a committer optimized for performance and resilience on
Azure ADLS Generation 2 and Google Cloud Storage.

This committer, the "manifest committer", uses a manifest file to propagate
directory listing information from the task committers to the job committer.
These manifests can be written atomically, without relying on atomic directory rename,
something GCS lacks.

The job committer reads these manifests and renames files from the task output
directories directly into the destination directory, in parallel, with optional
rate limiting to avoid IO throttling.
This delivers performance and scalability on the object stores.

It is not critical for job correctness to use this committer with Azure storage; the
classic FileOutputCommitter is safe there. However, this new committer scales
better for large jobs with deep and wide directory trees.

Because Google GCS does not support atomic directory renaming,
the manifest committer should be used where available.

This committer does support "dynamic partition overwrite" (see below).

For details on availability and use of this committer, consult
the Hadoop documentation for the release used.

It is not available on Hadoop 3.3.4 or earlier.
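
As a sketch of what enabling it can look like, the configuration below binds
the manifest committer to the `abfs` and `gs` filesystem schemes; the factory
class name and per-scheme option follow the Hadoop manifest committer
documentation and should be verified against the release you deploy.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: bind the manifest committer for the abfs and gs schemes via
// Hadoop's per-scheme committer factory option, and route Spark's commit
// protocol through the cloud committer bindings.
val manifestFactory =
  "org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory"

val spark = SparkSession.builder()
  .appName("manifest-committer-sketch")
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs", manifestFactory)
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs", manifestFactory)
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
```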

### IBM Cloud Object Storage: Stocator

IBM provides the Stocator output committer for IBM Cloud Object Storage and OpenStack Swift.

Source, documentation and releases can be found at
[Stocator - Storage Connector for Apache Spark](https://github.com/CODAIT/stocator).


## Cloud Committers and `INSERT OVERWRITE TABLE`

Spark has a feature called "dynamic partition overwrite"; a table can be updated and only those
partitions into which new data is added will have their contents replaced.

This is used in SQL statements of the form `INSERT OVERWRITE TABLE`,
and when Datasets are written in mode "overwrite":

{% highlight scala %}
eventDataset.write
.mode("overwrite")
.partitionBy("year", "month")
.format("parquet")
.save(tablePath)
{% endhighlight %}
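
The SQL form can be sketched as follows; the table and source view names are
illustrative, and the example assumes dynamic mode has been selected with
`spark.sql.sources.partitionOverwriteMode=dynamic`:

{% highlight scala %}
import org.apache.spark.sql.SparkSession

// Illustrative only: overwrite just the partitions produced by the query,
// leaving the other partitions of the `events` table untouched.
val spark = SparkSession.builder()
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()

spark.sql(
  """INSERT OVERWRITE TABLE events
    |PARTITION (year, month)
    |SELECT * FROM new_events""".stripMargin)
{% endhighlight %}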

This feature uses file renaming and has specific requirements of
both the committer and the filesystem:

1. The committer's working directory must be in the destination filesystem.
2. The target filesystem must support file rename efficiently.

These conditions are _not_ met by the S3A committers and AWS S3 storage.

Committers for other cloud stores _may_ support this feature, and
declare to Spark that they are compatible. If dynamic partition overwrite
is required when writing data through a Hadoop committer, Spark
will always permit this when the original `FileOutputCommitter`
is used. For other committers, after their instantiation, Spark
will probe for their declaration of compatibility, and
permit the operation if they declare that they are compatible.

If the committer is not compatible, the operation will fail with
the error message
`PathOutputCommitter does not support dynamicPartitionOverwrite`.

Unless there is a compatible committer for the target filesystem,
the sole solution is to use a cloud-friendly format for data
storage.
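
For committer implementors: the declaration Spark looks for is the Hadoop
`StreamCapabilities` probe. The check Spark performs is equivalent to the
following sketch (the helper name is illustrative):

{% highlight scala %}
import org.apache.hadoop.fs.StreamCapabilities
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter}

// Sketch of the compatibility check: FileOutputCommitter is always accepted;
// any other committer must implement StreamCapabilities and report the
// dynamic partitioning capability.
def supportsDynamicPartitionOverwrite(committer: PathOutputCommitter): Boolean =
  committer.isInstanceOf[FileOutputCommitter] || (committer match {
    case caps: StreamCapabilities =>
      caps.hasCapability("mapreduce.job.committer.dynamic.partitioning")
    case _ => false
  })
{% endhighlight %}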

## Further Reading

Here is the documentation on the standard connectors both from Apache and the cloud providers.

* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
* [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-azure/index.html).
* [Azure Blob Filesystem (ABFS) and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html).
* [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
* [Amazon S3 Strong Consistency](https://aws.amazon.com/s3/consistency/)
* [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon.
* [Using the EMRFS S3-optimized Committer](https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
* [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). From Google.
* [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver)
* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator),
[IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM.
* [Using JindoFS SDK to access Alibaba Cloud OSS](https://github.com/aliyun/alibabacloud-jindofs).

The Cloud Committer problem and hive-compatible solutions
* [Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html)
* [Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)
* [The Manifest Committer for Azure and Google Cloud Storage](https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md)
* [A Zero-rename committer](https://github.com/steveloughran/zero-rename-committer/releases/).
* [Stocator: A High Performance Object Store Connector for Spark](http://arxiv.org/abs/1709.01812)

@@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud

import java.io.IOException

import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter}
import org.apache.parquet.hadoop.ParquetOutputCommitter
@@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging
class BindingParquetOutputCommitter(
path: Path,
context: TaskAttemptContext)
extends ParquetOutputCommitter(path, context) with Logging with StreamCapabilities {

logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path")

@@ -119,4 +119,8 @@ class BindingParquetOutputCommitter(
}

override def toString: String = s"BindingParquetOutputCommitter($committer)"

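/**
 * Delegate the capability probe to the inner committer: return true only
 * if it implements StreamCapabilities and reports the capability itself.
 */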
override def hasCapability(capability: String): Boolean =
committer.isInstanceOf[StreamCapabilities] &&
committer.asInstanceOf[StreamCapabilities].hasCapability(capability)
}
@@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud

import java.io.IOException

import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}

@@ -38,27 +38,28 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
* In `setupCommitter` the factory is identified and instantiated;
* this factory then creates the actual committer implementation.
*
* Dynamic Partition support will be determined once the committer is
* instantiated in the setupJob/setupTask methods. If this
* class was instantiated with `dynamicPartitionOverwrite` set to true,
* then the instantiated committer must either be an instance of
* `FileOutputCommitter` or it must implement the `StreamCapabilities`
* interface and declare that it has the capability
* `mapreduce.job.committer.dynamic.partitioning`.
* That feature is available on Hadoop releases with the Intermediate
* Manifest Committer for GCS and ABFS; it is not supported by the
* S3A committers.
* @constructor Instantiate.
* @param jobId job
* @param dest destination
* @param dynamicPartitionOverwrite does the caller want support for dynamic
* partition overwrite?
*/
class PathOutputCommitProtocol(
jobId: String,
dest: String,
dynamicPartitionOverwrite: Boolean = false)
extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite)
with Serializable {

/** The committer created. */
@transient private var committer: PathOutputCommitter = _
@@ -114,10 +115,33 @@ class PathOutputCommitProtocol(
// failures. Warn
logTrace(s"Committer $committer may not be tolerant of task commit failures")
}
} else {
// if required, other committers need to be checked for dynamic partition
// compatibility through a StreamCapabilities probe.
if (dynamicPartitionOverwrite) {
if (supportsDynamicPartitions) {
logDebug(
s"Committer $committer has declared compatibility with dynamic partition overwrite")
} else {
throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
}
}
}
committer
}


/**
* Does the instantiated committer support dynamic partitions?
* @return true if the committer declares itself compatible.
*/
private def supportsDynamicPartitions = {
committer.isInstanceOf[FileOutputCommitter] ||
(committer.isInstanceOf[StreamCapabilities] &&
committer.asInstanceOf[StreamCapabilities]
.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING))
}

/**
* Create a temporary file for a task.
*
@@ -140,6 +164,28 @@ class PathOutputCommitProtocol(
file.toString
}

/**
* Reject any requests for an absolute path file on a committer which
* is not compatible with it.
*
* @param taskContext task context
* @param absoluteDir final directory
* @param spec output filename
* @return a path string
* @throws UnsupportedOperationException if incompatible
*/
override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext,
absoluteDir: String,
spec: FileNameSpec): String = {

if (supportsDynamicPartitions) {
super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
} else {
throw new UnsupportedOperationException(s"Absolute output locations not supported" +
s" by committer $committer")
}
}
}

object PathOutputCommitProtocol {
@@ -161,7 +207,17 @@ object PathOutputCommitProtocol {
val REJECT_FILE_OUTPUT_DEFVAL = false

/** Error string for tests. */
private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" +
private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" +
" dynamicPartitionOverwrite"

/**
* Stream Capabilities probe for spark dynamic partitioning compatibility.
*/
private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING =
"mapreduce.job.committer.dynamic.partitioning"

/**
* Scheme prefix for per-filesystem scheme committers.
*/
private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"
}