@@ -154,7 +154,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
154
154
test(" Job should not complete if all commits are denied" ) {
155
155
// Create a mock OutputCommitCoordinator that denies all attempts to commit
156
156
doReturn(false ).when(outputCommitCoordinator).handleAskPermissionToCommit(
157
- Matchers .any(), Matchers .any(), Matchers .any(), Matchers .any() )
157
+ Matchers .any(), Matchers .any(), Matchers .any())
158
158
val rdd : RDD [Int ] = sc.parallelize(Seq (1 ), 1 )
159
159
def resultHandler (x : Int , y : Unit ): Unit = {}
160
160
val futureAction : SimpleFutureAction [Unit ] = sc.submitJob[Int , Unit , Unit ](rdd,
@@ -170,73 +170,65 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
170
170
171
171
test(" Only authorized committer failures can clear the authorized committer lock (SPARK-6614)" ) {
172
172
val stage : Int = 1
173
- val stageAttempt : Int = 1
174
173
val partition : Int = 2
175
- val authorizedCommitter : Int = 3
176
- val nonAuthorizedCommitter : Int = 100
174
+ val authorizedCommitter : Long = 3
175
+ val nonAuthorizedCommitter : Long = 100
177
176
outputCommitCoordinator.stageStart(stage, maxPartitionId = 2 )
178
177
179
- assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, authorizedCommitter))
180
- assert(! outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
181
- nonAuthorizedCommitter))
178
+ assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
179
+ assert(! outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
182
180
// The non-authorized committer fails
183
- outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition ,
184
- attemptNumber = nonAuthorizedCommitter, reason = TaskKilled (" test" ))
181
+ outputCommitCoordinator.taskCompleted(stage, partition, nonAuthorizedCommitter ,
182
+ reason = TaskKilled (" test" ))
185
183
// New tasks should still not be able to commit because the authorized committer has not failed
186
- assert(! outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
187
- nonAuthorizedCommitter + 1 ))
184
+ assert(! outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1 ))
188
185
// The authorized committer now fails, clearing the lock
189
- outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition ,
190
- attemptNumber = authorizedCommitter, reason = TaskKilled (" test" ))
186
+ outputCommitCoordinator.taskCompleted(stage, partition, authorizedCommitter ,
187
+ reason = TaskKilled (" test" ))
191
188
// A new task should now be allowed to become the authorized committer
192
- assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
193
- nonAuthorizedCommitter + 2 ))
189
+ assert(outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2 ))
194
190
// There can only be one authorized committer
195
- assert(! outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
196
- nonAuthorizedCommitter + 3 ))
191
+ assert(! outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3 ))
197
192
}
198
193
199
194
test(" SPARK-19631: Do not allow failed attempts to be authorized for committing" ) {
200
195
val stage : Int = 1
201
- val stageAttempt : Int = 1
202
196
val partition : Int = 1
203
- val failedAttempt : Int = 0
197
+ val failedAttempt : Long = 0L
204
198
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1 )
205
- outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
206
- attemptNumber = failedAttempt,
199
+ outputCommitCoordinator.taskCompleted(stage, partition, failedAttempt,
207
200
reason = ExecutorLostFailure (" 0" , exitCausedByApp = true , None ))
208
- assert(! outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt))
209
- assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt + 1 ))
201
+ assert(! outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
202
+ assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1 ))
210
203
}
211
204
212
205
test(" SPARK-24589: Differentiate tasks from different stage attempts" ) {
213
206
var stage = 1
214
- val taskAttempt = 1
215
207
val partition = 1
216
208
217
209
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1 )
218
- assert(outputCommitCoordinator.canCommit(stage, 1 , partition, taskAttempt ))
219
- assert(! outputCommitCoordinator.canCommit(stage, 2 , partition, taskAttempt ))
210
+ assert(outputCommitCoordinator.canCommit(stage, partition, 1L ))
211
+ assert(! outputCommitCoordinator.canCommit(stage, partition, 2L ))
220
212
221
213
// Fail the task in the first attempt, the task in the second attempt should succeed.
222
214
stage += 1
223
215
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1 )
224
- outputCommitCoordinator.taskCompleted(stage, 1 , partition, taskAttempt ,
216
+ outputCommitCoordinator.taskCompleted(stage, partition, 1L ,
225
217
ExecutorLostFailure (" 0" , exitCausedByApp = true , None ))
226
- assert(! outputCommitCoordinator.canCommit(stage, 1 , partition, taskAttempt ))
227
- assert(outputCommitCoordinator.canCommit(stage, 2 , partition, taskAttempt ))
218
+ assert(! outputCommitCoordinator.canCommit(stage, partition, 1L ))
219
+ assert(outputCommitCoordinator.canCommit(stage, partition, 2L ))
228
220
229
221
// Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
230
222
// then fail the 1st attempt and make sure the 4th one can commit again.
231
223
stage += 1
232
224
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1 )
233
- assert(outputCommitCoordinator.canCommit(stage, 1 , partition, taskAttempt ))
234
- outputCommitCoordinator.taskCompleted(stage, 2 , partition, taskAttempt ,
225
+ assert(outputCommitCoordinator.canCommit(stage, partition, 1L ))
226
+ outputCommitCoordinator.taskCompleted(stage, partition, 2 ,
235
227
ExecutorLostFailure (" 0" , exitCausedByApp = true , None ))
236
- assert(! outputCommitCoordinator.canCommit(stage, 3 , partition, taskAttempt ))
237
- outputCommitCoordinator.taskCompleted(stage, 1 , partition, taskAttempt ,
228
+ assert(! outputCommitCoordinator.canCommit(stage, partition, 3L ))
229
+ outputCommitCoordinator.taskCompleted(stage, partition, 1L ,
238
230
ExecutorLostFailure (" 0" , exitCausedByApp = true , None ))
239
- assert(outputCommitCoordinator.canCommit(stage, 4 , partition, taskAttempt ))
231
+ assert(outputCommitCoordinator.canCommit(stage, partition, 4L ))
240
232
}
241
233
242
234
test(" SPARK-24589: Make sure stage state is cleaned up" ) {
0 commit comments