@@ -109,6 +109,7 @@ object DataWritingSparkTask extends Logging {
109
109
iter : Iterator [InternalRow ],
110
110
useCommitCoordinator : Boolean ): WriterCommitMessage = {
111
111
val stageId = context.stageId()
112
+ val stageAttempt = context.stageAttemptNumber()
112
113
val partId = context.partitionId()
113
114
val attemptId = context.attemptNumber()
114
115
val epochId = Option (context.getLocalProperty(MicroBatchExecution .BATCH_ID_KEY )).getOrElse(" 0" )
@@ -122,12 +123,14 @@ object DataWritingSparkTask extends Logging {
122
123
123
124
val msg = if (useCommitCoordinator) {
124
125
val coordinator = SparkEnv .get.outputCommitCoordinator
125
- val commitAuthorized = coordinator.canCommit(context. stageId() , partId, attemptId)
126
+ val commitAuthorized = coordinator.canCommit(stageId, stageAttempt , partId, attemptId)
126
127
if (commitAuthorized) {
127
- logInfo(s " Writer for stage $stageId, task $partId. $attemptId is authorized to commit. " )
128
+ logInfo(s " Writer for stage $stageId / $stageAttempt, " +
129
+ s " task $partId. $attemptId is authorized to commit. " )
128
130
dataWriter.commit()
129
131
} else {
130
- val message = s " Stage $stageId, task $partId. $attemptId: driver did not authorize commit "
132
+ val message = s " Stage $stageId / $stageAttempt, " +
133
+ s " task $partId. $attemptId: driver did not authorize commit "
131
134
logInfo(message)
132
135
// throwing CommitDeniedException will trigger the catch block for abort
133
136
throw new CommitDeniedException (message, stageId, partId, attemptId)
0 commit comments