[SPARK-41407][SQL] Pull out v1 write to WriteFiles #38939

Closed · wants to merge 4 commits
2 changes: 1 addition & 1 deletion core/src/main/resources/error/error-classes.json
@@ -3674,7 +3674,7 @@
},
"_LEGACY_ERROR_TEMP_2054" : {
"message" : [
"Task failed while writing rows."
"Task failed while writing rows. <message>"
]
},
"_LEGACY_ERROR_TEMP_2055" : {
@@ -785,7 +785,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = {
new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_2054",
messageParameters = Map.empty,
messageParameters = Map("message" -> cause.getMessage),
@cloud-fan after we introduce WriteFiles, the error stack for writing changes.

before:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15) (10.221.97.76 executor driver): java.lang.RuntimeException: Exceeds char/varchar type length limitation: 5
	at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.trimTrailingSpaces(CharVarcharCodegenUtils.java:30)
	at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.charTypeWriteSideCheck(CharVarcharCodegenUtils.java:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)

after:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15) (10.221.97.76 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:416)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:89)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1502)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Exceeds char/varchar type length limitation: 5
	at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.trimTrailingSpaces(CharVarcharCodegenUtils.java:30)
	at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.charTypeWriteSideCheck(CharVarcharCodegenUtils.java:43)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)

So I added the root cause message into the wrapped SparkException.
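
A minimal standalone sketch (plain JVM exceptions rather than Spark's SparkException) of why appending the cause message matters: the wrapper's own message is the first line users see in the driver-side summary.

object WrappedWriteErrorSketch extends App {
  // Root cause thrown inside the generated write code path.
  val cause = new RuntimeException("Exceeds char/varchar type length limitation: 5")

  // Before: the wrapper's own message hides the root cause on the first line.
  val before = new Exception("Task failed while writing rows.", cause)
  // After: the root cause message is appended to the wrapper's message.
  val after = new Exception(s"Task failed while writing rows. ${cause.getMessage}", cause)

  println(before.getMessage) // Task failed while writing rows.
  println(after.getMessage)  // Task failed while writing rows. Exceeds char/varchar type length limitation: 5
}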

cause = cause)
}

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.internal;

import java.io.Serializable;

/**
* Write spec is an input parameter of
* {@link org.apache.spark.sql.execution.SparkPlan#executeWrite}.
*
* <p>
* This is an empty interface; the concrete class that implements
* {@link org.apache.spark.sql.execution.SparkPlan#doExecuteWrite}
* should define its own spec class and use it.
*
* @since 3.4.0
*/
public interface WriteSpec extends Serializable {}
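
As a concrete example of a WriteSpec implementation, here is a sketch that mirrors how this PR's WriteFilesSpec is constructed later in the FileFormatWriter changes; the three field names come from that call site, while the exact import paths and declaration are assumptions.

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{ConcurrentOutputWriterSpec, WriteJobDescription}
import org.apache.spark.sql.internal.WriteSpec

// Sketch only: a concrete WriteSpec carrying what a file-writing physical node needs.
case class WriteFilesSpecSketch(
    description: WriteJobDescription,
    committer: FileCommitProtocol,
    concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
  extends WriteSpec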
@@ -34,9 +34,10 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, TreeNodeTag, UnaryLike}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SQLConf, WriteSpec}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.NextIterator
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -223,6 +224,19 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
doExecuteColumnar()
}

/**
* Returns the result of writes as an RDD[WriterCommitMessage] variable by delegating to
* `doExecuteWrite` after preparations.
*
* Concrete implementations of SparkPlan should override `doExecuteWrite`.
*/
def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = executeQuery {
Contributor:
Regardless of how hard it is to implement, which information should ideally live in the WriteFiles operator, and which should be passed as parameters?

ulysses-you (Contributor Author) replied on Dec 8, 2022:
Let me list what the current v1 file write path requires:

  • WriteJobDescription, includes hadoop job (hadoop conf), fileFormat, outputSpec, partitionColumns, bucketSpec, options, statsTrackers
  • FileCommitProtocol, includes output path, dynamic partition overwrite flag
  • ConcurrentOutputWriterSpec, includes requiredOrdering, bucketSpec, physical sortPlan

According to the existing datasource v1 write commands, WriteFiles should hold at least: FileFormat, OutputSpec, partitionColumns, bucketSpec, options, requiredOrdering.

case class InsertIntoHadoopFsRelationCommand(
    outputPath: Path,
    staticPartitions: TablePartitionSpec,
    ifPartitionNotExists: Boolean,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    options: Map[String, String],
    query: LogicalPlan,
    mode: SaveMode,
    catalogTable: Option[CatalogTable],
    fileIndex: Option[FileIndex],
    outputColumnNames: Seq[String])

Since we cannot get the physical plan on the logical side, and ConcurrentOutputWriterSpec depends on the physical plan, it should be held in WriteFilesSpec.

FileCommitProtocol should be held in WriteFilesSpec, because WriteFiles only does the task-level part of the setup job -> setup task -> commit task -> commit job pipeline. The same reasoning applies to statsTrackers.

Given how the Hadoop job is used (FileCommitProtocol.setupJob(Job)), I tend to make WriteFilesSpec hold the Hadoop job and Hadoop conf.


In sum:

  • WriteFiles: FileFormat, OutputSpec, partitionColumns, bucketSpec, options and requiredOrdering.
  • WriteFilesSpec: FileCommitProtocol, statsTrackers, ConcurrentOutputWriterSpec, hadoop job and hadoop conf.

Note: the above does not consider implementation difficulty; it is purely at the semantic level (see the sketch just below).
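
A compact code sketch of that proposed split. This is the semantic proposal only, not the code in this PR; the import paths and field types are assumptions based on the Spark classes named above.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{ConcurrentOutputWriterSpec, FileFormat, WriteJobStatsTracker}
import org.apache.spark.sql.execution.datasources.FileFormatWriter.OutputSpec

// What the logical WriteFiles node would hold (known at planning time).
case class WriteFilesInfoSketch(
    fileFormat: FileFormat,
    outputSpec: OutputSpec,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    options: Map[String, String],
    requiredOrdering: Seq[Expression])

// What the WriteFilesSpec parameter would carry (job-level, physical-side pieces).
case class WriteFilesSpecProposalSketch(
    committer: FileCommitProtocol,
    statsTrackers: Seq[WriteJobStatsTracker],
    concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec],
    hadoopJob: Job,
    hadoopConf: Configuration)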

if (isCanonicalizedPlan) {
throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
}
doExecuteWrite(writeSpec)
}

/**
* Executes a query after preparing the query and adding query plan information to created RDDs
* for visualization.
@@ -324,6 +338,16 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
s" mismatch:\n${this}")
}

/**
* Produces the result of the writes as an `RDD[WriterCommitMessage]`
*
* Overridden by concrete implementations of SparkPlan.
*/
protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
throw SparkException.internalError(s"Internal Error ${this.getClass} has write support" +
s" mismatch:\n${this}")
}
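
To illustrate the new hook, here is a hypothetical physical operator (not this PR's WriteFilesExec) that overrides doExecuteWrite; the commit message type and per-partition logic are placeholders.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.internal.WriteSpec

// Placeholder commit message; WriterCommitMessage is just a serializable marker.
case class NoopCommitMessage(rowCount: Long) extends WriterCommitMessage

// Hypothetical write-only operator: SparkPlan.executeWrite() dispatches here.
case class ExampleWriteExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = Seq.empty

  // A write-only node is not meant to be executed for rows.
  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException(s"$nodeName does not support row execution")

  override protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
    // The real WriteFilesExec would receive a WriteFilesSpec with the job
    // description and committer; here we just count rows per partition.
    child.execute().mapPartitions { iter =>
      Iterator.single[WriterCommitMessage](NoopCommitMessage(iter.size))
    }
  }

  override protected def withNewChildInternal(newChild: SparkPlan): ExampleWriteExec =
    copy(child = newChild)
}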

/**
* Converts the output of this plan to row-based if it is columnar plan.
*/
@@ -34,6 +34,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
import org.apache.spark.sql.execution.aggregate.AggUtils
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{WriteFiles, WriteFilesExec}
import org.apache.spark.sql.execution.exchange.{REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, REPARTITION_BY_NUM, ShuffleExchangeExec}
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.streaming._
@@ -894,6 +895,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("MERGE INTO TABLE")
case logical.CollectMetrics(name, metrics, child) =>
execution.CollectMetricsExec(name, metrics, planLater(child)) :: Nil
case WriteFiles(child) =>
WriteFilesExec(planLater(child)) :: Nil
case _ => Nil
}
}
@@ -145,6 +145,12 @@ case class CreateDataSourceTableAsSelectCommand(
outputColumnNames: Seq[String])
extends V1WriteCommand {

override def fileFormatProvider: Boolean = {
table.provider.forall { provider =>
classOf[FileFormat].isAssignableFrom(DataSource.providingClass(provider, conf))
ulysses-you (Contributor Author) commented on Dec 8, 2022:
CreateDataSourceTableAsSelectCommand is not only used to write files; we should only plan v1 writes whose provider is a FileFormat.

}
}

override lazy val partitionColumns: Seq[Attribute] = {
val unresolvedPartitionColumns = table.partitionColumnNames.map(UnresolvedAttribute.quoted)
DataSource.resolvePartitionColumns(
@@ -97,19 +97,8 @@ case class DataSource(

case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

lazy val providingClass: Class[_] = {
val cls = DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
// `providingClass` is used for resolving data source relation for catalog tables.
// As now catalog for data source V2 is under development, here we fall back all the
// [[FileDataSourceV2]] to [[FileFormat]] to guarantee the current catalog works.
// [[FileDataSourceV2]] will still be used if we call the load()/save() method in
// [[DataFrameReader]]/[[DataFrameWriter]], since they use method `lookupDataSource`
// instead of `providingClass`.
cls.newInstance() match {
case f: FileDataSourceV2 => f.fallbackFileFormat
case _ => cls
}
}
lazy val providingClass: Class[_] =
DataSource.providingClass(className, sparkSession.sessionState.conf)

private[sql] def providingInstance(): Any = providingClass.getConstructor().newInstance()

@@ -843,4 +832,18 @@ object DataSource extends Logging {
}
}
}

def providingClass(className: String, conf: SQLConf): Class[_] = {
val cls = DataSource.lookupDataSource(className, conf)
// `providingClass` is used for resolving data source relation for catalog tables.
// As now catalog for data source V2 is under development, here we fall back all the
// [[FileDataSourceV2]] to [[FileFormat]] to guarantee the current catalog works.
// [[FileDataSourceV2]] will still be used if we call the load()/save() method in
// [[DataFrameReader]]/[[DataFrameWriter]], since they use method `lookupDataSource`
// instead of `providingClass`.
cls.newInstance() match {
case f: FileDataSourceV2 => f.fallbackFileFormat
case _ => cls
}
}
}
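
A rough usage sketch of the extracted DataSource.providingClass helper, showing why the fileFormatProvider check above distinguishes file-based providers from others. The resolution behavior is assumed from the fallback comment in the method; this is not an authoritative test.

import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat}
import org.apache.spark.sql.internal.SQLConf

object ProvidingClassCheck extends App {
  val conf = new SQLConf
  // "parquet" resolves to a FileFormat (file-based v2 sources fall back to
  // their v1 FileFormat), so a CTAS with this provider qualifies for planned v1 writes.
  println(classOf[FileFormat].isAssignableFrom(DataSource.providingClass("parquet", conf)))
  // "jdbc" resolves to a relation provider that is not a FileFormat,
  // so fileFormatProvider is false and the command is not pulled into WriteFiles.
  println(classOf[FileFormat].isAssignableFrom(DataSource.providingClass("jdbc", conf)))
}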
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
import org.apache.spark.sql.internal.SQLConf
@@ -103,14 +104,6 @@ object FileFormatWriter extends Logging {
.map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation))
val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains)

val hasEmpty2Null = plan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions))
val empty2NullPlan = if (hasEmpty2Null) {
plan
} else {
val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan
}

val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, dataColumns, options)
val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)

@@ -144,9 +137,10 @@
// columns.
val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)
// the sort order doesn't matter
// Use the output ordering from the original plan before adding the empty2null projection.
val actualOrdering = plan.outputOrdering.map(_.child)
val actualOrdering = writeFilesOpt.map(_.child).getOrElse(plan).outputOrdering.map(_.child)
val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)

SQLExecution.checkSQLExecutionId(sparkSession)
@@ -155,10 +149,6 @@
// get an ID guaranteed to be unique.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)

// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
committer.setupJob(job)

// When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will add logical sort
// operator based on the required ordering of the V1 write command. So the output
// ordering of the physical plan should always match the required ordering. Here
@@ -169,27 +159,55 @@
// V1 write command will be empty).
if (Utils.isTesting) outputOrderingMatched = orderingMatched

try {
if (writeFilesOpt.isDefined) {
// build `WriteFilesSpec` for `WriteFiles`
val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {
val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)
createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)
}
val writeSpec = WriteFilesSpec(
description = description,
committer = committer,
concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc
)
executeWrite(sparkSession, plan, writeSpec, job)
} else {
executeWrite(sparkSession, plan, job, description, committer, outputSpec,
requiredOrdering, partitionColumns, sortColumns, orderingMatched)
}
}
// scalastyle:on argcount

private def executeWrite(
sparkSession: SparkSession,
plan: SparkPlan,
job: Job,
description: WriteJobDescription,
committer: FileCommitProtocol,
outputSpec: OutputSpec,
requiredOrdering: Seq[Expression],
partitionColumns: Seq[Attribute],
sortColumns: Seq[Attribute],
orderingMatched: Boolean): Set[String] = {
val hasEmpty2Null = plan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions))
val empty2NullPlan = if (hasEmpty2Null) {
plan
} else {
val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan
}

writeAndCommit(job, description, committer) {
val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
(empty2NullPlan.execute(), None)
} else {
// SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
// the physical plan may have different attribute ids due to optimizer removing some
// aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
val orderingExpr = bindReferences(
requiredOrdering.map(SortOrder(_, Ascending)), finalOutputSpec.outputColumns)
val sortPlan = SortExec(
orderingExpr,
global = false,
child = empty2NullPlan)

val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
if (concurrentWritersEnabled) {
(empty2NullPlan.execute(),
Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, outputSpec)
val concurrentOutputWriterSpec = createConcurrentOutputWriterSpec(
sparkSession, sortPlan, sortColumns)
if (concurrentOutputWriterSpec.isDefined) {
(empty2NullPlan.execute(), concurrentOutputWriterSpec)
} else {
(sortPlan.execute(), None)
(sortPlan.execute(), concurrentOutputWriterSpec)
}
}

@@ -221,7 +239,19 @@
committer.onTaskCommit(res.commitMsg)
ret(index) = res
})
ret
}
}

private def writeAndCommit(
job: Job,
description: WriteJobDescription,
committer: FileCommitProtocol)(f: => Array[WriteTaskResult]): Set[String] = {
// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
committer.setupJob(job)
try {
val ret = f
val commitMsgs = ret.map(_.commitMsg)

logInfo(s"Start to commit write Job ${description.uuid}.")
@@ -239,10 +269,70 @@
throw cause
}
}
// scalastyle:on argcount
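
The writeAndCommit helper above factors the setupJob / commitJob / abortJob bracket out of both write paths. A minimal standalone sketch of that bracket pattern follows; the names are illustrative, not Spark APIs.

object JobCommitBracketSketch {
  // Runs `body` between job setup and commit, aborting on any failure.
  def withJobCommit[T](
      setupJob: () => Unit,
      commitJob: T => Unit,
      abortJob: Throwable => Unit)(body: => T): T = {
    // Setup stays outside the try: a setup failure should not trigger abortJob.
    setupJob()
    try {
      val result = body
      commitJob(result)
      result
    } catch {
      case t: Throwable =>
        abortJob(t)
        throw t
    }
  }
}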

/**
* Write files using [[SparkPlan.executeWrite]]
*/
private def executeWrite(
session: SparkSession,
planForWrites: SparkPlan,
writeFilesSpec: WriteFilesSpec,
job: Job): Set[String] = {
val committer = writeFilesSpec.committer
val description = writeFilesSpec.description

writeAndCommit(job, description, committer) {
val rdd = planForWrites.executeWrite(writeFilesSpec)
val ret = new Array[WriteTaskResult](rdd.partitions.length)
session.sparkContext.runJob(
rdd,
(context: TaskContext, iter: Iterator[WriterCommitMessage]) => {
assert(iter.hasNext)
val commitMessage = iter.next()
assert(!iter.hasNext)
commitMessage
},
rdd.partitions.indices,
(index, res: WriterCommitMessage) => {
assert(res.isInstanceOf[WriteTaskResult])
val writeTaskResult = res.asInstanceOf[WriteTaskResult]
committer.onTaskCommit(writeTaskResult.commitMsg)
ret(index) = writeTaskResult
})
ret
}
}

private def createSortPlan(
plan: SparkPlan,
requiredOrdering: Seq[Expression],
outputSpec: OutputSpec): SortExec = {
// SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
// the physical plan may have different attribute ids due to optimizer removing some
// aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
val orderingExpr = bindReferences(
requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
SortExec(
orderingExpr,
global = false,
child = plan)
}

private def createConcurrentOutputWriterSpec(
sparkSession: SparkSession,
sortPlan: SortExec,
sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] = {
val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
if (concurrentWritersEnabled) {
Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter()))
} else {
None
}
}
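
For context, a hedged end-to-end example of the concurrent-writers path that createConcurrentOutputWriterSpec enables. The config key spark.sql.maxConcurrentOutputFileWriters is assumed from the maxConcurrentOutputFileWriters conf used above, and concurrent writers only apply when there are no bucket-sort columns.

import org.apache.spark.sql.SparkSession

object ConcurrentWritersDemo extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("concurrent-output-writers-demo")
    // Assumed config key; > 0 enables ConcurrentOutputWriterSpec instead of
    // sorting the partition columns before writing.
    .config("spark.sql.maxConcurrentOutputFileWriters", "8")
    .getOrCreate()
  import spark.implicits._

  // A dynamically partitioned write, which would otherwise require a sort.
  spark.range(1000)
    .withColumn("p", $"id" % 10)
    .write
    .mode("overwrite")
    .partitionBy("p")
    .parquet("/tmp/concurrent-writers-demo")

  spark.stop()
}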

/** Writes data out in a single Spark task. */
private def executeTask(
private[spark] def executeTask(
description: WriteJobDescription,
jobIdInstant: Long,
sparkStageId: Int,