@@ -29,10 +29,8 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
29
29
import org .apache .spark .sql .catalyst .expressions .Attribute
30
30
import org .apache .spark .sql .catalyst .plans .logical .LogicalPlan
31
31
import org .apache .spark .sql .execution .SparkPlan
32
- import org .apache .spark .sql .execution .streaming .{MicroBatchExecution , StreamExecution }
33
- import org .apache .spark .sql .execution .streaming .continuous .{CommitPartitionEpoch , ContinuousExecution , EpochCoordinatorRef , SetWriterPartitions }
32
+ import org .apache .spark .sql .execution .streaming .MicroBatchExecution
34
33
import org .apache .spark .sql .sources .v2 .writer ._
35
- import org .apache .spark .sql .sources .v2 .writer .streaming .StreamWriter
36
34
import org .apache .spark .sql .types .StructType
37
35
import org .apache .spark .util .Utils
38
36
@@ -110,7 +108,7 @@ object DataWritingSparkTask extends Logging {
110
108
useCommitCoordinator : Boolean ): WriterCommitMessage = {
111
109
val stageId = context.stageId()
112
110
val partId = context.partitionId()
113
- val attemptId = context.attemptNumber()
111
+ val attemptId = context.taskAttemptId().toInt
114
112
val epochId = Option (context.getLocalProperty(MicroBatchExecution .BATCH_ID_KEY )).getOrElse(" 0" )
115
113
val dataWriter = writeTask.createDataWriter(partId, attemptId, epochId.toLong)
116
114
@@ -124,10 +122,12 @@ object DataWritingSparkTask extends Logging {
124
122
val coordinator = SparkEnv .get.outputCommitCoordinator
125
123
val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
126
124
if (commitAuthorized) {
127
- logInfo(s " Writer for stage $stageId, task $partId. $attemptId is authorized to commit. " )
125
+ logInfo(
126
+ s " Writer for stage $stageId, task $partId (TID $attemptId) is authorized to commit. " )
128
127
dataWriter.commit()
129
128
} else {
130
- val message = s " Stage $stageId, task $partId. $attemptId: driver did not authorize commit "
129
+ val message =
130
+ s " Stage $stageId, task $partId (TID $attemptId): driver did not authorize commit "
131
131
logInfo(message)
132
132
// throwing CommitDeniedException will trigger the catch block for abort
133
133
throw new CommitDeniedException (message, stageId, partId, attemptId)
@@ -138,15 +138,15 @@ object DataWritingSparkTask extends Logging {
138
138
dataWriter.commit()
139
139
}
140
140
141
- logInfo(s " Writer for stage $stageId, task $partId. $attemptId committed. " )
141
+ logInfo(s " Writer for stage $stageId, task $partId (TID $attemptId) committed." )
142
142
143
143
msg
144
144
145
145
})(catchBlock = {
146
146
// If there is an error, abort this writer
147
- logError(s " Writer for stage $stageId, task $partId. $attemptId is aborting. " )
147
+ logError(s " Writer for stage $stageId, task $partId (TID $attemptId) is aborting." )
148
148
dataWriter.abort()
149
- logError(s " Writer for stage $stageId, task $partId. $attemptId aborted. " )
149
+ logError(s " Writer for stage $stageId, task $partId (TID $attemptId) aborted." )
150
150
})
151
151
}
152
152
}
0 commit comments