Commit 065ebcd

Merge branch 'master' into takeSample
2 parents: 9bdd36e + 6cf335d

File tree

175 files changed, +2392 -1105 lines


assembly/pom.xml (+1 -1)

@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/pom.xml (+1 -1)

@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala (-2)

@@ -38,8 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
       sc.stop()
       sc = null
     }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
   }

   test("halting by voting") {

bin/run-example (+1 -1)

@@ -51,7 +51,7 @@ if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
   EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
 fi

-./bin/spark-submit \
+"$FWDIR"/bin/spark-submit \
   --master $EXAMPLE_MASTER \
   --class $EXAMPLE_CLASS \
   "$SPARK_EXAMPLES_JAR" \

core/pom.xml (+2 -2)

@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

@@ -240,7 +240,7 @@
     </dependency>
     <dependency>
       <groupId>org.easymock</groupId>
-      <artifactId>easymock</artifactId>
+      <artifactId>easymockclassextension</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>

core/src/main/scala/org/apache/spark/Partitioner.scala (+19 -1)

@@ -83,11 +83,17 @@ class HashPartitioner(partitions: Int) extends Partitioner {
     case _ =>
       false
   }
+
+  override def hashCode: Int = numPartitions
 }

 /**
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
+ *
+ * Note that the actual number of partitions created by the RangePartitioner might not be the same
+ * as the `partitions` parameter, in the case where the number of sampled records is less than
+ * the value of `partitions`.
  */
 class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,

@@ -119,7 +125,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
     }
   }

-  def numPartitions = partitions
+  def numPartitions = rangeBounds.length + 1

   private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

@@ -155,4 +161,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
     case _ =>
       false
   }
+
+  override def hashCode(): Int = {
+    val prime = 31
+    var result = 1
+    var i = 0
+    while (i < rangeBounds.length) {
+      result = prime * result + rangeBounds(i).hashCode
+      i += 1
+    }
+    result = prime * result + ascending.hashCode
+    result
+  }
 }
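
The new hashCode overrides pair with the equals methods these classes already had: Spark compares partitioners (for example, to decide whether two RDDs are co-partitioned and a shuffle can be skipped), and the equals/hashCode contract requires equal objects to hash alike. A minimal standalone sketch of the restored contract, using only the HashPartitioner change shown above:

import org.apache.spark.HashPartitioner

object PartitionerContractSketch {
  def main(args: Array[String]): Unit = {
    // Two logically equal partitioners previously inherited Object.hashCode,
    // so hash-based collections could treat them as distinct keys.
    val a = new HashPartitioner(4)
    val b = new HashPartitioner(4)
    assert(a == b)                    // equals: same numPartitions
    assert(a.hashCode == b.hashCode)  // now also same hash (= numPartitions)
    assert(Set(a, b).size == 1)       // deduplicated, as the contract requires
  }
}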

core/src/main/scala/org/apache/spark/SparkContext.scala (+4 -4)

@@ -455,7 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minPartitions).map(pair => pair._2.toString)
+      minPartitions).map(pair => pair._2.toString).setName(path)
   }

   /**

@@ -496,7 +496,7 @@
       classOf[String],
       classOf[String],
       updateConf,
-      minPartitions)
+      minPartitions).setName(path)
   }

   /**

@@ -551,7 +551,7 @@
       inputFormatClass,
       keyClass,
       valueClass,
-      minPartitions)
+      minPartitions).setName(path)
   }

   /**

@@ -623,7 +623,7 @@
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }

   /**
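
With setName(path) chained onto each file-based input, these RDDs now appear under their source path in the web UI and in toDebugString output instead of an anonymous default. A short usage sketch (the local master and file path are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object NamedInputSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("named-inputs"))

    // textFile now calls setName(path) internally: the RDD's name is its path.
    val lines = sc.textFile("/tmp/input.txt")  // illustrative path
    println(lines.name)                        // prints: /tmp/input.txt

    // setName returns the RDD itself, which is what lets the patch chain it.
    println(lines.setName("my-input").name)    // prints: my-input

    sc.stop()
  }
}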

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala (+30 -21)

@@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

   /**
    * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. Uses the provided
-   * Partitioner to partition the output RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
+   * @param partitioner partitioner of the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD, partitioner)
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
+  {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
   }

   /**
-   * Return approximate number of distinct values for each key this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
-   * level.
+   * Return approximate number of distinct values for each key in this RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
+   * @param numPartitions number of partitions of the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD)
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
   }

-
   /**
    * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
-   * output RDD into numPartitions.
    *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
    */
-  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+  def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD))
   }

   /** Assign a name to this RDD */
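
Besides the new HyperLogLog documentation, these wrappers now return the correctly typed JavaPairRDD[K, Long] (previously JavaRDD[(K, Long)], which is awkward to use from Java) by wrapping the Scala result with fromRDD. A minimal sketch of the per-key estimate through the underlying Scala API (the data and relativeSD value are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // brings countApproxDistinctByKey into scope in Spark 1.x

object ApproxDistinctByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("hll-by-key"))

    // 10 keys with exactly 100 distinct values each (1000 records total).
    val pairs = sc.parallelize(0 until 1000).map(i => (i % 10, i / 10))

    // Smaller relativeSD means a larger HyperLogLog sketch per key and a
    // tighter estimate; the result is approximate, not exact.
    pairs.countApproxDistinctByKey(0.05).collect().foreach {
      case (k, c) => println(s"key $k ~ $c distinct values")
    }

    sc.stop()
  }
}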

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala (+22)

@@ -108,6 +108,28 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
     wrapRDD(rdd.sample(withReplacement, fraction, seed))

+  /**
+   * Randomly splits this RDD with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1
+   *
+   * @return split RDDs in an array
+   */
+  def randomSplit(weights: Array[Double]): Array[JavaRDD[T]] =
+    randomSplit(weights, Utils.random.nextLong)
+
+  /**
+   * Randomly splits this RDD with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1
+   * @param seed random seed
+   *
+   * @return split RDDs in an array
+   */
+  def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]] =
+    rdd.randomSplit(weights, seed).map(wrapRDD)
+
   /**
    * Return the union of this RDD and another one. Any identical elements will appear multiple
    * times (use `.distinct()` to eliminate them).
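
randomSplit is the usual way to carve an RDD into train/test subsets. A quick sketch against the Scala API these wrappers delegate to (the 70/30 weights and seed are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object RandomSplitSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("random-split"))

    val data = sc.parallelize(1 to 10000)

    // Weights are normalized, so Array(7.0, 3.0) behaves like Array(0.7, 0.3).
    val Array(train, test) = data.randomSplit(Array(0.7, 0.3), seed = 42L)
    println(s"train ~ ${train.count()}, test ~ ${test.count()}")  // roughly 7000 / 3000

    sc.stop()
  }
}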

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala (+7 -5)

@@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return approximate number of distinct elements in the RDD.
    *
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05.
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
    */
-  def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+  def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)

   def name(): String = rdd.name
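
The default relativeSD = 0.05 is dropped from this Java-facing signature, likely because Scala default arguments are awkward to use from Java; callers now pass the accuracy explicitly. A brief sketch contrasting exact and approximate distinct counts (the data sizes are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object CountDistinctSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("count-distinct"))

    // 100,000 elements but only 1,000 distinct values.
    val xs = sc.parallelize(1 to 100000).map(_ % 1000)

    val exact  = xs.distinct().count()         // exactly 1000, but needs a shuffle
    val approx = xs.countApproxDistinct(0.05)  // near 1000; one pass, bounded memory
    println(s"exact=$exact approx=$approx")

    sc.stop()
  }
}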

core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala (+2)

@@ -50,4 +50,6 @@ private[spark] class PythonPartitioner(
     case _ =>
       false
   }
+
+  override def hashCode: Int = 31 * numPartitions + pyPartitionFunctionId.hashCode
 }

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala (+3)

@@ -360,6 +360,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     |
     |  --executor-memory MEM      Memory per executor (e.g. 1000M, 2G) (Default: 1G).
     |
+    |  --help, -h                 Show this help message and exit
+    |  --verbose, -v              Print additional debug output
+    |
     | Spark standalone with cluster deploy mode only:
     |   --driver-cores NUM        Cores for driver (Default: 1).
     |   --supervise               If given, restarts the driver on failure.
