
[SPARK-41407][SQL] Pull out v1 write to WriteFiles #38939


Closed
ulysses-you wants to merge 4 commits into master from v1write-plan

Conversation

ulysses-you (Contributor) commented on Dec 6, 2022

### What changes were proposed in this pull request?

This PR pulls the details of v1 file writes out into a new operator: WriteFiles (logical) and WriteFilesExec (physical). This will let v1 write files support whole-stage codegen in the future.

Introduce WriteFilesSpec to hold all the v1 write-files information:

```scala
case class WriteFilesSpec(
    description: WriteJobDescription,
    committer: FileCommitProtocol,
    concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
  extends WriteSpec
```
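As a rough usage sketch, the write path assembles this spec on the driver and hands it to the physical write node. Names such as `createConcurrentOutputWriterSpec` and `writeFilesExec` below are assumptions for illustration, not the merged code:

```scala
// Hypothetical assembly inside FileFormatWriter (illustrative only).
val writeSpec = WriteFilesSpec(
  description = description,   // WriteJobDescription: job conf, format, output spec, ...
  committer = committer,       // FileCommitProtocol for setup/commit/abort
  concurrentOutputWriterSpecFunc =
    sortPlan => createConcurrentOutputWriterSpec(sortPlan))  // assumed helper

// The physical plan (WriteFilesExec) runs the tasks and returns commit messages.
val messages: RDD[WriterCommitMessage] = writeFilesExec.executeWrite(writeSpec)
```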

To stay compatible with the existing code path, this PR adds a new method executeWrite to SparkPlan:

```scala
def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage]
```
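For context, this pairs with an overridable hook named doExecuteWrite (quoted in the review thread below), following SparkPlan's existing execute/doExecute convention. The fallback body here is an assumption for illustration:

```scala
def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = executeQuery {
  doExecuteWrite(writeSpec)
}

// Concrete implementations (e.g. WriteFilesExec) override this hook;
// other nodes do not support writing and fail fast.
protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] =
  throw new UnsupportedOperationException(s"$nodeName does not implement doExecuteWrite")
```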

Refactor FileFormatWriter to make the write-files flow clearer:

  • execute the write using the old code path
  • execute the write using SparkPlan.executeWrite
  • extract a writeAndCommit method shared by both code paths (see the sketch below)
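A hedged sketch of what the shared writeAndCommit helper could look like; the signature and field names are assumptions based on FileFormatWriter's existing structure, not the verbatim merged code:

```scala
// Job-level bracket shared by both paths: callers supply only the
// task-running part via `f` (old code path or SparkPlan.executeWrite).
private def writeAndCommit(
    job: Job,
    description: WriteJobDescription,
    committer: FileCommitProtocol)(f: => Array[WriteTaskResult]): Set[String] = {
  committer.setupJob(job)
  try {
    val taskResults = f  // run the write tasks
    committer.commitJob(job, taskResults.map(_.commitMsg))
    taskResults.flatMap(_.summary.updatedPartitions).toSet
  } catch {
    case cause: Throwable =>
      committer.abortJob(job)  // best-effort cleanup before rethrowing
      throw cause
  }
}
```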

### Why are the changes needed?

This is preparation work for supporting whole-stage codegen in v1 writes.

### Does this PR introduce _any_ user-facing change?

For users: no.

For developers: yes:

  • adds a new method executeWrite to SparkPlan
  • adds a new interface WriteSpec

### How was this patch tested?

Passes CI with spark.sql.optimizer.plannedWrite.enabled both on and off.

github-actions bot added the SQL label on Dec 6, 2022
ulysses-you force-pushed the v1write-plan branch 3 times, most recently from 1afe010 to 80eb7f4 on December 7, 2022 09:22
ulysses-you changed the title from [WIP][SPARK-41407][SQL] Pull out v1 write to WriteFiles to [SPARK-41407][SQL] Pull out v1 write to WriteFiles on Dec 7, 2022
```java
 *
 * @since 3.4.0
 */
public interface WriteSpec extends Serializable {}
```
Contributor:
does it need to be a public DS v2 API?

ulysses-you (Contributor, Author):
It is actually not for v2. The newly added method involves two interfaces:

`def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage]`

I just made WriteSpec a Java interface to be consistent with WriterCommitMessage.

Contributor:
Then this is in the wrong package; it should be put in an internal package.

ulysses-you (Contributor, Author):
changed to internal package

```scala
 * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
 */
case class WriteFiles(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
```
Contributor:
The output should match the physical plan and be Nil as well.

ulysses-you (Contributor, Author):
It should be child.output; otherwise DataWritingCommand cannot work. WriteFiles is the child of DataWritingCommand, and DataWritingCommand uses its child's output as the final output at the planning phase.
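To illustrate the shape being discussed (tree sketch reconstructed from this thread, not from the source):

```
InsertIntoHadoopFsRelationCommand    -- a DataWritingCommand / V1WriteCommand
+- WriteFiles                        -- output = child.output, read back by the command
   +- <query>                        -- possibly with a Sort for the required ordering
```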

```scala
 *
 * Concrete implementations of SparkPlan should override `doExecuteWrite`.
 */
def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = executeQuery {
```
Contributor:
Regardless of how hard it is to implement, which information should ideally live in the WriteFiles operator and which should be passed as parameters?

ulysses-you (Contributor, Author) commented on Dec 8, 2022:

Let me list what the current v1 write-files path requires:

  • WriteJobDescription, includes hadoop job (hadoop conf), fileFormat, outputSpec, partitionColumns, bucketSpec, options, statsTrackers
  • FileCommitProtocol, includes output path, dynamic partition overwrite flag
  • ConcurrentOutputWriterSpec, includes requiredOrdering, bucketSpec, physical sortPlan

Judging from the existing datasource v1 write command, WriteFiles should hold at least: FileFormat, OutputSpec, partitionColumns, bucketSpec, options, requiredOrdering. For reference:

```scala
case class InsertIntoHadoopFsRelationCommand(
    outputPath: Path,
    staticPartitions: TablePartitionSpec,
    ifPartitionNotExists: Boolean,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    options: Map[String, String],
    query: LogicalPlan,
    mode: SaveMode,
    catalogTable: Option[CatalogTable],
    fileIndex: Option[FileIndex],
    outputColumnNames: Seq[String])
```

Since we cannot get the physical plan on the logical side, and ConcurrentOutputWriterSpec depends on the physical plan, it should be held in WriteFilesSpec.

FileCommitProtocol should be held in WriteFilesSpec, because WriteFiles only does the task-level part of the pipeline setup job -> setup task -> commit task -> commit job. The same reasoning applies to statsTrackers.

Given how the hadoop job is used (FileCommitProtocol.setup(Job)), I tend to make WriteFilesSpec hold the hadoop job and hadoop conf.


In sum:

  • WriteFiles: FileFormat, OutputSpec, partitionColumns, bucketSpec, options and requiredOrdering.
  • WriteFilesSpec: FileCommitProtocol, statsTrackers, ConcurrentOutputWriterSpec, hadoop job and hadoop conf.

Note: the above does not consider implementation difficulty; it is a purely semantic-level split.
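A hypothetical sketch of that semantic split on the logical node, with the field list taken from the summary above (this PR itself keeps WriteFiles(child: LogicalPlan) minimal, as quoted earlier):

```scala
case class WriteFiles(
    child: LogicalPlan,
    fileFormat: FileFormat,
    outputSpec: OutputSpec,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    options: Map[String, String],
    requiredOrdering: Seq[SortOrder]) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override protected def withNewChildInternal(newChild: LogicalPlan): WriteFiles =
    copy(child = newChild)
}
```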

```scala
session.sparkContext.runJob(
  rdd,
  (context: TaskContext, iter: Iterator[WriterCommitMessage]) => {
    assert(iter.hasNext)
```
Contributor:
We should make sure this iterator has only one element.

ulysses-you (Contributor, Author):
addressed
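A minimal sketch of the strengthened check (variable names assumed):

```scala
(context: TaskContext, iter: Iterator[WriterCommitMessage]) => {
  assert(iter.hasNext)
  val commitMessage = iter.next()
  // Each write task must produce exactly one commit message.
  assert(!iter.hasNext, "Expected exactly one WriterCommitMessage per task")
  commitMessage
}
```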

github-actions bot added the CORE label on Dec 8, 2022
```diff
@@ -785,7 +785,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
   def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = {
     new SparkException(
       errorClass = "_LEGACY_ERROR_TEMP_2054",
-      messageParameters = Map.empty,
+      messageParameters = Map("message" -> cause.getMessage),
```
ulysses-you (Contributor, Author):

@cloud-fan after we introduce WriteFiles, the error stack for writing changes.

before:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15) (10.221.97.76 executor driver): java.lang.RuntimeException: Exceeds char/varchar type length limitation: 5
	at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.trimTrailingSpaces(CharVarcharCodegenUtils.java:30)
	at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.charTypeWriteSideCheck(CharVarcharCodegenUtils.java:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
```

after:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15) (10.221.97.76 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:416)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:89)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1502)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Exceeds char/varchar type length limitation: 5
	at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.trimTrailingSpaces(CharVarcharCodegenUtils.java:30)
	at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.charTypeWriteSideCheck(CharVarcharCodegenUtils.java:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
```

So I added the root-cause message into the wrapped SparkException.
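For context, a simplified sketch of where this wrapping happens, reconstructed from the stack trace above (not the verbatim FileFormatWriter.executeTask source; dataWriter method names are assumptions):

```scala
// Simplified: the task body runs the writer, and any failure is wrapped so
// the SparkException now carries the root cause's message.
try {
  dataWriter.writeWithIterator(iterator)
  dataWriter.commit()
} catch {
  case t: Throwable =>
    throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t)
}
```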

```diff
@@ -145,6 +145,12 @@ case class CreateDataSourceTableAsSelectCommand(
     outputColumnNames: Seq[String])
   extends V1WriteCommand {
 
+  override def fileFormatProvider: Boolean = {
+    table.provider.forall { provider =>
+      classOf[FileFormat].isAssignableFrom(DataSource.providingClass(provider, conf))
```
ulysses-you (Contributor, Author) commented on Dec 8, 2022:

CreateDataSourceTableAsSelectCommand is not only used to write files; we should only plan v1 writes whose provider is a FileFormat.
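A hypothetical sketch of how a planning rule could consult this flag (the rule shape and the `withNewQuery` helper are assumptions, purely illustrative):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative only: pull writes out into WriteFiles for file-based providers,
// and leave non-file providers (e.g. JDBC behind a CTAS) on the old path.
object PlanV1Writes extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case cmd: V1WriteCommand
        if cmd.fileFormatProvider && !cmd.query.isInstanceOf[WriteFiles] =>
      cmd.withNewQuery(WriteFiles(cmd.query))  // assumed helper
  }
}
```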

cloud-fan (Contributor):

thanks, merging to master!

cloud-fan closed this in 2ffa817 on Dec 23, 2022
HyukjinKwon added a commit that referenced this pull request Dec 23, 2022
…riter.executeTask

### What changes were proposed in this pull request?

This PR is a follow-up of #38939 that fixes a logical conflict from merging PRs; see #38980 and #38939.

### Why are the changes needed?

To recover the broken build.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually tested:

```
 ./build/sbt -Phive clean package
```

Closes #39194 from HyukjinKwon/SPARK-41407.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
ulysses-you deleted the v1write-plan branch on December 26, 2022 05:37