
Commit 27e20fe

ulysses-you authored and cloud-fan committed
[SPARK-41708][SQL] Pull v1write information to WriteFiles
### What changes were proposed in this pull request?

This PR pulls the v1 write information out of `V1WriteCommand` and into `WriteFiles`:

```scala
case class WriteFiles(child: LogicalPlan)

=>

case class WriteFiles(
    child: LogicalPlan,
    fileFormat: FileFormat,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    options: Map[String, String],
    staticPartitions: TablePartitionSpec)
```

It also removes `WriteSpec`, which is no longer necessary.

### Why are the changes needed?

After this PR, `WriteFiles` holds the write information, which developers can use when working with the planned v1 write path.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass CI.

Closes #39277 from ulysses-you/SPARK-41708.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 89666d4 commit 27e20fe

12 files changed (+355, -308 lines)

sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 5 additions & 4 deletions
```diff
@@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, TreeNodeTag, UnaryLike}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.WriteFilesSpec
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.internal.{SQLConf, WriteSpec}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.NextIterator
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -230,11 +231,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    *
    * Concrete implementations of SparkPlan should override `doExecuteWrite`.
    */
-  def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = executeQuery {
+  def executeWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = executeQuery {
     if (isCanonicalizedPlan) {
       throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
     }
-    doExecuteWrite(writeSpec)
+    doExecuteWrite(writeFilesSpec)
   }
 
   /**
@@ -343,7 +344,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    *
    * Overridden by concrete implementations of SparkPlan.
    */
-  protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
+  protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
     throw SparkException.internalError(s"Internal Error ${this.getClass} has write support" +
       s" mismatch:\n${this}")
   }
```
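
With `WriteSpec` removed, `executeWrite` is now typed directly against `WriteFilesSpec`, so the callee no longer needs a downcast. Below is a minimal sketch of how a caller is expected to drive the new entry point; the variable names and the way `description`, `committer`, and `concurrentWriterSpecFunc` are obtained are illustrative assumptions, not part of this diff (in Spark the spec is assembled by the file-writing path, e.g. `FileFormatWriter`):

```scala
// Illustrative sketch only: `writeFilesExec` stands for a planned WriteFilesExec node,
// and the three ingredients below are assumed to be built by the calling write path.
val writeFilesSpec = WriteFilesSpec(
  description = description,                                  // WriteJobDescription
  committer = committer,                                      // FileCommitProtocol
  concurrentOutputWriterSpecFunc = concurrentWriterSpecFunc)  // SparkPlan => Option[...]

// Returns one commit message per write task, as before, but without any WriteSpec cast.
val commitMessages: RDD[WriterCommitMessage] = writeFilesExec.executeWrite(writeFilesSpec)
```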

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -895,8 +895,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("MERGE INTO TABLE")
       case logical.CollectMetrics(name, metrics, child) =>
         execution.CollectMetricsExec(name, metrics, planLater(child)) :: Nil
-      case WriteFiles(child) =>
-        WriteFilesExec(planLater(child)) :: Nil
+      case WriteFiles(child, fileFormat, partitionColumns, bucket, options, staticPartitions) =>
+        WriteFilesExec(planLater(child), fileFormat, partitionColumns, bucket, options,
+          staticPartitions) :: Nil
       case _ => Nil
     }
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala

Lines changed: 23 additions & 1 deletion
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
@@ -31,11 +32,31 @@ import org.apache.spark.sql.types.StringType
 import org.apache.spark.unsafe.types.UTF8String
 
 trait V1WriteCommand extends DataWritingCommand {
+  /**
+   * Specify the [[FileFormat]] of the provider of V1 write command.
+   */
+  def fileFormat: FileFormat
+
   /**
    * Specify the partition columns of the V1 write command.
    */
   def partitionColumns: Seq[Attribute]
 
+  /**
+   * Specify the partition spec of the V1 write command.
+   */
+  def staticPartitions: TablePartitionSpec
+
+  /**
+   * Specify the bucket spec of the V1 write command.
+   */
+  def bucketSpec: Option[BucketSpec]
+
+  /**
+   * Specify the storage options of the V1 write command.
+   */
+  def options: Map[String, String]
+
   /**
    * Specify the required ordering for the V1 write command. `FileFormatWriter` will
    * add SortExec if necessary when the requiredOrdering is empty.
@@ -56,7 +77,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
     case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>
       val newQuery = prepareQuery(write, write.query)
       val attrMap = AttributeMap(write.query.output.zip(newQuery.output))
-      val newChild = WriteFiles(newQuery)
+      val newChild = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,
+        write.bucketSpec, write.options, write.staticPartitions)
       val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {
         case a: Attribute if attrMap.contains(a) =>
           a.withExprId(attrMap(a).exprId)
```
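
Since the injected `WriteFiles` node now carries all of this metadata, a rule, listener, or test can read it straight off the optimized plan. A hedged sketch, where `optimizedPlan` is a hypothetical variable standing for a logical plan produced after the `V1Writes` rule has run:

```scala
// Illustrative sketch only: inspect the v1 write metadata carried by WriteFiles.
optimizedPlan.foreach {
  case WriteFiles(_, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) =>
    println(s"v1 write: format=${fileFormat.getClass.getSimpleName}, " +
      s"partitionColumns=${partitionColumns.map(_.name).mkString(",")}, " +
      s"bucketSpec=$bucketSpec, staticPartitions=$staticPartitions, " +
      s"options=${options.keys.mkString(",")}")
  case _ => // not a write node, nothing to report
}
```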

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala

Lines changed: 18 additions & 8 deletions
```diff
@@ -23,12 +23,13 @@ import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec
-import org.apache.spark.sql.internal.WriteSpec
 
 /**
  * The write files spec holds all information of [[V1WriteCommand]] if its provider is
@@ -38,13 +39,18 @@ case class WriteFilesSpec(
     description: WriteJobDescription,
     committer: FileCommitProtocol,
     concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
-  extends WriteSpec
 
 /**
  * During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between [[V1WriteCommand]] and query.
  * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
  */
-case class WriteFiles(child: LogicalPlan) extends UnaryNode {
+case class WriteFiles(
+    child: LogicalPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override protected def withNewChildInternal(newChild: LogicalPlan): WriteFiles =
     copy(child = newChild)
@@ -53,13 +59,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode {
 /**
  * Responsible for writing files.
  */
-case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode {
+case class WriteFilesExec(
+    child: SparkPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends UnaryExecNode {
   override def output: Seq[Attribute] = Seq.empty
 
-  override protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
-    assert(writeSpec.isInstanceOf[WriteFilesSpec])
-    val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec]
-
+  override protected def doExecuteWrite(
+      writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
     val rdd = child.execute()
     // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
     // partition rdd to make sure we at least set up one write task to write the metadata.
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -238,7 +238,8 @@ case class RelationConversions(
       // that only matches table insertion inside Hive CTAS.
       // This pattern would not cause conflicts because this rule is always applied before
       // `HiveAnalysis` and both of these rules are running once.
-      case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _)
+      case InsertIntoHiveTable(
+          tableDesc, _, query, overwrite, ifPartitionNotExists, _, _, _, _, _, _)
         if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
           tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) &&
           conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala

Lines changed: 193 additions & 0 deletions
This file was added.

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.execution

import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveVersion

class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path)
  extends Logging {
  private var stagingDirForCreating: Option[Path] = None

  lazy val externalTempPath: Path = getExternalTmpPath(path)

  private def getExternalTmpPath(path: Path): Path = {
    import org.apache.spark.sql.hive.client.hive._

    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
    // a common scratch directory. After the writing is finished, Hive will simply empty the table
    // directory and move the staging directory to it.
    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
    // moving staging directory to table directory, Hive will still empty the table directory, but
    // will exclude the staging directory there.
    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
    // staging directory under the table director for Hive prior to 1.1, the staging directory will
    // be removed by Hive when Hive is trying to empty the table directory.
    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)

    // Ensure all the supported versions are considered here.
    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
      allSupportedHiveVersions)

    val externalCatalog = session.sharedState.externalCatalog
    val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")

    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
      oldVersionExternalTempPath(path, scratchDir)
    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
      newVersionExternalTempPath(path, stagingDir)
    } else {
      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
    }
  }

  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
  private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path = {
    val extURI: URI = path.toUri
    val scratchPath = new Path(scratchDir, executionId)
    var dirPath = new Path(
      extURI.getScheme,
      extURI.getAuthority,
      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())

    val fs = dirPath.getFileSystem(hadoopConf)
    dirPath = new Path(fs.makeQualified(dirPath).toString())
    stagingDirForCreating = Some(dirPath)
    dirPath
  }

  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
  private def newVersionExternalTempPath(path: Path, stagingDir: String): Path = {
    val extURI: URI = path.toUri
    if (extURI.getScheme == "viewfs") {
      val qualifiedStagingDir = getStagingDir(path, stagingDir)
      stagingDirForCreating = Some(qualifiedStagingDir)
      // Hive uses 10000
      new Path(qualifiedStagingDir, "-ext-10000")
    } else {
      val qualifiedStagingDir = getExternalScratchDir(extURI, stagingDir)
      stagingDirForCreating = Some(qualifiedStagingDir)
      new Path(qualifiedStagingDir, "-ext-10000")
    }
  }

  private def getExternalScratchDir(extURI: URI, stagingDir: String): Path = {
    getStagingDir(
      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
      stagingDir)
  }

  private[hive] def getStagingDir(inputPath: Path, stagingDir: String): Path = {
    val inputPathName: String = inputPath.toString
    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
    var stagingPathName: String =
      if (inputPathName.indexOf(stagingDir) == -1) {
        new Path(inputPathName, stagingDir).toString
      } else {
        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
      }

    // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
    // staging directory needs to avoid being deleted when users set hive.exec.stagingdir
    // under the table directory.
    if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
      !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
      logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
        "directory.")
      stagingPathName = new Path(inputPathName, ".hive-staging").toString
    }

    val dir: Path =
      fs.makeQualified(
        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
    dir
  }

  // HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's FileUtils.isSubDir().
  private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = {
    val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR
    val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR
    path1.startsWith(path2)
  }

  private def executionId: String = {
    val rand: Random = new Random
    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
  }

  def deleteTmpPath() : Unit = {
    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
    try {
      stagingDirForCreating.foreach { stagingDir =>
        val fs = stagingDir.getFileSystem(hadoopConf)
        if (fs.delete(stagingDir, true)) {
          // If we successfully delete the staging directory, remove it from FileSystem's cache.
          fs.cancelDeleteOnExit(stagingDir)
        }
      }
    } catch {
      case NonFatal(e) =>
        val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
    }
  }

  def createTmpPath(): Unit = {
    try {
      stagingDirForCreating.foreach { stagingDir =>
        val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)
        if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) {
          throw new IllegalStateException(
            "Cannot create staging directory '" + stagingDir.toString + "'")
        }
        fs.deleteOnExit(stagingDir)
      }
    } catch {
      case e: IOException =>
        throw QueryExecutionErrors.cannotCreateStagingDirError(
          s"'${stagingDirForCreating.toString}': ${e.getMessage}", e)
    }
  }

  def deleteIfNotStagingDir(path: Path, fs: FileSystem): Unit = {
    if (Option(path) != stagingDirForCreating) fs.delete(path, true)
  }
}
```
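
A hedged usage sketch of the lifecycle this helper is built for; `session`, `hadoopConf`, and `tableLocation` are assumed to be supplied by the calling insert command, and the sketch is illustrative rather than code from this commit:

```scala
// Illustrative sketch only: stage job output next to the target table, then clean up.
val tmpPath = new HiveTempPath(session, hadoopConf, tableLocation)
val stagingOutput: Path = tmpPath.externalTempPath  // resolves the version-specific staging dir
tmpPath.createTmpPath()                             // creates it and registers deleteOnExit
try {
  // ... write the job output under `stagingOutput`, then load/move it into the table ...
} finally {
  tmpPath.deleteTmpPath()                           // best-effort removal of the staging dir
}
```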
