Skip to content

Commit 0ebdf97

Browse files
fenzhu (GitHub Enterprise)
authored and committed
[CARMEL-7479][CARMEL-6358] Change to file commit algorithm V2 for CTAS (apache#238)
1 parent 1c2d02d commit 0ebdf97

File tree

3 files changed

+38
-0
lines changed

3 files changed

+38
-0
lines changed

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,9 @@ class HadoopMapReduceCommitProtocol(
197197
}
198198

199199
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
200+
logInfo("Start to commit job ......")
200201
committer.commitJob(jobContext)
202+
logInfo("Commit job finished!")
201203

202204
if (hasValidPath) {
203205
val (allAbsPathFiles, allPartitionPaths, _) =

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command
1919

2020
import java.net.URI
2121

22+
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
23+
2224
import org.apache.spark.sql._
2325
import org.apache.spark.sql.catalyst.catalog._
2426
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
@@ -167,6 +169,8 @@ case class CreateDataSourceTableAsSelectCommand(
167169
DDLUtils.verifyOperationNotSupported(table, "Create partitioned table")
168170
}
169171
DDLUtils.checkPrivilegeOfSpecifyTableLocation(table, sessionState)
172+
val originalFileOutputCommitterAlgorithm = sessionState.conf.getConfString(
173+
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, "1")
170174

171175
if (sessionState.catalog.tableExists(table)) {
172176
assert(mode != SaveMode.Overwrite,
@@ -193,6 +197,11 @@ case class CreateDataSourceTableAsSelectCommand(
193197
} else {
194198
table.storage.locationUri
195199
}
200+
if (!originalFileOutputCommitterAlgorithm.equals("2")) {
201+
sessionState.conf.setConfString(
202+
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, "2")
203+
logInfo("Set file output committer algorithm as version 2 when CTAS")
204+
}
196205
val result = saveDataIntoTable(
197206
sparkSession, table, tableLocation, SaveMode.Overwrite, tableExists = false)
198207
val tableSchema = CharVarcharUtils.getRawSchema(
@@ -216,6 +225,13 @@ case class CreateDataSourceTableAsSelectCommand(
216225
enableDropPartitions = false), CommandExecutionMode.SKIP).toRdd
217226
case _ =>
218227
}
228+
if (!originalFileOutputCommitterAlgorithm.equals("2")) {
229+
sessionState.conf.setConfString(
230+
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
231+
originalFileOutputCommitterAlgorithm)
232+
logInfo(s"Set file output committer algorithm " +
233+
s"back to version $originalFileOutputCommitterAlgorithm")
234+
}
219235
}
220236

221237
CommandUtils.updateTableStats(sparkSession, table)

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution
1919

2020
import scala.util.control.NonFatal
2121

22+
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
23+
2224
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
2325
import org.apache.spark.sql.catalyst.catalog.CatalogTable
2426
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
@@ -91,7 +93,15 @@ case class CreateHiveTableAsSelectCommand(
9193
sparkSession.sessionState.catalog.validateTableLocation(tableDesc)
9294
catalog.createTable(tableDesc.copy(schema = tableSchema), ignoreIfExists = false)
9395

96+
val originalFileOutputCommitterAlgorithm = sparkSession.sessionState.conf.getConfString(
97+
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, "1")
98+
9499
try {
100+
if (!originalFileOutputCommitterAlgorithm.equals("2")) {
101+
sparkSession.sessionState.conf.setConfString(
102+
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, "2")
103+
logInfo("Set file output committer algorithm as version 2 when CTAS for Hive table")
104+
}
95105
// Read back the metadata of the table which was created just now.
96106
val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
97107
val command = getWritingCommand(createdTableMeta, tableExists = false)
@@ -108,6 +118,16 @@ case class CreateHiveTableAsSelectCommand(
108118
// drop the created table.
109119
catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, purge = false)
110120
throw e
121+
} finally {
122+
val currentFileOutputCommitterAlgorithm = sparkSession.sessionState.conf.getConfString(
123+
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, "1")
124+
if (!currentFileOutputCommitterAlgorithm.equals(originalFileOutputCommitterAlgorithm)) {
125+
sparkSession.sessionState.conf.setConfString(
126+
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
127+
originalFileOutputCommitterAlgorithm)
128+
logInfo(s"Set file output committer algorithm " +
129+
s"back to version $originalFileOutputCommitterAlgorithm")
130+
}
111131
}
112132
}
113133

0 commit comments

Comments
 (0)