[SPARK-41407][SQL] Pull out v1 write to WriteFiles #38939
Conversation
 *
 * @since 3.4.0
 */
public interface WriteSpec extends Serializable {}
does it need to be a public DS v2 API?
It is actually not for v2. The newly added method involves two interfaces:
`def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage]`
I just made `WriteSpec` a Java interface to be consistent with `WriterCommitMessage`.
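For illustration, a minimal sketch of how a marker interface like this pairs with the new method (simplified; the real `SparkPlan` in the PR is more involved, and `FileWriteSpecSketch` below is a hypothetical implementor):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.connector.write.WriterCommitMessage

// Marker interface: carries whatever a concrete write needs, with no methods
// of its own (the PR uses a Java interface for consistency with WriterCommitMessage).
trait WriteSpec extends Serializable

// Hypothetical concrete spec for file-based writes.
case class FileWriteSpecSketch(outputPath: String, options: Map[String, String]) extends WriteSpec

// Simplified shape of the new SparkPlan hook: executeWrite delegates to
// doExecuteWrite, which write-capable physical nodes override.
abstract class SparkPlanSketch {
  final def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] =
    doExecuteWrite(writeSpec)

  protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] =
    throw new UnsupportedOperationException(s"${getClass.getSimpleName} does not support executeWrite")
}
```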
then this is in the wrong package. it should be put in an internal package
changed to internal package
 * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
 */
case class WriteFiles(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
the output should match physical plan and be Nil as well.
It should be `child.output`, otherwise `DataWritingCommand` cannot work. `WriteFiles` is the child of `DataWritingCommand`, and `DataWritingCommand` uses its child's output as the final output at the planning phase.
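To illustrate why (a hypothetical helper, not the actual Spark code): `DataWritingCommand` resolves its output columns by name against its child's output, so a `Nil` output on `WriteFiles` would break that lookup.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical sketch of the lookup a DataWritingCommand performs at planning
// time: each declared output column name must be found in child.output.
// If WriteFiles reported Nil instead of child.output, every lookup would fail.
def resolveOutputColumns(child: LogicalPlan, outputColumnNames: Seq[String]): Seq[Attribute] =
  outputColumnNames.map { name =>
    child.output.find(_.name == name).getOrElse {
      throw new IllegalStateException(s"Column '$name' not found in ${child.output.map(_.name)}")
    }
  }
```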
 *
 * Concrete implementations of SparkPlan should override `doExecuteWrite`.
 */
def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = executeQuery {
Regardless of how hard it is to implement, which information should ideally live in the WriteFiles operator and which should be passed as parameters?
Let me list what the current v1 write-files path requires:
- WriteJobDescription, includes hadoop job (hadoop conf), fileFormat, outputSpec, partitionColumns, bucketSpec, options, statsTrackers
- FileCommitProtocol, includes output path, dynamic partition overwrite flag
- ConcurrentOutputWriterSpec, includes requiredOrdering, bucketSpec, physical sortPlan
According to the existing datasource v1 write commands, `WriteFiles` should hold at least: FileFormat, OutputSpec, partitionColumns, bucketSpec, options, requiredOrdering.
case class InsertIntoHadoopFsRelationCommand(
    outputPath: Path,
    staticPartitions: TablePartitionSpec,
    ifPartitionNotExists: Boolean,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    options: Map[String, String],
    query: LogicalPlan,
    mode: SaveMode,
    catalogTable: Option[CatalogTable],
    fileIndex: Option[FileIndex],
    outputColumnNames: Seq[String])
Since we cannot get the physical plan on the logical side, and `ConcurrentOutputWriterSpec` depends on the physical plan, it should be held in `WriteFilesSpec`.
`FileCommitProtocol` should also be held in `WriteFilesSpec`, because `WriteFiles` only does the task-level part of the pipeline setup job -> setup task -> commit task -> commit job.
The same reasoning applies to statsTrackers.
Given how the Hadoop job is used (FileCommitProtocol.setup(Job)), I tend to make `WriteFilesSpec` hold the Hadoop job and Hadoop conf.
In sum:
- `WriteFiles`: FileFormat, OutputSpec, partitionColumns, bucketSpec, options and requiredOrdering.
- `WriteFilesSpec`: FileCommitProtocol, statsTrackers, ConcurrentOutputWriterSpec, Hadoop job and Hadoop conf.
Note: the above does not consider implementation difficulty; it is just the split at the semantic level.
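A rough sketch of that split, stated only at the semantic level (field types are simplified and these are not the exact case classes in the PR; OutputSpec, stats trackers, and the concurrent writer spec are elided):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.execution.datasources.FileFormat

// Logical-side information: known when the WriteFiles operator is created.
case class WriteFilesSketch(
    fileFormat: FileFormat,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    options: Map[String, String],
    requiredOrdering: Seq[SortOrder])

// Execution-side information: depends on the physical plan or the Hadoop job,
// so it is passed in at write time rather than stored on the operator.
case class WriteFilesSpecSketch(
    committer: FileCommitProtocol,
    hadoopConf: Configuration)
```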
session.sparkContext.runJob(
  rdd,
  (context: TaskContext, iter: Iterator[WriterCommitMessage]) => {
    assert(iter.hasNext)
we should make sure this iterator only has one element
addressed
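A minimal sketch of the stricter check (a hypothetical helper; it assumes each write task yields exactly one commit message):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.connector.write.WriterCommitMessage

// Drain the per-partition iterator and enforce that it contains exactly one
// commit message, failing fast on both the empty and the multi-element case.
def singleCommitMessage(
    context: TaskContext,
    iter: Iterator[WriterCommitMessage]): WriterCommitMessage = {
  assert(iter.hasNext, s"No commit message for partition ${context.partitionId()}")
  val message = iter.next()
  assert(!iter.hasNext, s"Multiple commit messages for partition ${context.partitionId()}")
  message
}
```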
@@ -785,7 +785,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
   def taskFailedWhileWritingRowsError(cause: Throwable): Throwable = {
     new SparkException(
       errorClass = "_LEGACY_ERROR_TEMP_2054",
-      messageParameters = Map.empty,
+      messageParameters = Map("message" -> cause.getMessage),
@cloud-fan after we introduce WriteFiles, the error stack for writing changes.
before:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15) (10.221.97.76 executor driver): java.lang.RuntimeException: Exceeds char/varchar type length limitation: 5
at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.trimTrailingSpaces(CharVarcharCodegenUtils.java:30)
at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.charTypeWriteSideCheck(CharVarcharCodegenUtils.java:43)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
after:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15) (10.221.97.76 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:416)
at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:89)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1502)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Exceeds char/varchar type length limitation: 5
at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.trimTrailingSpaces(CharVarcharCodegenUtils.java:30)
at org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils.charTypeWriteSideCheck(CharVarcharCodegenUtils.java:43)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
So I added the root cause message to the wrapped `SparkException`.
@@ -145,6 +145,12 @@ case class CreateDataSourceTableAsSelectCommand(
     outputColumnNames: Seq[String])
   extends V1WriteCommand {

+  override def fileFormatProvider: Boolean = {
+    table.provider.forall { provider =>
+      classOf[FileFormat].isAssignableFrom(DataSource.providingClass(provider, conf))
`CreateDataSourceTableAsSelectCommand` is not only used to write files; we should only plan v1 writes whose provider is a `FileFormat`.
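For illustration, the provider check boils down to something like the following standalone sketch, using the same call shown in the diff above:

```scala
import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat}
import org.apache.spark.sql.internal.SQLConf

// Only providers whose class implements FileFormat (e.g. parquet, orc, csv)
// should go through the new WriteFiles planning; other providers (e.g. JDBC)
// keep the original code path.
def isFileFormatProvider(provider: String, conf: SQLConf): Boolean =
  classOf[FileFormat].isAssignableFrom(DataSource.providingClass(provider, conf))
```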
thanks, merging to master!
…riter.executeTask

### What changes were proposed in this pull request?
This PR is a followup of #38939 that fixes a logical conflict during merging PRs, see #38980 and #38939.

### Why are the changes needed?
To recover the broken build.

### Does this PR introduce _any_ user-facing change?
No, dev-only.

### How was this patch tested?
Manually tested:

```
./build/sbt -Phive clean package
```

Closes #39194 from HyukjinKwon/SPARK-41407.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
This PR aims to pull out the details of v1 write files into a new operator: `WriteFiles` (logical) and `WriteFilesExec` (physical). Then we can make v1 write files support whole-stage codegen in the future.

- Introduce `WriteFilesSpec` to hold all v1 write-files information.
- To stay compatible with the existing code path, this PR adds a new method `executeWrite` in `SparkPlan`.
- Refactor `FileFormatWriter` to make writing files clearer: `SparkPlan.executeWrite` and a `writeAndCommit` method work with both code paths.
method to work with both two code pathWhy are the changes needed?
This is preparation work before supporting whole-stage codegen for v1 writes.
Does this PR introduce any user-facing change?
For users, no.
For developers, yes:
- a new method `executeWrite` in `SparkPlan`
- a new interface `WriteSpec`
How was this patch tested?
Pass CI with `spark.sql.optimizer.plannedWrite.enabled` on and off.
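For reference, a hedged sketch of how a test might cover both settings (the suite name and table are illustrative; it assumes the standard `withSQLConf`/`withTable` helpers from Spark's SQL test utilities):

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession

class PlannedWriteToggleSuiteSketch extends QueryTest with SharedSparkSession {
  test("v1 file write works with planned write enabled and disabled") {
    Seq("true", "false").foreach { enabled =>
      withSQLConf("spark.sql.optimizer.plannedWrite.enabled" -> enabled) {
        withTable("t") {
          // Exercise the v1 file-write path and verify the result is the same
          // regardless of whether the write is pulled out into WriteFiles.
          sql("CREATE TABLE t (id INT) USING parquet")
          sql("INSERT INTO t SELECT 1")
          checkAnswer(spark.table("t"), Row(1))
        }
      }
    }
  }
}
```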