
Commit e579302

Merge branch 'master' of git://git.apache.org/spark into SPARK-2583

2 parents: 281589c + f776bc9
10 files changed, +42 -20 lines

core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala

Lines changed: 4 additions & 0 deletions
@@ -39,16 +39,20 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
  *
  * @param prev RDD to be sampled
  * @param sampler a random sampler
+ * @param preservesPartitioning whether the sampler preserves the partitioner of the parent RDD
  * @param seed random seed
  * @tparam T input RDD item type
  * @tparam U sampled RDD item type
  */
 private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
     prev: RDD[T],
     sampler: RandomSampler[T, U],
+    @transient preservesPartitioning: Boolean,
     @transient seed: Long = Utils.random.nextLong)
   extends RDD[U](prev) {
 
+  @transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
+
   override def getPartitions: Array[Partition] = {
     val random = new Random(seed)
     firstParent[T].partitions.map(x => new PartitionwiseSampledRDDPartition(x, random.nextLong()))
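With this change, a sampler that does not move records between partitions can report the parent's partitioner. A minimal sketch of the user-visible effect, assuming a hypothetical pair RDD (not part of this commit):

// Hedged sketch: sampling a hash-partitioned pair RDD now keeps the
// partitioner, so a later join by key can avoid a shuffle.
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
  .partitionBy(new HashPartitioner(4))
val sampled = pairs.sample(withReplacement = false, fraction = 0.5)
assert(sampled.partitioner == pairs.partitioner)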

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 13 additions & 4 deletions
@@ -356,9 +356,9 @@ abstract class RDD[T: ClassTag](
       seed: Long = Utils.random.nextLong): RDD[T] = {
     require(fraction >= 0.0, "Invalid fraction value: " + fraction)
     if (withReplacement) {
-      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), seed)
+      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
     } else {
-      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), seed)
+      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
     }
   }

@@ -374,7 +374,7 @@ abstract class RDD[T: ClassTag](
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
-      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), seed)
+      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), true, seed)
     }.toArray
   }

@@ -586,6 +586,9 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   def mapPartitions[U: ClassTag](
       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {

@@ -596,6 +599,9 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   def mapPartitionsWithIndex[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {

@@ -607,6 +613,9 @@ abstract class RDD[T: ClassTag](
    * :: DeveloperApi ::
    * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
    * mapPartitions that also passes the TaskContext into the closure.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   @DeveloperApi
   def mapPartitionsWithContext[U: ClassTag](

@@ -689,7 +698,7 @@ abstract class RDD[T: ClassTag](
    * a map on the other).
    */
   def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
-    zipPartitions(other, true) { (thisIter, otherIter) =>
+    zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
       new Iterator[(T, U)] {
         def hasNext = (thisIter.hasNext, otherIter.hasNext) match {
           case (true, true) => true
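The new doc comment pins down the `preservesPartitioning` contract: it is only safe to pass `true` on a pair RDD whose keys the closure leaves untouched. A hedged sketch of the rule, using a hypothetical RDD that is not from the commit:

// Hedged sketch: keys are untouched, so preservesPartitioning = true is safe.
import org.apache.spark.HashPartitioner

val byKey = sc.parallelize(Seq((1, 1.0), (2, 2.0))).partitionBy(new HashPartitioner(2))
val scaled = byKey.mapPartitions(
  iter => iter.map { case (k, v) => (k, v * 2.0) },  // same keys, new values
  preservesPartitioning = true)
assert(scaled.partitioner == byKey.partitioner)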

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -57,7 +57,7 @@ private[spark] class LocalActor(
     case StatusUpdate(taskId, state, serializedData) =>
       scheduler.statusUpdate(taskId, state, serializedData)
       if (TaskState.isFinished(state)) {
-        freeCores += 1
+        freeCores += scheduler.CPUS_PER_TASK
         reviveOffers()
       }

@@ -68,7 +68,7 @@ private[spark] class LocalActor(
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     for (task <- scheduler.resourceOffers(offers).flatten) {
-      freeCores -= 1
+      freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, task.taskId, task.name, task.serializedTask)
     }
   }
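The local backend now books `scheduler.CPUS_PER_TASK` cores per task instead of a hard-coded 1, matching the cluster backends. `CPUS_PER_TASK` is driven by the `spark.task.cpus` setting; a minimal sketch of exercising it (app name and values are illustrative):

// Hedged sketch: with spark.task.cpus = 2 on local[4], the scheduler
// should run at most 4 / 2 = 2 tasks concurrently.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("cpus-per-task-demo")  // illustrative
  .set("spark.task.cpus", "2")       // read into scheduler.CPUS_PER_TASK
val sc = new SparkContext(conf)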

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 3 additions & 3 deletions
@@ -106,6 +106,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
+  private val threadId = Thread.currentThread().getId
 
   /**
    * Insert the given key and value into the map.

@@ -128,7 +129,6 @@ class ExternalAppendOnlyMap[K, V, C](
       // Atomically check whether there is sufficient memory in the global pool for
       // this map to grow and, if possible, allocate the required amount
       shuffleMemoryMap.synchronized {
-        val threadId = Thread.currentThread().getId
         val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
         val availableMemory = maxMemoryThreshold -
           (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))

@@ -153,8 +153,8 @@ class ExternalAppendOnlyMap[K, V, C](
    */
   private def spill(mapSize: Long) {
     spillCount += 1
-    logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
-      .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
+    logWarning("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
+      .format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
     var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
     var objectsWritten = 0
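The thread id is now captured once at construction (each map belongs to a single task thread) and included in the spill warning, which makes concurrent spills distinguishable in the logs. With made-up numbers, the new message formats as:

// Hedged illustration with hypothetical values (threadId = 63,
// mapSize = 44040192 bytes, spillCount = 3):
val msg = "Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
  .format(63L, 44040192L / (1024 * 1024), 3, if (3 > 1) "s" else "")
// msg == "Thread 63 spilling in-memory map of 42 MB to disk (3 times so far)"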

core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -43,7 +43,7 @@ class PartitionwiseSampledRDDSuite extends FunSuite with SharedSparkContext {
   test("seed distribution") {
     val rdd = sc.makeRDD(Array(1L, 2L, 3L, 4L), 2)
     val sampler = new MockSampler
-    val sample = new PartitionwiseSampledRDD[Long, Long](rdd, sampler, 0L)
+    val sample = new PartitionwiseSampledRDD[Long, Long](rdd, sampler, false, 0L)
     assert(sample.distinct().count == 2, "Seeds must be different.")
   }

@@ -52,7 +52,7 @@ class PartitionwiseSampledRDDSuite extends FunSuite with SharedSparkContext {
     // We want to make sure there are no concurrency issues.
     val rdd = sc.parallelize(0 until 111, 10)
     for (sampler <- Seq(new BernoulliSampler[Int](0.5), new PoissonSampler[Int](0.5))) {
-      val sampled = new PartitionwiseSampledRDD[Int, Int](rdd, sampler)
+      val sampled = new PartitionwiseSampledRDD[Int, Int](rdd, sampler, true)
       sampled.zip(sampled).count()
     }
   }

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 9 additions & 0 deletions
@@ -523,6 +523,15 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(sortedTopK === nums.sorted(ord).take(5))
   }
 
+  test("sample preserves partitioner") {
+    val partitioner = new HashPartitioner(2)
+    val rdd = sc.parallelize(Seq((0, 1), (2, 3))).partitionBy(partitioner)
+    for (withReplacement <- Seq(true, false)) {
+      val sampled = rdd.sample(withReplacement, 1.0)
+      assert(sampled.partitioner === rdd.partitioner)
+    }
+  }
+
   test("takeSample") {
     val n = 1000000
     val data = sc.parallelize(1 to n, 2)

mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala

Lines changed: 2 additions & 2 deletions
@@ -103,11 +103,11 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) extends
       mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
       mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
     ).sortByKey(ascending = false)
-    val agg = counts.values.mapPartitions({ iter =>
+    val agg = counts.values.mapPartitions { iter =>
       val agg = new BinaryLabelCounter()
       iter.foreach(agg += _)
       Iterator(agg)
-    }, preservesPartitioning = true).collect()
+    }.collect()
     val partitionwiseCumulativeCounts =
       agg.scanLeft(new BinaryLabelCounter())(
         (agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c)
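Dropping the flag here follows the contract now documented in RDD.scala: this closure replaces each partition's contents with a single aggregate, so keys are not preserved and no partitioner should be claimed. A hedged sketch of the same rule, using a hypothetical RDD that is not from the commit:

// Hedged sketch: the output is one aggregate per partition, not the
// original keyed records, so the partitioner is correctly dropped.
import org.apache.spark.HashPartitioner

val byKey = sc.parallelize(Seq((1, 1.0), (2, 2.0))).partitionBy(new HashPartitioner(2))
val perPartition = byKey.mapPartitions { iter => Iterator(iter.map(_._2).sum) }
assert(perPartition.partitioner == None)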

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

Lines changed: 4 additions & 4 deletions
@@ -377,18 +377,18 @@ class RowMatrix(
       s"Only support dense matrix at this time but found ${B.getClass.getName}.")
 
     val Bb = rows.context.broadcast(B.toBreeze.asInstanceOf[BDM[Double]].toDenseVector.toArray)
-    val AB = rows.mapPartitions({ iter =>
+    val AB = rows.mapPartitions { iter =>
       val Bi = Bb.value
-      iter.map(row => {
+      iter.map { row =>
         val v = BDV.zeros[Double](k)
         var i = 0
         while (i < k) {
           v(i) = row.toBreeze.dot(new BDV(Bi, i * n, 1, n))
           i += 1
         }
         Vectors.fromBreeze(v)
-      })
-    }, preservesPartitioning = true)
+      }
+    }
 
     new RowMatrix(AB, nRows, B.numCols)
   }
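Same rationale: `rows` is an RDD of vectors, not a pair RDD, so there is no partitioner to preserve and the flag was misleading. A hedged usage sketch of the method being edited, `RowMatrix.multiply`, with illustrative values:

// Hedged sketch: multiply a distributed 2x2 RowMatrix by a local 2x1
// matrix of ones; each row of A maps to the corresponding row of A * B.
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val A = new RowMatrix(sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(3.0, 4.0))))
val B = Matrices.dense(2, 1, Array(1.0, 1.0))
val AB = A.multiply(B)  // rows: [3.0], [7.0]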

mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala

Lines changed: 1 addition & 1 deletion
@@ -430,7 +430,7 @@ class ALS private (
       val inLinkBlock = makeInLinkBlock(numProductBlocks, ratings, productPartitioner)
       val outLinkBlock = makeOutLinkBlock(numProductBlocks, ratings, productPartitioner)
       Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
-    }, true)
+    }, preservesPartitioning = true)
     val inLinks = links.mapValues(_._1)
     val outLinks = links.mapValues(_._2)
     inLinks.persist(StorageLevel.MEMORY_AND_DISK)

mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -264,8 +264,8 @@ object MLUtils {
     (1 to numFolds).map { fold =>
       val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF,
         complement = false)
-      val validation = new PartitionwiseSampledRDD(rdd, sampler, seed)
-      val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), seed)
+      val validation = new PartitionwiseSampledRDD(rdd, sampler, true, seed)
+      val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), true, seed)
       (training, validation)
     }.toArray
   }
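`kFold` carves [0, 1) into `numFolds` Bernoulli slices: each fold's validation set is one slice and its training set is the complement, both sampled partition-wise with the same seed so every element lands in exactly one validation fold. A hedged usage sketch (values illustrative):

// Hedged sketch: split an RDD into 3 (training, validation) pairs.
import org.apache.spark.mllib.util.MLUtils

val data = sc.parallelize(1 to 100, 4)
val folds = MLUtils.kFold(data, 3, 42)
folds.foreach { case (training, validation) =>
  assert(training.count() + validation.count() == 100)
}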
