
Commit 71d4ed2

srowen authored and rxin committed
SPARK-1316. Remove use of Commons IO
(This follows from a side point on SPARK-1133, in discussion of the PR: #164)

Commons IO is barely used in the project, and can easily be replaced with equivalent calls to Guava or the existing Spark `Utils.scala` class. Removing a dependency feels good, and this one in particular can get a little problematic since Hadoop uses it too.

Author: Sean Owen <sowen@cloudera.com>

Closes #226 from srowen/SPARK-1316 and squashes the following commits:

21efef3 [Sean Owen] Remove use of Commons IO
1 parent 134ace7 commit 71d4ed2
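
Not part of the commit itself, but a sketch of the substitution pattern the diffs below apply: Commons IO's FileUtils.writeStringToFile becomes Guava's Files.write with an explicit charset, and FileUtils.deleteDirectory becomes the existing Utils.deleteRecursively. The package, object name, and paths here are hypothetical, chosen only for illustration (Utils is private[spark], so callers must live under the org.apache.spark package, as the touched tests do).

    package org.apache.spark.examplesketch  // hypothetical; needed because Utils is private[spark]

    import java.io.File
    import java.nio.charset.Charset

    import com.google.common.io.Files
    import org.apache.spark.util.Utils

    object CommonsIoReplacementSketch {
      def main(args: Array[String]): Unit = {
        val dir = new File("/tmp/commons-io-example")  // hypothetical path
        val file = new File(dir, "data.txt")
        dir.mkdirs()

        // Before: org.apache.commons.io.FileUtils.writeStringToFile(file, "hello\n")
        Files.write("hello\n", file, Charset.forName("UTF-8"))

        // Before: org.apache.commons.io.FileUtils.deleteDirectory(dir)
        Utils.deleteRecursively(dir)
      }
    }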


10 files changed: +34, -54 lines


core/pom.xml

Lines changed: 0 additions & 5 deletions
@@ -200,11 +200,6 @@
       <artifactId>derby</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 4 additions & 1 deletion
@@ -529,7 +529,10 @@ private[spark] object Utils extends Logging {
       }
     }
     if (!file.delete()) {
-      throw new IOException("Failed to delete: " + file)
+      // Delete can also fail if the file simply did not exist
+      if (file.exists()) {
+        throw new IOException("Failed to delete: " + file.getAbsolutePath)
+      }
     }
   }
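
The new check narrows when deleteRecursively throws: a false return from File.delete() is treated as a failure only if the file still exists afterwards, so deleting an already-missing path becomes a no-op instead of an IOException. A minimal standalone sketch of the same check (not the Spark method itself; the object and method names are made up for illustration):

    import java.io.{File, IOException}

    object DeleteCheckSketch {
      // File.delete() also returns false when the file was already gone,
      // which should not be reported as a failure.
      def deleteOrIgnoreMissing(file: File): Unit = {
        if (!file.delete()) {
          if (file.exists()) {
            throw new IOException("Failed to delete: " + file.getAbsolutePath)
          }
        }
      }

      def main(args: Array[String]): Unit = {
        deleteOrIgnoreMissing(new File("/tmp/no-such-file"))  // returns normally
      }
    }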

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -24,7 +24,6 @@ import java.nio.{ByteBuffer, ByteOrder}
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
 import org.scalatest.FunSuite
 
 class UtilsSuite extends FunSuite {
@@ -136,7 +135,7 @@ class UtilsSuite extends FunSuite {
     // Read some nonexistent bytes on both ends
     assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
 
-    FileUtils.deleteDirectory(tmpDir2)
+    Utils.deleteRecursively(tmpDir2)
   }
 
   test("deserialize long value") {

pom.xml

Lines changed: 0 additions & 5 deletions
@@ -435,11 +435,6 @@
         <version>1.9.1</version>
         <scope>test</scope>
       </dependency>
-      <dependency>
-        <groupId>commons-io</groupId>
-        <artifactId>commons-io</artifactId>
-        <version>2.4</version>
-      </dependency>
       <dependency>
         <groupId>org.easymock</groupId>
         <artifactId>easymock</artifactId>

project/SparkBuild.scala

Lines changed: 2 additions & 6 deletions
@@ -259,8 +259,7 @@ object SparkBuild extends Build {
         "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
         "com.novocode" % "junit-interface" % "0.10" % "test",
         "org.easymock" % "easymock" % "3.1" % "test",
-        "org.mockito" % "mockito-all" % "1.8.5" % "test",
-        "commons-io" % "commons-io" % "2.4" % "test"
+        "org.mockito" % "mockito-all" % "1.8.5" % "test"
     ),
 
     testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
@@ -439,10 +438,7 @@ object SparkBuild extends Build {
 
   def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
-    previousArtifact := sparkPreviousArtifact("spark-streaming"),
-    libraryDependencies ++= Seq(
-      "commons-io" % "commons-io" % "2.4"
-    )
+    previousArtifact := sparkPreviousArtifact("spark-streaming")
   )
 
   def yarnCommonSettings = sharedSettings ++ Seq(

streaming/pom.xml

Lines changed: 0 additions & 4 deletions
@@ -74,10 +74,6 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala

Lines changed: 3 additions & 3 deletions
@@ -28,12 +28,12 @@ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import scala.reflect.ClassTag
 
 import java.io.{File, ObjectInputStream, IOException}
+import java.nio.charset.Charset
 import java.util.UUID
 
 import com.google.common.io.Files
 
-import org.apache.commons.io.FileUtils
-import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
 
 
@@ -389,7 +389,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
       val localFile = new File(localTestDir, (i + 1).toString)
       val hadoopFile = new Path(testDir, (i + 1).toString)
       val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
-      FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
+      Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8"))
       var tries = 0
       var done = false
       while (!done && tries < maxTries) {

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 6 additions & 7 deletions
@@ -18,18 +18,17 @@
 package org.apache.spark.streaming
 
 import java.io.File
+import java.nio.charset.Charset
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
-import org.apache.commons.io.FileUtils
 import com.google.common.io.Files
 import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
 
 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -46,13 +45,13 @@ class CheckpointSuite extends TestSuiteBase {
 
   override def beforeFunction() {
     super.beforeFunction()
-    FileUtils.deleteDirectory(new File(checkpointDir))
+    Utils.deleteRecursively(new File(checkpointDir))
   }
 
   override def afterFunction() {
     super.afterFunction()
     if (ssc != null) ssc.stop()
-    FileUtils.deleteDirectory(new File(checkpointDir))
+    Utils.deleteRecursively(new File(checkpointDir))
   }
 
   test("basic rdd checkpoints + dstream graph checkpoint recovery") {
@@ -256,7 +255,7 @@ class CheckpointSuite extends TestSuiteBase {
     //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     Thread.sleep(1000)
     for (i <- Seq(1, 2, 3)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       // wait to make sure that the file is written such that it gets shown in the file listings
       Thread.sleep(1000)
     }
@@ -273,7 +272,7 @@ class CheckpointSuite extends TestSuiteBase {
 
     // Create files while the master is down
     for (i <- Seq(4, 5, 6)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       Thread.sleep(1000)
     }
 
@@ -289,7 +288,7 @@ class CheckpointSuite extends TestSuiteBase {
     // Restart stream computation
     ssc.start()
     for (i <- Seq(7, 8, 9)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       Thread.sleep(1000)
     }
     Thread.sleep(1000)

streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala

Lines changed: 3 additions & 8 deletions
@@ -19,14 +19,9 @@ package org.apache.spark.streaming
 
 import org.apache.spark.Logging
 import org.apache.spark.streaming.util.MasterFailureTest
-import StreamingContext._
+import org.apache.spark.util.Utils
 
-import org.scalatest.{FunSuite, BeforeAndAfter}
-import com.google.common.io.Files
 import java.io.File
-import org.apache.commons.io.FileUtils
-import collection.mutable.ArrayBuffer
-
 
 /**
  * This testsuite tests master failures at random times while the stream is running using
@@ -43,12 +38,12 @@ class FailureSuite extends TestSuiteBase with Logging {
 
   override def beforeFunction() {
     super.beforeFunction()
-    FileUtils.deleteDirectory(new File(directory))
+    Utils.deleteRecursively(new File(directory))
  }
 
   override def afterFunction() {
     super.afterFunction()
-    FileUtils.deleteDirectory(new File(directory))
+    Utils.deleteRecursively(new File(directory))
   }
 
   test("multiple failures with map") {

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 15 additions & 13 deletions
@@ -23,21 +23,23 @@ import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString
 
-import org.apache.spark.streaming.dstream.{NetworkReceiver}
-import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
 import java.io.{File, BufferedWriter, OutputStreamWriter}
+import java.net.{InetSocketAddress, SocketException, ServerSocket}
+import java.nio.charset.Charset
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.ManualClock
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import com.google.common.io.Files
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.NetworkReceiver
 import org.apache.spark.streaming.receivers.Receiver
-import org.apache.spark.Logging
-import scala.util.Random
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfter
-import collection.JavaConversions._
-import com.google.common.io.Files
-import java.util.concurrent.atomic.AtomicInteger
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.Utils
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
@@ -112,7 +114,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     Thread.sleep(1000)
     for (i <- 0 until input.size) {
       val file = new File(testDir, i.toString)
-      FileUtils.writeStringToFile(file, input(i).toString + "\n")
+      Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
       logInfo("Created file " + file)
       Thread.sleep(batchDuration.milliseconds)
       Thread.sleep(1000)
@@ -136,7 +138,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // (whether the elements were received one in each interval is not verified)
     assert(output.toList === expectedOutput.toList)
 
-    FileUtils.deleteDirectory(testDir)
+    Utils.deleteRecursively(testDir)
 
     // Enable manual clock back again for other tests
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
