
Commit f68d761

AngersZhuuuu authored and cloud-fan committed
[SPARK-48292][CORE] Revert [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status
### What changes were proposed in this pull request?

Revert #36564, per the discussion in #36564 (comment).

When Spark commits a task, it commits to the committedTaskPath `${outputpath}/_temporary//${appAttempId}/${taskId}`. So in the case described in #36564: before #38980, each task attempt's job ID used a different date, so when a task wrote its data successfully but failed to send the TaskSuccess RPC back, the re-run of that task committed to a different committedTaskPath, causing duplicated data. After #38980, different attempts of the same task share the same TaskId, so a re-run commits to the same committedTaskPath and the Hadoop commit protocol handles that case; the data is no longer duplicated.

Note: the taskAttemptPath is not the same across attempts, since that path contains the taskAttemptId.

### Why are the changes needed?

The stage-abort behavior added in SPARK-39195 is no longer needed.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46696 from AngersZhuuuu/SPARK-48292.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
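As a visual aid for the path argument above, here is a minimal, self-contained Scala sketch of the layout being described. It is not the Hadoop or Spark implementation; the object name, helpers, and example IDs (`CommitPathSketch`, `committedTaskPath`, `taskAttemptPath`, `task_000001`, ...) are illustrative assumptions only.

```scala
// Hypothetical sketch of the commit-path layout described in this PR; the real
// logic lives in Hadoop's FileOutputCommitter and Spark's commit protocol.
object CommitPathSketch {
  // The committed task path is keyed by the task ID only, so every attempt
  // of a task maps to the same destination directory.
  def committedTaskPath(outputPath: String, appAttemptId: Int, taskId: String): String =
    s"$outputPath/_temporary/$appAttemptId/$taskId"

  // The task attempt path additionally contains the task attempt ID, so it
  // differs for every attempt of the same task.
  def taskAttemptPath(outputPath: String, appAttemptId: Int, taskAttemptId: String): String =
    s"$outputPath/_temporary/$appAttemptId/_temporary/$taskAttemptId"

  def main(args: Array[String]): Unit = {
    val out = "/warehouse/table"
    // Two attempts of the same task write to different attempt paths...
    println(taskAttemptPath(out, 0, "attempt_000001_0"))
    println(taskAttemptPath(out, 0, "attempt_000001_1"))
    // ...but both commit to the same committed path, so a re-run after a lost
    // TaskSuccess RPC does not leave duplicate output behind.
    println(committedTaskPath(out, 0, "task_000001"))
  }
}
```

Running the sketch prints two distinct attempt paths but a single committed path, which is the property the revert relies on.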
1 parent 69afd4b commit f68d761

File tree

6 files changed (+58, -88 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 6 deletions
@@ -287,12 +287,7 @@ class SparkContext(config: SparkConf) extends Logging {
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus): SparkEnv = {
-    SparkEnv.createDriverEnv(
-      conf,
-      isLocal,
-      listenerBus,
-      SparkContext.numDriverCores(master, conf),
-      this)
+    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
   }
 
   private[spark] def env: SparkEnv = _env

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 11 deletions
@@ -258,7 +258,6 @@ object SparkEnv extends Logging {
       isLocal: Boolean,
       listenerBus: LiveListenerBus,
       numCores: Int,
-      sparkContext: SparkContext,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
     assert(conf.contains(DRIVER_HOST_ADDRESS),
       s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
@@ -281,7 +280,6 @@ object SparkEnv extends Logging {
       numCores,
       ioEncryptionKey,
       listenerBus = listenerBus,
-      Option(sparkContext),
       mockOutputCommitCoordinator = mockOutputCommitCoordinator
     )
   }
@@ -317,7 +315,6 @@ object SparkEnv extends Logging {
   /**
    * Helper method to create a SparkEnv for a driver or an executor.
    */
-  // scalastyle:off argcount
   private def create(
       conf: SparkConf,
       executorId: String,
@@ -328,9 +325,7 @@ object SparkEnv extends Logging {
       numUsableCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       listenerBus: LiveListenerBus = null,
-      sc: Option[SparkContext] = None,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
-    // scalastyle:on argcount
 
     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
 
@@ -473,12 +468,7 @@ object SparkEnv extends Logging {
     }
 
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
-      if (isDriver) {
-        new OutputCommitCoordinator(conf, isDriver, sc)
-      } else {
-        new OutputCommitCoordinator(conf, isDriver)
-      }
-
+      new OutputCommitCoordinator(conf, isDriver)
     }
     val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Lines changed: 4 additions & 8 deletions
@@ -44,10 +44,7 @@ private case class AskPermissionToCommitOutput(
  * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
  * for an extensive design discussion.
  */
-private[spark] class OutputCommitCoordinator(
-    conf: SparkConf,
-    isDriver: Boolean,
-    sc: Option[SparkContext] = None) extends Logging {
+private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
 
   // Initialized by SparkEnv
   var coordinatorRef: Option[RpcEndpointRef] = None
@@ -160,10 +157,9 @@ private[spark] class OutputCommitCoordinator(
         val taskId = TaskIdentifier(stageAttempt, attemptNumber)
         stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
         if (stageState.authorizedCommitters(partition) == taskId) {
-          sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " +
-            s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
-            s"but task commit success, data duplication may happen. " +
-            s"reason=$reason"))
+          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+            s"partition=$partition) failed; clearing lock")
+          stageState.authorizedCommitters(partition) = null
         }
       }
     }
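For context on the test changes below, here is a heavily simplified, hypothetical sketch of the commit-authorization behavior the restored code implies: one authorized committer per partition, and a failure of the authorized committer clears the lock so a later attempt may commit. It is not the real OutputCommitCoordinator; the class and method names are illustrative assumptions.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified model of the coordinator behavior being
// restored; not the actual OutputCommitCoordinator implementation.
class CommitLockSketch {
  // partition -> attempt number currently authorized to commit, if any
  private val authorized = mutable.Map[Int, Int]()

  // Grant the commit lock to the first asker per partition; deny everyone else.
  def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
    authorized.get(partition) match {
      case Some(owner) => owner == attempt // only one authorized committer
      case None =>
        authorized(partition) = attempt
        true
    }
  }

  // If the authorized committer fails, clear the lock so another attempt
  // may become the authorized committer (the pre-SPARK-39195 behavior).
  def attemptFailed(partition: Int, attempt: Int): Unit = synchronized {
    if (authorized.get(partition).contains(attempt)) {
      authorized.remove(partition)
    }
  }
}
```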

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala

Lines changed: 5 additions & 6 deletions
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
+import org.scalatest.time.{Seconds, Span}
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, TaskContext}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}
 
 /**
  * Integration tests for the OutputCommitCoordinator.
@@ -44,15 +45,13 @@ class OutputCommitCoordinatorIntegrationSuite
     sc = new SparkContext("local[2, 4]", "test", conf)
   }
 
-  test("SPARK-39195: exception thrown in OutputCommitter.commitTask()") {
+  test("exception thrown in OutputCommitter.commitTask()") {
     // Regression test for SPARK-10381
-    val e = intercept[SparkException] {
+    failAfter(Span(60, Seconds)) {
       withTempDir { tempDir =>
         sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
       }
-    }.getCause.getMessage
-    assert(e.contains("failed; but task commit success, data duplication may happen.") &&
-      e.contains("Intentional exception"))
+    }
   }
 }
 

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala

Lines changed: 9 additions & 10 deletions
@@ -87,12 +87,11 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
           isLocal: Boolean,
           listenerBus: LiveListenerBus): SparkEnv = {
         outputCommitCoordinator =
-          spy[OutputCommitCoordinator](
-            new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
+          spy[OutputCommitCoordinator](new OutputCommitCoordinator(conf, isDriver = true))
         // Use Mockito.spy() to maintain the default infrastructure everywhere else.
         // This mocking allows us to control the coordinator responses in test cases.
         SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
-          SparkContext.numDriverCores(master), this, Some(outputCommitCoordinator))
+          SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
       }
     }
     // Use Mockito.spy() to maintain the default infrastructure everywhere else
@@ -190,9 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should not be allowed to become stage failed because of potential data duplication
-    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+    // A new task should now be allowed to become the authorized committer
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
       nonAuthorizedCommitter + 2))
+    // There can only be one authorized committer
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+      nonAuthorizedCommitter + 3))
   }
 
   test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
@@ -226,8 +228,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
 
     // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
-    // then fail the 1st attempt and since stage failed because of potential data duplication,
-    // make sure fail the 4th attempt.
+    // then fail the 1st attempt and make sure the 4th one can commit again.
     stage += 1
     outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ -236,9 +237,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    // A new task should not be allowed to become the authorized committer since stage failed
-    // because of potential data duplication
-    assert(!outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
+    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
   }
 
   test("SPARK-24589: Make sure stage state is cleaned up") {

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 38 additions & 47 deletions
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -1206,6 +1206,43 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       errorMessage.contains("is not a valid DFS filename"))
   }
 
+  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      // Using a output committer that always fail when committing a task, so that both
+      // `commitTask()` and `abortTask()` are invoked.
+      val extraOptions = Map[String, String](
+        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
+          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
+      )
+
+      // Before fixing SPARK-7837, the following code results in an NPE because both
+      // `commitTask()` and `abortTask()` try to close output writers.
+
+      withTempPath { dir =>
+        val m1 = intercept[SparkException] {
+          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
+        }
+        assert(m1.getErrorClass == "TASK_WRITE_FAILED")
+        assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
+      }
+
+      withTempPath { dir =>
+        val m2 = intercept[SparkException] {
+          val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
+            .coalesce(1)
+          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
+        }
+        if (m2.getErrorClass != null) {
+          assert(m2.getErrorClass == "TASK_WRITE_FAILED")
+          assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
+        } else {
+          assert(m2.getMessage.contains("TASK_WRITE_FAILED"))
+        }
+      }
+    }
+  }
+
   test("SPARK-11044 Parquet writer version fixed as version1 ") {
     withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
       classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
@@ -1550,52 +1587,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 }
 
-// Parquet IO test suite with output commit coordination disabled.
-// This test suite is separated ParquetIOSuite to avoid race condition of failure events
-// from `OutputCommitCoordination` and `TaskSetManager`.
-class ParquetIOWithoutOutputCommitCoordinationSuite
-  extends QueryTest with ParquetTest with SharedSparkSession {
-  import testImplicits._
-
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set("spark.hadoop.outputCommitCoordination.enabled", "false")
-  }
-
-  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      // Using a output committer that always fail when committing a task, so that both
-      // `commitTask()` and `abortTask()` are invoked.
-      val extraOptions = Map[String, String](
-        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
-          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
-      )
-
-      // Before fixing SPARK-7837, the following code results in an NPE because both
-      // `commitTask()` and `abortTask()` try to close output writers.
-
-      withTempPath { dir =>
-        val m1 = intercept[SparkException] {
-          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
-        }
-        assert(m1.getErrorClass == "TASK_WRITE_FAILED")
-        assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
-      }
-
-      withTempPath { dir =>
-        val m2 = intercept[SparkException] {
-          val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
-            .coalesce(1)
-          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
-        }
-        assert(m2.getErrorClass == "TASK_WRITE_FAILED")
-        assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
-      }
-    }
-  }
-}
-
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
 