@@ -24,11 +24,14 @@ abstract class BatchProcessor<Input, Output>(
2424 open suspend fun run (items : List <Input >): Metrics = withContext(Dispatchers .IO ) {
2525 val processedCount = AtomicInteger (0 )
2626 val startTime = System .currentTimeMillis()
27+ var totalSuccess = 0
2728
2829 try {
2930 if (items.isEmpty()) {
3031 Log .w(TAG , " No items to process." )
31- return @withContext Metrics .Success ()
32+ val metrics = Metrics .Success ()
33+ listener?.onComplete(application, metrics)
34+ return @withContext metrics
3235 }
3336
3437 val memoryUtils = MemoryUtils (application, options.memory)
@@ -44,24 +47,26 @@ abstract class BatchProcessor<Input, Output>(
4447 semaphore.withPermit {
4548 try {
4649 val output = onProcess(application, item)
47- val current = processedCount.incrementAndGet()
48- val progress = current.toFloat() / items.size
49- listener?.onProgress(application, progress)
5050 output
5151 } catch (e: Exception ) {
5252 listener?.onError(application, e, item)
5353 null
54+ }finally {
55+ val current = processedCount.incrementAndGet()
56+ val progress = current.toFloat() / items.size
57+ listener?.onProgress(application, progress)
5458 }
5559 }
5660 }
5761 }
5862
5963 val outputBatch = deferredResults.mapNotNull { it.await() }
64+ totalSuccess + = outputBatch.size
6065 onBatchComplete(application, outputBatch)
6166 }
6267
6368 val endTime = System .currentTimeMillis()
64- val metrics = Metrics .Success (processedCount.get() , timeElapsed = endTime - startTime)
69+ val metrics = Metrics .Success (totalSuccess , timeElapsed = endTime - startTime)
6570
6671 listener?.onComplete(application, metrics)
6772 metrics
@@ -71,7 +76,7 @@ abstract class BatchProcessor<Input, Output>(
7176 }
7277 catch (e: Exception ) {
7378 val metrics = Metrics .Failure (
74- processedBeforeFailure = processedCount.get() ,
79+ processedBeforeFailure = totalSuccess ,
7580 timeElapsed = System .currentTimeMillis() - startTime,
7681 error = e
7782 )
0 commit comments