[SPARK-43327][CORE][3.3] Trigger `committer.setupJob` before plan execute in `FileFormatWriter#write`

### What changes were proposed in this pull request?

Trigger `committer.setupJob` before the plan is executed in `FileFormatWriter#write`.

### Why are the changes needed?

apache#38358 resolved the case where `outputOrdering` might not work when AQE is enabled.

However, because that change materializes the AQE plan in advance (it triggers `getFinalPhysicalPlan`), `committer.setupJob(job)` may never be executed when `AdaptiveSparkPlanExec#getFinalPhysicalPlan()` fails with an error. This PR therefore calls `committer.setupJob(job)` before the plan is materialized, so that job-level setup always runs first.
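
To make the ordering issue concrete, here is a minimal, self-contained Scala sketch. `Committer`, `StubCommitter`, `materializePlan`, and the two `write*` methods are stand-ins invented for this illustration, not Spark APIs: in the old ordering a materialization failure prevents `setupJob` from ever running, while in the patched ordering `setupJob` has already completed before the plan is materialized.

```scala
// Minimal sketch of the setupJob ordering issue (illustrative names only).
object SetupJobOrderingSketch {

  trait Committer {
    def setupJob(): Unit
    def commitJob(): Unit
    def abortJob(): Unit
  }

  final class StubCommitter extends Committer {
    var setupDone = false
    override def setupJob(): Unit = { setupDone = true; println("setupJob: job-level state prepared") }
    override def commitJob(): Unit = println("commitJob")
    override def abortJob(): Unit = println(s"abortJob (setupJob ran: $setupDone)")
  }

  // Stands in for AdaptiveSparkPlanExec#getFinalPhysicalPlan(), which can fail.
  def materializePlan(fail: Boolean): String =
    if (fail) throw new RuntimeException("getFinalPhysicalPlan failed") else "finalPlan"

  // Old ordering: the plan is materialized before setupJob, so a failure during
  // materialization means job-level setup never runs at all.
  def writeOld(committer: Committer, failMaterialization: Boolean): Unit = {
    val plan = materializePlan(failMaterialization) // may throw before setupJob
    committer.setupJob()
    try {
      println(s"executing $plan")
      committer.commitJob()
    } catch {
      case t: Throwable => committer.abortJob(); throw t
    }
  }

  // New ordering (this patch): setupJob runs before the plan is materialized.
  // A materialization failure still throws, but it happens outside the try block,
  // so abortJob is not called for it; only failures of the actual write inside
  // the try block trigger abortJob.
  def writeNew(committer: Committer, failMaterialization: Boolean): Unit = {
    committer.setupJob()
    val plan = materializePlan(failMaterialization)
    try {
      println(s"executing $plan")
      committer.commitJob()
    } catch {
      case t: Throwable => committer.abortJob(); throw t
    }
  }

  def main(args: Array[String]): Unit = {
    try writeOld(new StubCommitter, failMaterialization = true)
    catch { case _: RuntimeException => println("old ordering: failed, setupJob never ran") }

    try writeNew(new StubCommitter, failMaterialization = true)
    catch { case _: RuntimeException => println("new ordering: failed, but setupJob had already run") }
  }
}
```

Running `main` in this sketch shows the difference: the old path fails before any job-level setup has happened, while the new path fails only after `setupJob` has prepared the job.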

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test (`SPARK-43327: location exists when insertoverwrite fails` in `DataFrameReaderWriterSuite`; see the diff below).

Closes apache#41154 from zzzzming95/spark3-SPARK-43327.

Lead-authored-by: zzzzming95 <505306252@qq.com>
Co-authored-by: zhiming she <505306252@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
zzzzming95 authored and cloud-fan committed Aug 22, 2023
1 parent 69ca57c commit 557f9f5
Showing 2 changed files with 34 additions and 12 deletions.
FileFormatWriter.scala:
@@ -185,6 +185,16 @@ object FileFormatWriter extends Logging {
   statsTrackers = statsTrackers
 )
 
+SQLExecution.checkSQLExecutionId(sparkSession)
+
+// propagate the description UUID into the jobs, so that committers
+// get an ID guaranteed to be unique.
+job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
+// This call shouldn't be put into the `try` block below because it only initializes and
+// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+committer.setupJob(job)
+
 // We should first sort by partition columns, then bucket id, and finally sorting columns.
 val requiredOrdering =
   partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
@@ -208,16 +218,6 @@
 }
 }
 
-SQLExecution.checkSQLExecutionId(sparkSession)
-
-// propagate the description UUID into the jobs, so that committers
-// get an ID guaranteed to be unique.
-job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
-
-// This call shouldn't be put into the `try` block below because it only initializes and
-// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-committer.setupJob(job)
-
 try {
   val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
     (materializedPlan.execute(), None)
DataFrameReaderWriterSuite.scala:
@@ -24,15 +24,15 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.PrimitiveType
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkContext, TestUtils}
+import org.apache.spark.{SparkContext, SparkException, TestUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -1275,4 +1275,26 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
 }
 }
 }
+
+  test("SPARK-43327: location exists when insertoverwrite fails") {
+    withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+      withTable("t", "t1") {
+        sql("CREATE TABLE t(c1 int) USING parquet")
+        sql("CREATE TABLE t1(c2 long) USING parquet")
+        sql("INSERT OVERWRITE TABLE t1 SELECT 6000044164")
+
+        val identifier = TableIdentifier("t")
+        val location = spark.sessionState.catalog.getTableMetadata(identifier).location
+
+        intercept[SparkException] {
+          sql("INSERT OVERWRITE TABLE t SELECT c2 FROM " +
+            "(SELECT cast(c2 as int) as c2 FROM t1 distribute by c2)")
+        }
+        // scalastyle:off hadoopconfiguration
+        val fs = FileSystem.get(location, spark.sparkContext.hadoopConfiguration)
+        // scalastyle:on hadoopconfiguration
+        assert(fs.exists(new Path(location)))
+      }
+    }
+  }
 }
