Commit 35c6636

Addressed comments
1 parent 1b23492 commit 35c6636

File tree

4 files changed, +15 -23 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

Lines changed: 5 additions & 4 deletions
@@ -16,12 +16,11 @@
  */
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{FileSystem => _, _}
+import java.io.{FileNotFoundException, IOException, OutputStream}
 import java.util.{EnumSet, UUID}
 
 import scala.util.control.NonFatal
 
-import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
@@ -228,7 +227,8 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
   }
 
   override def createAtomic(
-      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+      path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
     new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
   }

@@ -311,7 +311,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
   }
 
   override def createAtomic(
-      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+      path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
     new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
   }
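Both `createAtomic` implementations return a `RenameBasedFSDataOutputStream`, which, as the name suggests, writes to a temporary file and renames it into place on close. A minimal sketch of that write-then-rename pattern over plain Hadoop FileSystem APIs (the temp-file naming and the `writeAtomically` helper are illustrative, not Spark's actual code):

import java.io.{IOException, OutputStream}
import java.util.UUID

import org.apache.hadoop.fs.{FileSystem, Path}

object AtomicWriteSketch {
  /** Writes through a temp file and renames it into place, so readers never see a partial file. */
  def writeAtomically(fs: FileSystem, finalPath: Path)(body: OutputStream => Unit): Unit = {
    val tempPath = new Path(finalPath.getParent, s".${finalPath.getName}.${UUID.randomUUID}.tmp")
    var committed = false
    val out = fs.create(tempPath, false) // overwrite = false: fail if the temp file already exists
    try {
      body(out)
      out.close()
      // FileSystem.rename signals failure by returning false rather than throwing.
      if (!fs.rename(tempPath, finalPath)) {
        throw new IOException(s"Failed to rename $tempPath to $finalPath")
      }
      committed = true
    } finally {
      if (!committed) {
        try out.close() catch { case _: IOException => } // tolerate a double close
        fs.delete(tempPath, false) // best-effort cleanup, the moral equivalent of cancel()
      }
    }
  }
}

On HDFS a same-directory rename is atomic, which is what makes the pattern safe; `CancellableFSDataOutputStream.cancel()` corresponds to the cleanup branch that discards the temp file instead of committing it.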

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala

Lines changed: 10 additions & 7 deletions
@@ -109,8 +109,8 @@ class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession {
   }
 
   test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") {
-    import FakeFileSystem.scheme
-    spark.conf.set(s"fs.$scheme.impl", classOf[FakeFileSystem].getName)
+    import CheckpointFileManagerSuiteFileSystem.scheme
+    spark.conf.set(s"fs.$scheme.impl", classOf[CheckpointFileManagerSuiteFileSystem].getName)
     quietly {
       withTempDir { temp =>
         val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
@@ -177,15 +177,18 @@ object TestCheckpointFileManager {
 }
 
 
-/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */
-private class FakeFileSystem extends RawLocalFileSystem {
-  import FakeFileSystem.scheme
+/**
+ * CheckpointFileManagerSuiteFileSystem to test fallback of the CheckpointFileManager
+ * from FileContext to FileSystem API.
+ */
+private class CheckpointFileManagerSuiteFileSystem extends RawLocalFileSystem {
+  import CheckpointFileManagerSuiteFileSystem.scheme
 
   override def getUri: URI = {
     URI.create(s"$scheme:///")
   }
 }
 
-private object FakeFileSystem {
-  val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}"
+private object CheckpointFileManagerSuiteFileSystem {
+  val scheme = s"CheckpointFileManagerSuiteFileSystem${math.abs(Random.nextInt)}"
 }
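The renamed test registers the fake scheme only for the FileSystem API via `fs.$scheme.impl`; the FileContext API resolves schemes through a separate `fs.AbstractFileSystem.$scheme.impl` key, so its lookup fails and the CheckpointFileManager has to fall back. A self-contained sketch of that asymmetry (the `fallbackdemo` scheme and class names are hypothetical):

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileSystem, RawLocalFileSystem, UnsupportedFileSystemException}

// Hypothetical scheme used only for this demonstration.
class FallbackDemoFileSystem extends RawLocalFileSystem {
  override def getUri: URI = URI.create("fallbackdemo:///")
}

object FallbackDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Register an implementation for the FileSystem API only; the FileContext
    // API would additionally need fs.AbstractFileSystem.fallbackdemo.impl.
    conf.set("fs.fallbackdemo.impl", classOf[FallbackDemoFileSystem].getName)

    val uri = URI.create("fallbackdemo:///")
    println(FileSystem.get(uri, conf).getClass.getName) // resolves via fs.<scheme>.impl

    try {
      FileContext.getFileContext(uri, conf) // no AbstractFileSystem binding for the scheme
    } catch {
      case e: UnsupportedFileSystemException =>
        println(s"FileContext lookup failed, so a fallback is needed: ${e.getMessage}")
    }
  }
}

The random suffix in `scheme` guards against Hadoop's FileSystem cache handing back an instance configured by an earlier test run, which is also why the explicit cache-disabling overrides below became removable.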

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala

Lines changed: 0 additions & 4 deletions
@@ -27,10 +27,6 @@ import org.apache.spark.sql.test.SharedSQLContext
 
 class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext {
 
-  /** To avoid caching of FS objects */
-  override protected def sparkConf =
-    super.sparkConf.set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
-
   import CompactibleFileStreamLog._
 
   /** -- testing of `object CompactibleFileStreamLog` begins -- */

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala

Lines changed: 0 additions & 8 deletions
@@ -18,27 +18,19 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.File
-import java.net.URI
 import java.util.ConcurrentModificationException
 
 import scala.language.implicitConversions
-import scala.util.Random
 
-import org.apache.hadoop.fs._
 import org.scalatest.concurrent.Waiters._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.streaming.FakeFileSystem._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.UninterruptibleThread
 
 class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
 
-  /** To avoid caching of FS objects */
-  override protected def sparkConf =
-    super.sparkConf.set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
-
   private implicit def toOption[A](a: A): Option[A] = Option(a)
 
   test("HDFSMetadataLog: basic") {

0 commit comments