Commit 8c37f0a

Made MLlib and examples compile
1 parent 6d53134 commit 8c37f0a

28 files changed: +508 -113 lines changed


examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java

Lines changed: 5 additions & 5 deletions
@@ -33,7 +33,7 @@
 import org.apache.spark.ml.tuning.CrossValidator;
 import org.apache.spark.ml.tuning.CrossValidatorModel;
 import org.apache.spark.ml.tuning.ParamGridBuilder;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.Row;
 
@@ -71,7 +71,7 @@ public static void main(String[] args) {
       new LabeledDocument(9L, "a e c l", 0.0),
       new LabeledDocument(10L, "spark compile", 1.0),
       new LabeledDocument(11L, "hadoop software", 0.0));
-    SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+    DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
 
     // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
     Tokenizer tokenizer = new Tokenizer()
@@ -112,11 +112,11 @@ public static void main(String[] args) {
       new Document(5L, "l m n"),
       new Document(6L, "mapreduce spark"),
       new Document(7L, "apache hadoop"));
-    SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+    DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
 
     // Make predictions on test documents. cvModel uses the best model found (lrModel).
-    cvModel.transform(test).registerAsTable("prediction");
-    SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
+    cvModel.transform(test).registerTempTable("prediction");
+    DataFrame predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
     for (Row r: predictions.collect()) {
       System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
         + ", prediction=" + r.get(3));

examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java

Lines changed: 5 additions & 5 deletions
@@ -28,7 +28,7 @@
 import org.apache.spark.ml.classification.LogisticRegression;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.Row;
 
@@ -54,7 +54,7 @@ public static void main(String[] args) {
       new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
       new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
       new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
-    SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
+    DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
 
     // Create a LogisticRegression instance. This instance is an Estimator.
     LogisticRegression lr = new LogisticRegression();
@@ -94,14 +94,14 @@ public static void main(String[] args) {
       new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
       new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
       new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
-    SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
+    DataFrame test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
 
     // Make predictions on test documents using the Transformer.transform() method.
     // LogisticRegression.transform will only use the 'features' column.
     // Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
     // column since we renamed the lr.scoreCol parameter previously.
-    model2.transform(test).registerAsTable("results");
-    SchemaRDD results =
+    model2.transform(test).registerTempTable("results");
+    DataFrame results =
       jsql.sql("SELECT features, label, probability, prediction FROM results");
     for (Row r: results.collect()) {
       System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)

examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java

Lines changed: 5 additions & 5 deletions
@@ -29,7 +29,7 @@
 import org.apache.spark.ml.classification.LogisticRegression;
 import org.apache.spark.ml.feature.HashingTF;
 import org.apache.spark.ml.feature.Tokenizer;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.Row;
 
@@ -54,7 +54,7 @@ public static void main(String[] args) {
       new LabeledDocument(1L, "b d", 0.0),
       new LabeledDocument(2L, "spark f g h", 1.0),
       new LabeledDocument(3L, "hadoop mapreduce", 0.0));
-    SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+    DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
 
     // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
     Tokenizer tokenizer = new Tokenizer()
@@ -79,11 +79,11 @@ public static void main(String[] args) {
       new Document(5L, "l m n"),
       new Document(6L, "mapreduce spark"),
       new Document(7L, "apache hadoop"));
-    SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+    DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
 
     // Make predictions on test documents.
-    model.transform(test).registerAsTable("prediction");
-    SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
+    model.transform(test).registerTempTable("prediction");
+    DataFrame predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
     for (Row r: predictions.collect()) {
       System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
         + ", prediction=" + r.get(3));

examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java

Lines changed: 10 additions & 10 deletions
@@ -26,9 +26,9 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 
 public class JavaSparkSQL {
   public static class Person implements Serializable {
@@ -74,11 +74,11 @@ public Person call(String line) {
     });
 
     // Apply a schema to an RDD of Java Beans and register it as a table.
-    SchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
+    DataFrame schemaPeople = sqlCtx.applySchema(people, Person.class);
     schemaPeople.registerTempTable("people");
 
     // SQL can be run over RDDs that have been registered as tables.
-    SchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+    DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
 
     // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
     // The columns of a row in the result can be accessed by ordinal.
@@ -99,11 +99,11 @@ public String call(Row row) {
     // Read in the parquet file created above.
     // Parquet files are self-describing so the schema is preserved.
     // The result of loading a parquet file is also a JavaSchemaRDD.
-    SchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
+    DataFrame parquetFile = sqlCtx.parquetFile("people.parquet");
 
     //Parquet files can also be registered as tables and then used in SQL statements.
     parquetFile.registerTempTable("parquetFile");
-    SchemaRDD teenagers2 =
+    DataFrame teenagers2 =
       sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
     teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
       @Override
@@ -120,7 +120,7 @@ public String call(Row row) {
     // The path can be either a single text file or a directory storing text files.
     String path = "examples/src/main/resources/people.json";
     // Create a JavaSchemaRDD from the file(s) pointed by path
-    SchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
+    DataFrame peopleFromJsonFile = sqlCtx.jsonFile(path);
 
     // Because the schema of a JSON dataset is automatically inferred, to write queries,
     // it is better to take a look at what is the schema.
@@ -134,7 +134,7 @@ public String call(Row row) {
     peopleFromJsonFile.registerTempTable("people");
 
     // SQL statements can be run by using the sql methods provided by sqlCtx.
-    SchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+    DataFrame teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
 
     // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
     // The columns of a row in the result can be accessed by ordinal.
@@ -151,7 +151,7 @@ public String call(Row row) {
     List<String> jsonData = Arrays.asList(
       "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
     JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
-    SchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
+    DataFrame peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
 
     // Take a look at the schema of this new JavaSchemaRDD.
     peopleFromJsonRDD.printSchema();
@@ -164,7 +164,7 @@ public String call(Row row) {
 
     peopleFromJsonRDD.registerTempTable("people2");
 
-    SchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
+    DataFrame peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
     List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
       @Override
       public String call(Row row) {

examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala

Lines changed: 1 addition & 2 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.ml
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
 import org.apache.spark.ml.Pipeline
 import org.apache.spark.ml.classification.LogisticRegression
 import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
@@ -101,7 +100,7 @@ object CrossValidatorExample {
 
     // Make predictions on test documents. cvModel uses the best model found (lrModel).
     cvModel.transform(test)
-      .select('id, 'text, 'score, 'prediction)
+      .select("id", "text", "score", "prediction")
       .collect()
       .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
         println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)

examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala

Lines changed: 1 addition & 2 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.ml
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
 import org.apache.spark.ml.classification.LogisticRegression
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
@@ -92,7 +91,7 @@ object SimpleParamsExample {
     // Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
     // column since we renamed the lr.scoreCol parameter previously.
     model2.transform(test)
-      .select('features, 'label, 'probability, 'prediction)
+      .select("features", "label", "probability", "prediction")
      .collect()
      .foreach { case Row(features: Vector, label: Double, prob: Double, prediction: Double) =>
        println("(" + features + ", " + label + ") -> prob=" + prob + ", prediction=" + prediction)

examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala

Lines changed: 1 addition & 2 deletions
@@ -20,7 +20,6 @@ package org.apache.spark.examples.ml
 import scala.beans.BeanInfo
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
 import org.apache.spark.ml.Pipeline
 import org.apache.spark.ml.classification.LogisticRegression
 import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
@@ -80,7 +79,7 @@ object SimpleTextClassificationPipeline {
 
     // Make predictions on test documents.
     model.transform(test)
-      .select('id, 'text, 'score, 'prediction)
+      .select("id", "text", "score", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
        println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)

examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala

Lines changed: 6 additions & 6 deletions
@@ -28,10 +28,10 @@ import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext, SchemaRDD}
+import org.apache.spark.sql.{Row, SQLContext, DataFrame}
 
 /**
- * An example of how to use [[org.apache.spark.sql.SchemaRDD]] as a Dataset for ML. Run with
+ * An example of how to use [[org.apache.spark.sql.DataFrame]] as a Dataset for ML. Run with
  * {{{
  * ./bin/run-example org.apache.spark.examples.mllib.DatasetExample [options]
  * }}}
@@ -81,18 +81,18 @@ object DatasetExample {
     println(s"Loaded ${origData.count()} instances from file: ${params.input}")
 
     // Convert input data to SchemaRDD explicitly.
-    val schemaRDD: SchemaRDD = origData
+    val schemaRDD: DataFrame = origData
     println(s"Inferred schema:\n${schemaRDD.schema.prettyJson}")
     println(s"Converted to SchemaRDD with ${schemaRDD.count()} records")
 
     // Select columns, using implicit conversion to SchemaRDD.
-    val labelsSchemaRDD: SchemaRDD = origData.select('label)
+    val labelsSchemaRDD: DataFrame = origData.select("label")
     val labels: RDD[Double] = labelsSchemaRDD.map { case Row(v: Double) => v }
     val numLabels = labels.count()
     val meanLabel = labels.fold(0.0)(_ + _) / numLabels
     println(s"Selected label column with average value $meanLabel")
 
-    val featuresSchemaRDD: SchemaRDD = origData.select('features)
+    val featuresSchemaRDD: DataFrame = origData.select("features")
     val features: RDD[Vector] = featuresSchemaRDD.map { case Row(v: Vector) => v }
     val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())(
       (summary, feat) => summary.add(feat),
@@ -109,7 +109,7 @@ object DatasetExample {
     val newDataset = sqlContext.parquetFile(outputDir)
 
     println(s"Schema from Parquet: ${newDataset.schema.prettyJson}")
-    val newFeatures = newDataset.select('features).map { case Row(v: Vector) => v }
+    val newFeatures = newDataset.select("features").map { case Row(v: Vector) => v }
     val newFeaturesSummary = newFeatures.aggregate(new MultivariateOnlineSummarizer())(
       (summary, feat) => summary.add(feat),
       (sum1, sum2) => sum1.merge(sum2))

examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala

Lines changed: 4 additions & 2 deletions
@@ -19,6 +19,8 @@ package org.apache.spark.examples.sql
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.dsl._
+import org.apache.spark.sql.dsl.literals._
 
 // One method for defining the schema of an RDD is to make a case class with the desired column
 // names and types.
@@ -54,7 +56,7 @@ object RDDRelation {
     rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)
 
     // Queries can also be written using a LINQ-like Scala DSL.
-    rdd.where('key === 1).orderBy('value.asc).select('key).collect().foreach(println)
+    rdd.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
 
     // Write out an RDD as a parquet file.
     rdd.saveAsParquetFile("pair.parquet")
@@ -63,7 +65,7 @@ object RDDRelation {
     val parquetFile = sqlContext.parquetFile("pair.parquet")
 
     // Queries can be run using the DSL on parequet files just like the original RDD.
-    parquetFile.where('key === 1).select('value as 'a).collect().foreach(println)
+    parquetFile.where($"key" === 1).select($"value".as("a")).collect().foreach(println)
 
     // These files can also be registered as tables.
     parquetFile.registerTempTable("parquetFile")

mllib/src/main/scala/org/apache/spark/ml/Estimator.scala

Lines changed: 4 additions & 4 deletions
@@ -21,7 +21,7 @@ import scala.annotation.varargs
 
 import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param.{ParamMap, ParamPair, Params}
-import org.apache.spark.sql.SchemaRDD
+import org.apache.spark.sql.DataFrame
 
 /**
  * :: AlphaComponent ::
@@ -38,7 +38,7 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
    * @return fitted model
    */
   @varargs
-  def fit(dataset: SchemaRDD, paramPairs: ParamPair[_]*): M = {
+  def fit(dataset: DataFrame, paramPairs: ParamPair[_]*): M = {
     val map = new ParamMap().put(paramPairs: _*)
     fit(dataset, map)
   }
@@ -50,7 +50,7 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
   * @param paramMap parameter map
   * @return fitted model
   */
-  def fit(dataset: SchemaRDD, paramMap: ParamMap): M
+  def fit(dataset: DataFrame, paramMap: ParamMap): M
 
   /**
   * Fits multiple models to the input data with multiple sets of parameters.
@@ -61,7 +61,7 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
   * @param paramMaps an array of parameter maps
   * @return fitted models, matching the input parameter maps
   */
-  def fit(dataset: SchemaRDD, paramMaps: Array[ParamMap]): Seq[M] = {
+  def fit(dataset: DataFrame, paramMaps: Array[ParamMap]): Seq[M] = {
     paramMaps.map(fit(dataset, _))
   }
 }
