[SPARK-15664][MLLIB] Replace FileSystem.get(conf) with path.getFileSystem(conf) when removing CheckpointFile in MLlib

## What changes were proposed in this pull request?
If `SparkContext.setCheckpointDir` is given a directory on a file system other than the default FileSystem, an exception is thrown when a CheckpointFile is removed in MLlib.
So we should always get the FileSystem from the Path itself to avoid the wrong-FS problem.
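
A minimal sketch of the failure mode and the fix (the `hdfs://namenode:8020` URI below is hypothetical and not from this patch; it assumes a configuration whose default FileSystem is the local one):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration() // default FS here is file:///
val checkpointPath = new Path("hdfs://namenode:8020/tmp/checkpoints/rdd-42")

// Before the patch: FileSystem.get(conf) resolves the *default* FileSystem,
// so deleting a path that lives on another FS fails with a
// "Wrong FS" IllegalArgumentException.
// FileSystem.get(conf).delete(checkpointPath, true)

// After the patch: resolve the FileSystem from the Path itself,
// so the delete is issued against the FS that actually owns the path.
val fs: FileSystem = checkpointPath.getFileSystem(conf)
fs.delete(checkpointPath, true) // recursive delete on the correct FS
```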
## How was this patch tested?
N/A

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #13408 from lianhuiwang/SPARK-15664.
lianhuiwang authored and srowen committed Jun 1, 2016
1 parent e4ce1bc commit 6563d72
Showing 5 changed files with 31 additions and 20 deletions.
6 changes: 3 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.ml.clustering
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.internal.Logging
@@ -696,8 +696,8 @@ class DistributedLDAModel private[ml] (
   @DeveloperApi
   @Since("2.0.0")
   def deleteCheckpointFiles(): Unit = {
-    val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
-    _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+    val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
+    _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, hadoopConf))
     _checkpointFiles = Array.empty[String]
   }
 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
@@ -21,7 +21,7 @@ import java.io.IOException
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.tree.{LearningNode, Split}
@@ -77,8 +77,8 @@ private[spark] class NodeIdCache(
   // Indicates whether we can checkpoint
  private val canCheckpoint = nodeIdsForInstances.sparkContext.getCheckpointDir.nonEmpty
 
-  // FileSystem instance for deleting checkpoints as needed
-  private val fs = FileSystem.get(nodeIdsForInstances.sparkContext.hadoopConfiguration)
+  // Hadoop Configuration for deleting checkpoints as needed
+  private val hadoopConf = nodeIdsForInstances.sparkContext.hadoopConfiguration
 
   /**
    * Update the node index values in the cache.
@@ -130,7 +130,9 @@
         val old = checkpointQueue.dequeue()
         // Since the old checkpoint is not deleted by Spark, we'll manually delete it here.
         try {
-          fs.delete(new Path(old.getCheckpointFile.get), true)
+          val path = new Path(old.getCheckpointFile.get)
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
         } catch {
           case e: IOException =>
             logError("Decision Tree learning using cacheNodeIds failed to remove checkpoint" +
@@ -154,7 +156,9 @@
       val old = checkpointQueue.dequeue()
       if (old.getCheckpointFile.isDefined) {
         try {
-          fs.delete(new Path(old.getCheckpointFile.get), true)
+          val path = new Path(old.getCheckpointFile.get)
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
         } catch {
           case e: IOException =>
             logError("Decision Tree learning using cacheNodeIds failed to remove checkpoint" +
mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
@@ -19,7 +19,8 @@ package org.apache.spark.mllib.impl
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
@@ -160,21 +161,23 @@ private[mllib] abstract class PeriodicCheckpointer[T](
   private def removeCheckpointFile(): Unit = {
     val old = checkpointQueue.dequeue()
     // Since the old checkpoint is not deleted by Spark, we manually delete it.
-    val fs = FileSystem.get(sc.hadoopConfiguration)
-    getCheckpointFiles(old).foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+    getCheckpointFiles(old).foreach(
+      PeriodicCheckpointer.removeCheckpointFile(_, sc.hadoopConfiguration))
   }
 }
 
 private[spark] object PeriodicCheckpointer extends Logging {
 
   /** Delete a checkpoint file, and log a warning if deletion fails. */
-  def removeCheckpointFile(path: String, fs: FileSystem): Unit = {
+  def removeCheckpointFile(checkpointFile: String, conf: Configuration): Unit = {
     try {
-      fs.delete(new Path(path), true)
+      val path = new Path(checkpointFile)
+      val fs = path.getFileSystem(conf)
+      fs.delete(path, true)
     } catch {
       case e: Exception =>
         logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
-          path)
+          checkpointFile)
     }
   }
 }
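
With the new signature, callers pass a Hadoop `Configuration` instead of a pre-resolved `FileSystem`, and the helper resolves the owning FileSystem from the path itself. A hedged usage sketch (the checkpoint path below is hypothetical):

```scala
// Given a SparkContext `sc`, delete one old checkpoint file on whichever
// file system actually owns it.
PeriodicCheckpointer.removeCheckpointFile(
  "hdfs://namenode:8020/checkpoints/graph-7/part-00000", // hypothetical path
  sc.hadoopConfiguration)
```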
mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.impl
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.apache.spark.graphx.{Edge, Graph}
@@ -140,9 +140,11 @@ private object PeriodicGraphCheckpointerSuite {
     // Instead, we check for the presence of the checkpoint files.
     // This test should continue to work even after this graph.isCheckpointed issue
     // is fixed (though it can then be simplified and not look for the files).
-    val fs = FileSystem.get(graph.vertices.sparkContext.hadoopConfiguration)
+    val hadoopConf = graph.vertices.sparkContext.hadoopConfiguration
     graph.getCheckpointFiles.foreach { checkpointFile =>
-      assert(!fs.exists(new Path(checkpointFile)),
+      val path = new Path(checkpointFile)
+      val fs = path.getFileSystem(hadoopConf)
+      assert(!fs.exists(path),
         "Graph checkpoint file should have been removed")
     }
   }
mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.impl
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -127,9 +127,11 @@ private object PeriodicRDDCheckpointerSuite {
     // Instead, we check for the presence of the checkpoint files.
     // This test should continue to work even after this rdd.isCheckpointed issue
     // is fixed (though it can then be simplified and not look for the files).
-    val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
+    val hadoopConf = rdd.sparkContext.hadoopConfiguration
     rdd.getCheckpointFile.foreach { checkpointFile =>
-      assert(!fs.exists(new Path(checkpointFile)), "RDD checkpoint file should have been removed")
+      val path = new Path(checkpointFile)
+      val fs = path.getFileSystem(hadoopConf)
+      assert(!fs.exists(path), "RDD checkpoint file should have been removed")
     }
   }
