@@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
387
387
private var batchIndex = 0 // Which batch we're in
388
388
private var fileStream : FileInputStream = null
389
389
390
+ @ volatile private var closed = false
391
+
392
+ // A volatile variable to remember which DeserializationStream is using. Need to set it when we
393
+ // open a DeserializationStream. But we should use `deserializeStream` rather than
394
+ // `deserializeStreamToBeClosed` to read the content because touching a volatile variable will
395
+ // reduce the performance. It must be volatile so that we can see its correct value in the
396
+ // `finalize` method, which could run in any thread.
397
+ @ volatile private var deserializeStreamToBeClosed : DeserializationStream = null
398
+
390
399
// An intermediate stream that reads from exactly one batch
391
400
// This guards against pre-fetching and other arbitrary behavior of higher level streams
392
401
private var deserializeStream = nextBatchStream()
@@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
401
410
// we're still in a valid batch.
402
411
if (batchIndex < batchOffsets.length - 1 ) {
403
412
if (deserializeStream != null ) {
413
+ deserializeStreamToBeClosed = null
404
414
deserializeStream.close()
405
415
fileStream.close()
406
416
deserializeStream = null
@@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](
419
429
420
430
val bufferedStream = new BufferedInputStream (ByteStreams .limit(fileStream, end - start))
421
431
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
422
- ser.deserializeStream(compressedStream)
432
+ // Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
433
+ // close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed`
434
+ // during reading the (K, C) pairs.
435
+ deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
436
+ deserializeStreamToBeClosed
423
437
} else {
424
438
// No more batches left
425
439
cleanup()
@@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
468
482
item
469
483
}
470
484
471
- // TODO: Ensure this gets called even if the iterator isn't drained.
472
- private def cleanup () {
473
- batchIndex = batchOffsets.length // Prevent reading any other batch
474
- val ds = deserializeStream
475
- deserializeStream = null
476
- fileStream = null
477
- ds.close()
478
- file.delete()
485
+ // TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the
486
+ // future, we need some mechanism to ensure this gets called once the resources are not used.
487
+ private def cleanup (): Unit = {
488
+ if (! closed) {
489
+ closed = true
490
+ batchIndex = batchOffsets.length // Prevent reading any other batch
491
+ fileStream = null
492
+ try {
493
+ val ds = deserializeStreamToBeClosed
494
+ deserializeStreamToBeClosed = null
495
+ deserializeStream = null
496
+ if (ds != null ) {
497
+ ds.close()
498
+ }
499
+ } finally {
500
+ if (file.exists()) {
501
+ file.delete()
502
+ }
503
+ }
504
+ }
505
+ }
506
+
507
+ override def finalize (): Unit = {
508
+ try {
509
+ cleanup()
510
+ } finally {
511
+ super .finalize()
512
+ }
479
513
}
480
514
}
481
515
0 commit comments