Commit d46b15d

yikfcloud-fan
authored and committed
[SPARK-42478] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory
### What changes were proposed in this pull request?

Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory.

### Why are the changes needed?

[SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) made the MR job IDs consistent between FileBatchWriter and FileFormatWriter, but it introduced a serialization issue: JobID is non-serializable.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA (GitHub Actions).

Closes apache#40064 from Yikf/write-job-id.

Authored-by: Yikf <yikaifei@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
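The failure mode described above can be reproduced generically: Java serialization throws `java.io.NotSerializableException` when an otherwise-serializable object captures a non-serializable field, which is what happened when FileWriterFactory held a JobID directly. A minimal sketch of the problem (the class names `NonSerializableJobID` and `BrokenFactory` are illustrative stand-ins, not Hadoop's or Spark's actual types):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Illustrative stand-in for Hadoop's JobID: carries no Serializable marker.
class NonSerializableJobID(val id: String)

// A factory that captures the non-serializable value directly,
// mirroring the shape of the bug fixed by SPARK-42478.
case class BrokenFactory(jobId: NonSerializableJobID)

object SerializationDemo {
  // Attempt Java serialization; report whether it succeeded.
  def trySerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    // Serializing the factory fails because of the captured field.
    println(SerializationDemo.trySerialize(
      BrokenFactory(new NonSerializableJobID("job_0")))) // false
  }
}
```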
1 parent bd6b751 commit d46b15d

File tree

1 file changed: +6 −1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala

Lines changed: 6 additions & 1 deletion

```diff
@@ -30,7 +30,12 @@ case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {

-  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+  // SPARK-42478: jobId across tasks should be consistent to meet the contract
+  // expected by Hadoop committers, but `JobId` cannot be serialized.
+  // thus, persist the serializable jobTrackerID in the class and make jobId a
+  // transient lazy val which recreates it each time to ensure jobId is unique.
+  private[this] val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date)
+  @transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)

   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)
```
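The fix relies on a common Scala/Spark idiom: keep only a serializable `String` in the class, mark the derived value `@transient lazy val`, and let it be recomputed on first access after deserialization on each executor. A self-contained sketch of that pattern (the `WriterFactorySketch` class and its `job_..._0000` ID format are hypothetical, not Spark's actual API):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Sketch of the SPARK-42478 pattern: serialize the String, not the derived value.
case class WriterFactorySketch(jobTrackerID: String) {
  // @transient: the backing field is not written to the stream;
  // lazy: it is rebuilt from jobTrackerID on first access after deserialization.
  @transient private lazy val jobId: String = s"job_${jobTrackerID}_0000"
  def id: String = jobId
}

object RoundTrip {
  // Java-serialize an object and read it back, simulating driver -> executor transfer.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val original = WriterFactorySketch("20230216121702")
    val copy = RoundTrip.roundTrip(original)
    // The transient lazy val is rebuilt on the "executor" side from the String.
    println(copy.id) // job_20230216121702_0000
  }
}
```

Because the lazy val is derived deterministically from the serialized `jobTrackerID`, every deserialized copy reconstructs the same job ID, which is exactly the consistency contract Hadoop committers expect across tasks.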
