
Commit 8cb16a6

Merge remote-tracking branch 'upstream/master' into ldaonline

2 parents 62405cc + 2bf40c5

208 files changed, +1965 -918 lines


assembly/pom.xml

+1 -1
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.0-SNAPSHOT</version>
+    <version>1.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/pom.xml

+1 -1
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.0-SNAPSHOT</version>
+    <version>1.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/pom.xml

+2 -2
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.0-SNAPSHOT</version>
+    <version>1.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

@@ -275,7 +275,7 @@
     <dependency>
       <groupId>org.tachyonproject</groupId>
       <artifactId>tachyon-client</artifactId>
-      <version>0.5.0</version>
+      <version>0.6.1</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>

core/src/main/scala/org/apache/spark/TaskState.scala

+2
@@ -27,6 +27,8 @@ private[spark] object TaskState extends Enumeration {

   type TaskState = Value

+  def isFailed(state: TaskState) = (LOST == state) || (FAILED == state)
+
   def isFinished(state: TaskState) = FINISHED_STATES.contains(state)

   def toMesos(state: TaskState): MesosTaskState = state match {
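
The new isFailed predicate gives both Mesos backends a single place to decide that LOST and FAILED tasks are handled the same way. A minimal sketch of the call pattern this commit adopts (the surrounding status-update handler and the failuresBySlaveId map are simplified from the backend code further down):

    // Convert the Mesos task state into Spark's TaskState, then ask the new helper
    // whether the task ended in a failure state (LOST or FAILED).
    val state = TaskState.fromMesos(status.getState)
    if (TaskState.isFailed(state)) {
      // count the failure against the slave for blacklisting purposes
      failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
    }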

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala

+11
@@ -101,12 +101,23 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])

   /**
    * Return a sampled subset of this RDD.
+   *
+   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
+   * @param fraction expected size of the sample as a fraction of this RDD's size
+   *  without replacement: probability that each element is chosen; fraction must be [0, 1]
+   *  with replacement: expected number of times each element is chosen; fraction must be >= 0
    */
   def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T] =
     sample(withReplacement, fraction, Utils.random.nextLong)

   /**
    * Return a sampled subset of this RDD.
+   *
+   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
+   * @param fraction expected size of the sample as a fraction of this RDD's size
+   *  without replacement: probability that each element is chosen; fraction must be [0, 1]
+   *  with replacement: expected number of times each element is chosen; fraction must be >= 0
+   * @param seed seed for the random number generator
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
     wrapRDD(rdd.sample(withReplacement, fraction, seed))
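
The same wording is added to the Scala RDD.sample docs below. A short usage sketch of the documented fraction semantics (the RDD contents and numbers are illustrative, not from the commit):

    val rdd = sc.parallelize(1 to 1000)
    // without replacement: each element is kept with probability 0.1, so roughly 100 elements
    val subset = rdd.sample(withReplacement = false, fraction = 0.1, seed = 42L)
    // with replacement: each element is drawn about twice on average
    val bootstrap = rdd.sample(withReplacement = true, fraction = 2.0, seed = 42L)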

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

+2 -2
@@ -33,6 +33,7 @@ import org.json4s.jackson.JsonMethods

 import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
+import org.apache.spark.util.Utils

 /**
  * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
@@ -405,8 +406,7 @@ private object SparkDocker {

   private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
     val ipPromise = promise[String]()
-    val outFile = File.createTempFile("fault-tolerance-test", "")
-    outFile.deleteOnExit()
+    val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
     val outStream: FileWriter = new FileWriter(outFile)
     def findIpAndLog(line: String): Unit = {
       if (line.startsWith("CONTAINER_IP=")) {

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

+2 -1
@@ -233,7 +233,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       } catch {
         case e: Exception =>
           logError(
-            s"Exception encountered when attempting to load application log ${fileStatus.getPath}")
+            s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
+            e)
           None
       }
     }.toSeq.sortWith(compareAppInfo)
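
Passing the caught exception to logError means the history server log now records the stack trace rather than just the message. A minimal sketch of the pattern, assuming a class that mixes in Spark's Logging trait (the parseLog helper is hypothetical):

    try {
      parseLog(fileStatus)   // hypothetical: load and replay one application event log
    } catch {
      case e: Exception =>
        // two-argument overload: message plus throwable, so the stack trace is logged too
        logError(s"Exception encountered when attempting to load application log ${fileStatus.getPath}", e)
        None
    }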

core/src/main/scala/org/apache/spark/package.scala

+1 -1
@@ -43,5 +43,5 @@ package org.apache

 package object spark {
   // For package docs only
-  val SPARK_VERSION = "1.3.0-SNAPSHOT"
+  val SPARK_VERSION = "1.4.0-SNAPSHOT"
 }

core/src/main/scala/org/apache/spark/rdd/RDD.scala

+6
@@ -377,6 +377,12 @@ abstract class RDD[T: ClassTag](

   /**
    * Return a sampled subset of this RDD.
+   *
+   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
+   * @param fraction expected size of the sample as a fraction of this RDD's size
+   *  without replacement: probability that each element is chosen; fraction must be [0, 1]
+   *  with replacement: expected number of times each element is chosen; fraction must be >= 0
+   * @param seed seed for the random number generator
    */
   def sample(withReplacement: Boolean,
       fraction: Double,

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

+1 -1
@@ -277,7 +277,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         coresByTaskId -= taskId
       }
       // If it was a failure, mark the slave as failed for blacklisting purposes
-      if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
+      if (TaskState.isFailed(TaskState.fromMesos(state))) {
         failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
         if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
           logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala

+3 -7
@@ -21,15 +21,11 @@ import org.apache.spark.SparkContext

 private[spark] object MemoryUtils {
   // These defaults copied from YARN
-  val OVERHEAD_FRACTION = 1.10
+  val OVERHEAD_FRACTION = 0.10
   val OVERHEAD_MINIMUM = 384

   def calculateTotalMemory(sc: SparkContext) = {
-    math.max(
-      sc.conf.getOption("spark.mesos.executor.memoryOverhead")
-        .getOrElse(OVERHEAD_MINIMUM.toString)
-        .toInt + sc.executorMemory,
-      OVERHEAD_FRACTION * sc.executorMemory
-    )
+    sc.conf.getInt("spark.mesos.executor.memoryOverhead",
+      math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory
   }
 }
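
The rewrite makes spark.mesos.executor.memoryOverhead an additive overhead, defaulting to 10% of executor memory with a 384 MB floor, instead of the old multiplier-style formula. A worked sketch with an assumed, illustrative executor memory of 4096 MB and the overhead setting left unset:

    val executorMemory = 4096                                   // MB, illustrative value
    val overhead = math.max(0.10 * executorMemory, 384).toInt   // max(409.6, 384).toInt = 409 MB
    val total = executorMemory + overhead                       // 4505 MB requested from Mesos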

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

+2 -1
@@ -318,7 +318,8 @@ private[spark] class MesosSchedulerBackend(
     val tid = status.getTaskId.getValue.toLong
     val state = TaskState.fromMesos(status.getState)
     synchronized {
-      if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+      if (TaskState.isFailed(TaskState.fromMesos(status.getState))
+        && taskIdToSlaveId.contains(tid)) {
         // We lost the executor on this slave, so remember that it's gone
         removeExecutor(taskIdToSlaveId(tid), "Lost executor")
       }

core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala

+13 -14
@@ -20,8 +20,8 @@ package org.apache.spark.storage
 import java.text.SimpleDateFormat
 import java.util.{Date, Random}

-import tachyon.client.TachyonFS
-import tachyon.client.TachyonFile
+import tachyon.TachyonURI
+import tachyon.client.{TachyonFile, TachyonFS}

 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
@@ -40,7 +40,7 @@
     val master: String)
   extends Logging {

-  val client = if (master != null && master != "") TachyonFS.get(master) else null
+  val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null

   if (client == null) {
     logError("Failed to connect to the Tachyon as the master address is not configured")
@@ -60,11 +60,11 @@
   addShutdownHook()

   def removeFile(file: TachyonFile): Boolean = {
-    client.delete(file.getPath(), false)
+    client.delete(new TachyonURI(file.getPath()), false)
   }

   def fileExists(file: TachyonFile): Boolean = {
-    client.exist(file.getPath())
+    client.exist(new TachyonURI(file.getPath()))
   }

   def getFile(filename: String): TachyonFile = {
@@ -81,15 +81,15 @@
         if (old != null) {
           old
         } else {
-          val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId)
+          val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
           client.mkdir(path)
           val newDir = client.getFile(path)
           subDirs(dirId)(subDirId) = newDir
           newDir
         }
       }
     }
-    val filePath = subDir + "/" + filename
+    val filePath = new TachyonURI(s"$subDir/$filename")
     if(!client.exist(filePath)) {
       client.createFile(filePath)
     }
@@ -101,7 +101,7 @@

   // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
   private def createTachyonDirs(): Array[TachyonFile] = {
-    logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'")
+    logDebug(s"Creating tachyon directories at root dirs '$rootDirs'")
     val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
     rootDirs.split(",").map { rootDir =>
       var foundLocalDir = false
@@ -113,22 +113,21 @@
         tries += 1
         try {
           tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
-          val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId
+          val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
           if (!client.exist(path)) {
             foundLocalDir = client.mkdir(path)
             tachyonDir = client.getFile(path)
           }
         } catch {
           case e: Exception =>
-            logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
+            logWarning(s"Attempt $tries to create tachyon dir $tachyonDir failed", e)
         }
       }
       if (!foundLocalDir) {
-        logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
-          rootDir)
+        logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create tachyon dir in $rootDir")
         System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
       }
-      logInfo("Created tachyon directory at " + tachyonDir)
+      logInfo(s"Created tachyon directory at $tachyonDir")
       tachyonDir
     }
   }
@@ -145,7 +144,7 @@
       }
     } catch {
       case e: Exception =>
-        logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+        logError(s"Exception while deleting tachyon spark dir: $tachyonDir", e)
     }
   }
   client.close()

core/src/main/scala/org/apache/spark/util/Utils.scala

+4 -2
@@ -42,6 +42,8 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
 import org.json4s._
+
+import tachyon.TachyonURI
 import tachyon.client.{TachyonFS, TachyonFile}

 import org.apache.spark._
@@ -288,7 +290,7 @@ private[spark] object Utils extends Logging {
      } catch { case e: SecurityException => dir = null; }
    }

-    dir
+    dir.getCanonicalFile
   }

   /**
@@ -970,7 +972,7 @@ private[spark] object Utils extends Logging {
   * Delete a file or directory and its contents recursively.
   */
  def deleteRecursively(dir: TachyonFile, client: TachyonFS) {
-    if (!client.delete(dir.getPath(), true)) {
+    if (!client.delete(new TachyonURI(dir.getPath()), true)) {
      throw new IOException("Failed to delete the tachyon dir: " + dir)
    }
  }
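
Several test suites in this commit (CheckpointSuite, SecurityManagerSuite, SparkContextSuite, SparkSubmitSuite, FaultToleranceTest) stop calling deleteOnExit() on bare temp files and instead create them inside a directory obtained from Utils.createTempDir(), which Spark registers for recursive deletion on shutdown. A minimal sketch of the pattern (the prefix and suffix strings are illustrative):

    import java.io.File
    import org.apache.spark.util.Utils

    // The parent directory is cleaned up recursively when the JVM shuts down,
    // so files created inside it need no individual deleteOnExit() calls.
    val tmpDir: File = Utils.createTempDir()
    val confFile = File.createTempFile("example", ".conf", tmpDir)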

core/src/test/scala/org/apache/spark/CheckpointSuite.scala

+1 -2
@@ -33,8 +33,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {

   override def beforeEach() {
     super.beforeEach()
-    checkpointDir = File.createTempFile("temp", "")
-    checkpointDir.deleteOnExit()
+    checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
     checkpointDir.delete()
     sc = new SparkContext("local", "test")
     sc.setCheckpointDir(checkpointDir.toString)

core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala

+3 -2
@@ -21,6 +21,8 @@ import java.io.File

 import org.scalatest.FunSuite

+import org.apache.spark.util.Utils
+
 class SecurityManagerSuite extends FunSuite {

   test("set security with conf") {
@@ -160,8 +162,7 @@
   }

   test("ssl off setup") {
-    val file = File.createTempFile("SSLOptionsSuite", "conf")
-    file.deleteOnExit()
+    val file = File.createTempFile("SSLOptionsSuite", "conf", Utils.createTempDir())

     System.setProperty("spark.ssl.configFile", file.getAbsolutePath)
     val conf = new SparkConf()

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

+6 -5
@@ -79,13 +79,14 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
     val byteArray2 = converter.convert(bytesWritable)
     assert(byteArray2.length === 0)
   }
-  
+
   test("addFile works") {
-    val file1 = File.createTempFile("someprefix1", "somesuffix1")
+    val dir = Utils.createTempDir()
+
+    val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
     val absolutePath1 = file1.getAbsolutePath

-    val pluto = Utils.createTempDir()
-    val file2 = File.createTempFile("someprefix2", "somesuffix2", pluto)
+    val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
     val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
     val absolutePath2 = file2.getAbsolutePath

@@ -129,7 +130,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
       sc.stop()
     }
   }
-  
+
   test("addFile recursive works") {
     val pluto = Utils.createTempDir()
     val neptune = Utils.createTempDir(pluto.getAbsolutePath)

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

+5 -3
@@ -402,8 +402,10 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
     val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
     val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles

+    val tmpDir = Utils.createTempDir()
+
     // Test jars and files
-    val f1 = File.createTempFile("test-submit-jars-files", "")
+    val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
     val writer1 = new PrintWriter(f1)
     writer1.println("spark.jars " + jars)
     writer1.println("spark.files " + files)
@@ -420,7 +422,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
     sysProps("spark.files") should be(Utils.resolveURIs(files))

     // Test files and archives (Yarn)
-    val f2 = File.createTempFile("test-submit-files-archives", "")
+    val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
     val writer2 = new PrintWriter(f2)
     writer2.println("spark.yarn.dist.files " + files)
     writer2.println("spark.yarn.dist.archives " + archives)
@@ -437,7 +439,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
     sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))

     // Test python files
-    val f3 = File.createTempFile("test-submit-python-files", "")
+    val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
     val writer3 = new PrintWriter(f3)
     writer3.println("spark.submit.pyFiles " + pyFiles)
     writer3.close()

core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala

+4 -2
@@ -22,14 +22,16 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.{FileSplit, JobConf, TextInputFormat}
-import org.apache.spark._
 import org.scalatest.FunSuite

 import scala.collection.Map
 import scala.language.postfixOps
 import scala.sys.process._
 import scala.util.Try

+import org.apache.spark._
+import org.apache.spark.util.Utils
+
 class PipedRDDSuite extends FunSuite with SharedSparkContext {

   test("basic pipe") {
@@ -141,7 +143,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
       // make sure symlinks were created
       assert(pipedLs.length > 0)
       // clean up top level tasks directory
-      new File("tasks").delete()
+      Utils.deleteRecursively(new File("tasks"))
     } else {
       assert(true)
     }
