Skip to content

Commit

Permalink
Revert "[SPARK-5423][Core] Cleanup resources in DiskMapIterator.final…
Browse files Browse the repository at this point in the history
…ize to ensure deleting the temp file"

This reverts commit 90095bf.
  • Loading branch information
Andrew Or committed Mar 3, 2015
1 parent e359794 commit 9af0017
Showing 1 changed file with 9 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,6 @@ class ExternalAppendOnlyMap[K, V, C](
private var batchIndex = 0 // Which batch we're in
private var fileStream: FileInputStream = null

@volatile private var closed = false

// A volatile variable to remember which DeserializationStream is using. Need to set it when we
// open a DeserializationStream. But we should use `deserializeStream` rather than
// `deserializeStreamToBeClosed` to read the content because touching a volatile variable will
// reduce the performance. It must be volatile so that we can see its correct value in the
// `finalize` method, which could run in any thread.
@volatile private var deserializeStreamToBeClosed: DeserializationStream = null

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
Expand All @@ -410,7 +401,6 @@ class ExternalAppendOnlyMap[K, V, C](
// we're still in a valid batch.
if (batchIndex < batchOffsets.length - 1) {
if (deserializeStream != null) {
deserializeStreamToBeClosed = null
deserializeStream.close()
fileStream.close()
deserializeStream = null
Expand All @@ -429,11 +419,7 @@ class ExternalAppendOnlyMap[K, V, C](

val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
// Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
// close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed`
// during reading the (K, C) pairs.
deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
deserializeStreamToBeClosed
ser.deserializeStream(compressedStream)
} else {
// No more batches left
cleanup()
Expand Down Expand Up @@ -482,34 +468,14 @@ class ExternalAppendOnlyMap[K, V, C](
item
}

// TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the
// future, we need some mechanism to ensure this gets called once the resources are not used.
private def cleanup(): Unit = {
if (!closed) {
closed = true
batchIndex = batchOffsets.length // Prevent reading any other batch
fileStream = null
try {
val ds = deserializeStreamToBeClosed
deserializeStreamToBeClosed = null
deserializeStream = null
if (ds != null) {
ds.close()
}
} finally {
if (file.exists()) {
file.delete()
}
}
}
}

override def finalize(): Unit = {
try {
cleanup()
} finally {
super.finalize()
}
// TODO: Ensure this gets called even if the iterator isn't drained.
private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
deserializeStream = null
fileStream = null
ds.close()
file.delete()
}
}

Expand Down

0 comments on commit 9af0017

Please sign in to comment.