
Commit 32173b7

Stats examples update.

Split RandomAndSampledRDDs into RandomRDDGeneration and SampledRDDs. (The name RandomRDDGeneration avoids a naming conflict with RandomRDDs.) RandomRDDGeneration prints the first 5 samples from each generated RDD. Did the same split for Python: random_rdd_generation.py and sampled_rdds.py. Other small updates based on code review.
1 parent c8c20dc commit 32173b7

7 files changed: +291 −116 lines

examples/src/main/python/mllib/correlations.py

Lines changed: 4 additions & 4 deletions
@@ -39,14 +39,14 @@
     corrType = 'pearson'

     points = MLUtils.loadLibSVMFile(sc, filepath)\
-        .map(lambda lp: LabeledPoint(lp.label, lp.features.toDense()))
+        .map(lambda lp: LabeledPoint(lp.label, lp.features.toArray()))

-    print ''
+    print
     print 'Summary of data file: ' + filepath
     print '%d data points' % points.count()

     # Statistics (correlations)
-    print ''
+    print
     print 'Correlation (%s) between label and each feature' % corrType
     print 'Feature\tCorrelation'
     numFeatures = points.take(1)[0].features.size
@@ -55,6 +55,6 @@
         featureRDD = points.map(lambda lp: lp.features[i])
         corr = Statistics.corr(labelRDD, featureRDD, corrType)
         print '%d\t%g' % (i, corr)
-    print ''
+    print

     sc.stop()
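Context for the toDense() → toArray() change above: pyspark's SparseVector exposes toArray(), which materializes a dense NumPy array, and has no toDense() method as far as we know, which is presumably what this fix addresses. A minimal sketch, in Python 2 style to match the examples:

    from pyspark.mllib.linalg import SparseVector

    sv = SparseVector(4, {0: 1.0, 3: 5.5})  # 4-dim vector with two nonzero entries
    print sv.toArray()                      # dense numpy array: [ 1.   0.   0.   5.5]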
examples/src/main/python/mllib/random_rdd_generation.py

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Randomly generated RDDs.
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.mllib.random import RandomRDDs
+
+
+if __name__ == "__main__":
+    if len(sys.argv) not in [1, 2]:
+        print >> sys.stderr, "Usage: random_rdd_generation"
+        exit(-1)
+
+    sc = SparkContext(appName="PythonRandomRDDGeneration")
+
+    numExamples = 10000  # number of examples to generate
+    fraction = 0.1  # fraction of data to sample
+
+    # Example: RandomRDDs.normalRDD
+    normalRDD = RandomRDDs.normalRDD(sc, numExamples)
+    print 'Generated RDD of %d examples sampled from the standard normal distribution'\
+        % normalRDD.count()
+    print '  First 5 samples:'
+    for sample in normalRDD.take(5):
+        print '  ' + str(sample)
+    print
+
+    # Example: RandomRDDs.normalVectorRDD
+    normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
+    print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
+    print '  First 5 samples:'
+    for sample in normalVectorRDD.take(5):
+        print '  ' + str(sample)
+    print
+
+    sc.stop()
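For reference, the 1.1-era RandomRDDs API in pyspark.mllib.random also provides uniform and Poisson generators alongside the normal ones used in this new file (method names worth verifying against your Spark version). A minimal sketch:

    from pyspark import SparkContext
    from pyspark.mllib.random import RandomRDDs

    sc = SparkContext(appName="RandomRDDsSketch")
    uniformRDD = RandomRDDs.uniformRDD(sc, 100)                 # 100 draws from U[0, 1]
    poissonRDD = RandomRDDs.poissonRDD(sc, mean=2.0, size=100)  # 100 draws from Poisson(2.0)
    print uniformRDD.take(5)
    print poissonRDD.take(5)
    sc.stop()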

examples/src/main/python/mllib/random_and_sampled_rdds.py renamed to examples/src/main/python/mllib/sampled_rdds.py

Lines changed: 12 additions & 22 deletions
@@ -16,61 +16,49 @@
 #

 """
-Randomly generated and sampled RDDs.
+Randomly sampled RDDs.
 """

 import sys

 from pyspark import SparkContext
-from pyspark.mllib.random import RandomRDDGenerators
 from pyspark.mllib.util import MLUtils


-
 if __name__ == "__main__":
     if len(sys.argv) not in [1, 2]:
-        print >> sys.stderr, "Usage: random_and_sampled_rdds <libsvm data file>"
+        print >> sys.stderr, "Usage: sampled_rdds <libsvm data file>"
         exit(-1)
     if len(sys.argv) == 2:
         datapath = sys.argv[1]
     else:
         datapath = 'data/mllib/sample_binary_classification_data.txt'

-    sc = SparkContext(appName="PythonRandomAndSampledRDDs")
-
-    points = MLUtils.loadLibSVMFile(sc, datapath)
+    sc = SparkContext(appName="PythonSampledRDDs")

-    numExamples = 10000  # number of examples to generate
     fraction = 0.1  # fraction of data to sample

-    # Example: RandomRDDGenerators
-    normalRDD = RandomRDDGenerators.normalRDD(sc, numExamples)
-    print 'Generated RDD of %d examples sampled from the standard normal distribution'\
-        % normalRDD.count()
-    normalVectorRDD = RandomRDDGenerators.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
-    print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
-
-    print
+    examples = MLUtils.loadLibSVMFile(sc, datapath)
+    numExamples = examples.count()
+    print 'Loaded data with %d examples from file: %s' % (numExamples, datapath)

     # Example: RDD.sample() and RDD.takeSample()
     expectedSampleSize = int(numExamples * fraction)
     print 'Sampling RDD using fraction %g.  Expected sample size = %d.' \
         % (fraction, expectedSampleSize)
-    sampledRDD = normalRDD.sample(withReplacement = True, fraction = fraction)
+    sampledRDD = examples.sample(withReplacement = True, fraction = fraction)
     print '  RDD.sample(): sample has %d examples' % sampledRDD.count()
-    sampledArray = normalRDD.takeSample(withReplacement = True, num = expectedSampleSize)
+    sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize)
     print '  RDD.takeSample(): sample has %d examples' % len(sampledArray)

     print

     # Example: RDD.sampleByKey()
-    examples = MLUtils.loadLibSVMFile(sc, datapath)
-    sizeA = examples.count()
-    print 'Loaded data with %d examples from file: %s' % (sizeA, datapath)
     keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
     print '  Keyed data using label (Int) as key ==> Orig'
     # Count examples per label in original data.
     keyCountsA = keyedRDD.countByKey()
+
     # Subsample, and count examples per label in sampled data.
     fractions = {}
     for k in keyCountsA.keys():
@@ -80,9 +68,11 @@
     sizeB = sum(keyCountsB.values())
     print '  Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
         % sizeB
+
+    # Compare samples
     print '  \tFractions of examples with key'
     print 'Key\tOrig\tSample'
     for k in sorted(keyCountsA.keys()):
-        print '%d\t%g\t%g' % (k, keyCountsA[k] / float(sizeA), keyCountsB[k] / float(sizeB))
+        print '%d\t%g\t%g' % (k, keyCountsA[k] / float(numExamples), keyCountsB[k] / float(sizeB))

     sc.stop()
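The sampleByKey() call itself falls between the two hunks above, so it is not shown; for reference, a minimal self-contained sketch of the same stratified-sampling RDD API, with made-up keys and per-key fractions:

    from pyspark import SparkContext

    sc = SparkContext(appName="SampleByKeySketch")
    pairs = sc.parallelize([(0, 'a'), (0, 'b'), (1, 'c'), (1, 'd'), (1, 'e')])
    fractions = {0: 0.5, 1: 0.4}  # expected sampling fraction per key
    sample = pairs.sampleByKey(withReplacement=False, fractions=fractions)
    print sample.countByKey()     # counts are approximate, not exact
    sc.stop()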
examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import scopt.OptionParser
+
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.{SparkConf, SparkContext}
+
+
+/**
+ * An example app for computing correlations. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.Correlations
+ * }}}
+ * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`.
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object Correlations {
+
+  case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")
+
+  def main(args: Array[String]) {
+
+    val defaultParams = Params()
+
+    val parser = new OptionParser[Params]("Correlations") {
+      head("Correlations: an example app for computing correlations")
+      opt[String]("input")
+        .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
+        .action((x, c) => c.copy(input = x))
+      note(
+        """
+        |For example, the following command runs this app on a synthetic dataset:
+        |
+        | bin/spark-submit --class org.apache.spark.examples.mllib.Correlations \
+        |  examples/target/scala-*/spark-examples-*.jar \
+        |  --input data/mllib/sample_linear_regression_data.txt
+        """.stripMargin)
+    }
+
+    parser.parse(args, defaultParams).map { params =>
+      run(params)
+    } getOrElse {
+      sys.exit(1)
+    }
+  }
+
+  def run(params: Params) {
+    val conf = new SparkConf().setAppName(s"Correlations with $params")
+    val sc = new SparkContext(conf)
+
+    val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
+
+    println(s"Summary of data file: ${params.input}")
+    println(s"${examples.count()} data points")
+
+    // Calculate label -- feature correlations
+    val labelRDD = examples.map(_.label)
+    val numFeatures = examples.take(1)(0).features.size
+    val corrType = "pearson"
+    println()
+    println(s"Correlation ($corrType) between label and each feature")
+    println(s"Feature\tCorrelation")
+    var feature = 0
+    while (feature < numFeatures) {
+      val featureRDD = examples.map(_.features(feature))
+      val corr = Statistics.corr(labelRDD, featureRDD)
+      println(s"$feature\t$corr")
+      feature += 1
+    }
+    println()
+
+    sc.stop()
+  }
+}
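The label-vs-feature loop in run() has a direct pyspark counterpart; a minimal sketch, assuming a hypothetical points RDD of LabeledPoints (e.g. from MLUtils.loadLibSVMFile):

    from pyspark.mllib.stat import Statistics

    # `points` is assumed to be an RDD[LabeledPoint], e.g. MLUtils.loadLibSVMFile(sc, path)
    labelRDD = points.map(lambda lp: lp.label)
    numFeatures = points.take(1)[0].features.size
    for i in range(numFeatures):
        featureRDD = points.map(lambda lp: lp.features[i])
        print '%d\t%g' % (i, Statistics.corr(labelRDD, featureRDD, 'pearson'))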

examples/src/main/scala/org/apache/spark/examples/mllib/StatisticalSummary.scala renamed to examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala

Lines changed: 30 additions & 54 deletions
@@ -17,53 +17,61 @@

 package org.apache.spark.examples.mllib

-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.rdd.RDD
 import scopt.OptionParser

 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, Statistics}
+import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.{SparkConf, SparkContext}


 /**
  * An example app for summarizing multivariate data from a file. Run with
  * {{{
- * bin/run-example org.apache.spark.examples.mllib.Statistics
+ * bin/run-example org.apache.spark.examples.mllib.MultivariateSummarizer
  * }}}
  * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`.
  * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
  */
-object StatisticalSummary extends App {
+object MultivariateSummarizer {

   case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")

-  val defaultParams = Params()
+  def main(args: Array[String]) {

-  val parser = new OptionParser[Params]("StatisticalSummary") {
-    head("StatisticalSummary: an example app for MultivariateOnlineSummarizer and Statistics" +
-      " (correlation)")
-    opt[String]("input")
-      .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
-      .action((x, c) => c.copy(input = x))
-    note(
-      """
+    val defaultParams = Params()
+
+    val parser = new OptionParser[Params]("MultivariateSummarizer") {
+      head("MultivariateSummarizer: an example app for MultivariateOnlineSummarizer")
+      opt[String]("input")
+        .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
+        .action((x, c) => c.copy(input = x))
+      note(
+        """
         |For example, the following command runs this app on a synthetic dataset:
         |
-        | bin/spark-submit --class org.apache.spark.examples.mllib.StatisticalSummary \
+        | bin/spark-submit --class org.apache.spark.examples.mllib.MultivariateSummarizer \
         |  examples/target/scala-*/spark-examples-*.jar \
         |  --input data/mllib/sample_linear_regression_data.txt
-      """.stripMargin)
-  }
+        """.stripMargin)
+    }

-  parser.parse(args, defaultParams).map { params =>
-    run(params)
-  } getOrElse {
-    sys.exit(1)
+    parser.parse(args, defaultParams).map { params =>
+      run(params)
+    } getOrElse {
+      sys.exit(1)
+    }
   }

-  def runStatisticalSummary(examples: RDD[LabeledPoint], params: Params) {
+  def run(params: Params) {
+    val conf = new SparkConf().setAppName(s"MultivariateSummarizer with $params")
+    val sc = new SparkContext(conf)
+
+    val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
+
+    println(s"Summary of data file: ${params.input}")
+    println(s"${examples.count()} data points")
+
     // Summarize labels
     val labelSummary = examples.aggregate(new MultivariateOnlineSummarizer())(
       (summary, lp) => summary.add(Vectors.dense(lp.label)),
@@ -84,38 +92,6 @@ object StatisticalSummary extends App {
     println(s"max\t${labelSummary.max(0)}\t${featureSummary.max.toArray.mkString("\t")}")
     println(s"min\t${labelSummary.min(0)}\t${featureSummary.min.toArray.mkString("\t")}")
     println()
-  }
-
-  def runCorrelations(examples: RDD[LabeledPoint], params: Params) {
-    // Calculate label -- feature correlations
-    val labelRDD = examples.map(_.label)
-    val numFeatures = examples.take(1)(0).features.size
-    val corrType = "pearson"
-    println()
-    println(s"Correlation ($corrType) between label and each feature")
-    println(s"Feature\tCorrelation")
-    var feature = 0
-    while (feature < numFeatures) {
-      val featureRDD = examples.map(_.features(feature))
-      val corr = Statistics.corr(labelRDD, featureRDD)
-      println(s"$feature\t$corr")
-      feature += 1
-    }
-    println()
-  }
-
-  def run(params: Params) {
-    val conf = new SparkConf().setAppName(s"StatisticalSummary with $params")
-    val sc = new SparkContext(conf)
-
-    val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
-
-    println(s"Summary of data file: ${params.input}")
-    println(s"${examples.count} data points")
-
-    runStatisticalSummary(examples, params)
-
-    runCorrelations(examples, params)

     sc.stop()
   }
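MultivariateOnlineSummarizer is used here from Scala; the closest Python route to the same column-wise summary is Statistics.colStats in pyspark.mllib.stat, which appeared in a later release than this commit, so verify it exists in your version. A minimal sketch, assuming an existing SparkContext sc:

    from pyspark.mllib.stat import Statistics

    # `sc` is assumed to be an existing SparkContext; list rows become dense vectors
    mat = sc.parallelize([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
    summary = Statistics.colStats(mat)
    print summary.mean()         # per-column means
    print summary.variance()     # per-column variances
    print summary.numNonzeros()  # per-column nonzero counts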
