Skip to content

Commit 1d5bff6

Browse files
init commit of project 9
Spark Java Machine Learning examples
1 parent d36388e commit 1d5bff6

File tree

9 files changed

+339
-0
lines changed

9 files changed

+339
-0
lines changed

project9/pom.xml

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
<!--
  Maven build for the Spark Java ML examples (project 9).

  Fixes vs. the original:
    * spark-sql-kafka-0-10 now reuses ${scala.version}/${spark.version}
      instead of duplicating hard-coded "2.11"/"2.3.0" values that could
      silently drift from the other Spark artifacts.
    * hadoop-hdfs version lifted into a ${hadoop.version} property for
      consistency with the other managed versions.
    * maven-compiler-plugin added so the declared ${java.version} property
      is actually applied to compilation (it was previously unused).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jobreadyprogrammer</groupId>
  <artifactId>project9</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <!-- Scala binary version baked into Spark artifact ids (spark-core_2.11 etc.). -->
    <scala.version>2.11</scala.version>
    <spark.version>2.3.0</spark.version>
    <!-- NOTE(review): declared but not referenced by any dependency below;
         presumably kept for a future PostgreSQL JDBC dependency - confirm. -->
    <postgresql.version>42.1.4</postgresql.version>
    <hadoop.version>2.2.0</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Kept aligned with the other Spark artifacts via the shared properties. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Apply the java.version property to source/target compilation level. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>

      <!-- Copy all runtime dependencies next to the built jar so the examples
           can be launched with a simple classpath (target/libs). -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>prepare-package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>
                ${project.build.directory}/libs
              </outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- NOTE(review): no version is pinned here and this POM has no
           spring-boot parent, so the resolved plugin version is
           unreproducible - pin an explicit <version> matching the intended
           Spring Boot release. -->
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>repackage</goal>
            </goals>
            <configuration>
              <mainClass>com.jobreadyprogrammer.spark.Application</mainClass>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.jobreadyprogrammer.spark;
2+
3+
import org.apache.log4j.Level;
4+
import org.apache.log4j.Logger;
5+
import org.apache.spark.ml.clustering.KMeans;
6+
import org.apache.spark.ml.clustering.KMeansModel;
7+
import org.apache.spark.ml.feature.VectorAssembler;
8+
import org.apache.spark.sql.Dataset;
9+
import org.apache.spark.sql.Row;
10+
import org.apache.spark.sql.SparkSession;
11+
12+
public class KmeansClustering {
13+
14+
public static void main(String[] args) {
15+
16+
Logger.getLogger("org").setLevel(Level.ERROR);
17+
Logger.getLogger("akka").setLevel(Level.ERROR);
18+
19+
SparkSession spark = new SparkSession.Builder()
20+
.appName("kmeans Clustering")
21+
.master("local")
22+
.getOrCreate();
23+
24+
Dataset<Row> wholeSaleDf = spark.read()
25+
.option("header", "true")
26+
.option("inferSchema", "true")
27+
.format("csv")
28+
.load("/Users/imtiazahmad/Desktop/SparkCourse/data/Wholesale customers data.csv");
29+
wholeSaleDf.show();
30+
Dataset<Row> featuresDf = wholeSaleDf.select("channel", "fresh", "milk", "grocery", "frozen", "detergents_paper", "delicassen");
31+
32+
VectorAssembler assembler = new VectorAssembler();
33+
assembler = assembler.setInputCols(new String[] {"channel", "fresh", "milk", "grocery", "frozen", "detergents_paper", "delicassen"})
34+
.setOutputCol("features");
35+
36+
Dataset<Row> trainingData = assembler.transform(featuresDf).select("features");
37+
38+
KMeans kmeans = new KMeans().setK(10);
39+
40+
KMeansModel model = kmeans.fit(trainingData);
41+
42+
System.out.println(model.computeCost(trainingData));
43+
model.summary().predictions().show();
44+
45+
46+
}
47+
48+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.jobreadyprogrammer.spark;
2+
3+
import org.apache.log4j.Level;
4+
import org.apache.log4j.Logger;
5+
import org.apache.spark.ml.feature.VectorAssembler;
6+
import org.apache.spark.ml.regression.LinearRegression;
7+
import org.apache.spark.ml.regression.LinearRegressionModel;
8+
import org.apache.spark.sql.Dataset;
9+
import org.apache.spark.sql.Row;
10+
import org.apache.spark.sql.SparkSession;
11+
12+
public class LinearMarketingVsSales {
13+
14+
public static void main(String[] args) {
15+
16+
Logger.getLogger("org").setLevel(Level.ERROR);
17+
Logger.getLogger("akka").setLevel(Level.ERROR);
18+
19+
SparkSession spark = new SparkSession.Builder()
20+
.appName("LinearRegressionExample")
21+
.master("local")
22+
.getOrCreate();
23+
24+
Dataset<Row> markVsSalesDf = spark.read()
25+
.option("header", "true")
26+
.option("inferSchema", "true")
27+
.format("csv")
28+
.load("/Users/imtiazahmad/Desktop/SparkCourse/data/marketing_vs_sales.csv");
29+
30+
Dataset<Row> mldf = markVsSalesDf.withColumnRenamed("sales", "label")
31+
.select("label", "marketing_spend","bad_day");
32+
33+
String[] featureColumns = {"marketing_spend", "bad_day"};
34+
35+
VectorAssembler assember = new VectorAssembler()
36+
.setInputCols(featureColumns)
37+
.setOutputCol("features");
38+
39+
Dataset<Row> lblFeaturesDf = assember.transform(mldf).select("label", "features");
40+
lblFeaturesDf = lblFeaturesDf.na().drop();
41+
lblFeaturesDf.show();
42+
43+
// next we need to create a linear regression model object
44+
LinearRegression lr = new LinearRegression();
45+
LinearRegressionModel learningModel = lr.fit(lblFeaturesDf);
46+
47+
learningModel.summary().predictions().show();
48+
49+
System.out.println("R Squared: "+ learningModel.summary().r2());
50+
51+
52+
53+
}
54+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.jobreadyprogrammer.spark;
2+
3+
import org.apache.log4j.Level;
4+
import org.apache.log4j.Logger;
5+
import org.apache.spark.ml.Pipeline;
6+
import org.apache.spark.ml.PipelineModel;
7+
import org.apache.spark.ml.PipelineStage;
8+
import org.apache.spark.ml.feature.VectorAssembler;
9+
import org.apache.spark.ml.regression.LinearRegression;
10+
import org.apache.spark.ml.regression.LinearRegressionModel;
11+
import org.apache.spark.sql.Dataset;
12+
import org.apache.spark.sql.Row;
13+
import org.apache.spark.sql.SparkSession;
14+
15+
public class LinearMpgRegression {
16+
17+
public static void main(String[] args) {
18+
19+
Logger.getLogger("org").setLevel(Level.ERROR);
20+
Logger.getLogger("akka").setLevel(Level.ERROR);
21+
22+
SparkSession spark = new SparkSession.Builder()
23+
.appName("LinearRegressionMpgExample")
24+
.master("local")
25+
.getOrCreate();
26+
27+
Dataset<Row> autoMpgDf = spark.read()
28+
.option("header", "true")
29+
.option("inferSchema", "true")
30+
.format("csv")
31+
.load("/Users/imtiazahmad/Desktop/SparkCourse/data/auto_mpg.csv");
32+
33+
autoMpgDf = autoMpgDf.withColumnRenamed("mpg", "label")
34+
.drop("acceleration")
35+
.drop("modelYear")
36+
.drop("origin")
37+
.drop("carName")
38+
.drop("displacement");
39+
40+
autoMpgDf = autoMpgDf.na().drop();
41+
42+
String[] featureColumns = {"cylinders", "horsePower", "weight"};
43+
44+
VectorAssembler assembler = new VectorAssembler()
45+
.setInputCols(featureColumns)
46+
.setOutputCol("features");
47+
48+
autoMpgDf = assembler.transform(autoMpgDf).select("label", "features");
49+
50+
LinearRegression lr = new LinearRegression();
51+
LinearRegressionModel lrm = lr.fit(autoMpgDf);
52+
53+
Pipeline pl = new Pipeline()
54+
.setStages(new PipelineStage[] {lrm});
55+
56+
Dataset<Row> [] splitData = autoMpgDf.randomSplit(new double[] {0.7, 0.3});
57+
58+
Dataset<Row> trainingData = splitData[0];
59+
Dataset<Row> testData = splitData[1];
60+
61+
PipelineModel model = pl.fit(trainingData);
62+
63+
Dataset<Row> result = model.transform(testData);
64+
result.show();
65+
66+
}
67+
68+
69+
70+
}
71+
72+
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.jobreadyprogrammer.spark;
2+
3+
import org.apache.log4j.Level;
4+
import org.apache.log4j.Logger;
5+
import org.apache.spark.ml.Pipeline;
6+
import org.apache.spark.ml.PipelineModel;
7+
import org.apache.spark.ml.PipelineStage;
8+
import org.apache.spark.ml.classification.LogisticRegression;
9+
import org.apache.spark.ml.feature.StringIndexer;
10+
import org.apache.spark.ml.feature.VectorAssembler;
11+
import org.apache.spark.sql.Dataset;
12+
import org.apache.spark.sql.Row;
13+
import org.apache.spark.sql.SparkSession;
14+
15+
public class LogisticRegressionExample {
16+
17+
public static void main(String[] args) {
18+
19+
Logger.getLogger("org").setLevel(Level.ERROR);
20+
Logger.getLogger("akka").setLevel(Level.ERROR);
21+
22+
SparkSession spark = new SparkSession.Builder()
23+
.appName("LogisticRegressionExample")
24+
.master("local")
25+
.getOrCreate();
26+
27+
Dataset<Row> treatmentDf = spark.read()
28+
.option("header", "true")
29+
.option("inferSchema", "true")
30+
.format("csv")
31+
.load("/Users/imtiazahmad/Desktop/SparkCourse/data/cryotherapy.csv");
32+
33+
Dataset<Row> lblFeatureDf = treatmentDf.withColumnRenamed("Result_of_Treatment", "label")
34+
.select("label", "sex","age","time","number_of_warts","type","area");
35+
36+
lblFeatureDf = lblFeatureDf.na().drop();
37+
38+
StringIndexer genderIndexer = new StringIndexer()
39+
.setInputCol("sex").setOutputCol("sexIndex");
40+
41+
VectorAssembler assembler = new VectorAssembler()
42+
.setInputCols(new String [] {"sexIndex", "age", "time", "number_of_warts", "type", "area"})
43+
.setOutputCol("features");
44+
45+
46+
Dataset<Row> [] splitData = lblFeatureDf.randomSplit(new double[] {.7, .3});
47+
Dataset<Row> trainingDf = splitData[0];
48+
Dataset<Row> testingDf = splitData[1];
49+
50+
LogisticRegression logReg = new LogisticRegression();
51+
52+
Pipeline pl = new Pipeline();
53+
pl.setStages(new PipelineStage [] {genderIndexer, assembler, logReg});
54+
55+
PipelineModel model = pl.fit(trainingDf);
56+
Dataset<Row> results = model.transform(testingDf);
57+
58+
results.show();
59+
60+
}
61+
62+
}
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 commit comments

Comments
 (0)