Skip to content

Commit ec5931e

Browse files
Updates made to project4
1 parent 61389f7 commit ec5931e

File tree

8 files changed

+85
-85
lines changed

8 files changed

+85
-85
lines changed

project4/src/main/java/com/jobreadyprogrammer/mappers/HouseMapper.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,4 +36,4 @@ public House call(Row value) throws Exception {
3636

3737
}
3838

39-
}
39+
}

project4/src/main/java/com/jobreadyprogrammer/mappers/LineMapper.java

Lines changed: 10 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -6,15 +6,16 @@
66
import org.apache.spark.api.java.function.FlatMapFunction;
77
import org.apache.spark.sql.Row;
88

9-
public class LineMapper implements FlatMapFunction<Row, String> {
10-
9+
public class LineMapper implements FlatMapFunction<Row, String>{
10+
11+
/**
12+
*
13+
*/
1114
private static final long serialVersionUID = 1L;
12-
13-
@Override
14-
public Iterator<String> call(Row value) throws Exception {
1515

16-
return Arrays.asList(value.toString().split(" ")).iterator();
17-
16+
@Override
17+
public Iterator<String> call(Row row) throws Exception {
18+
return Arrays.asList(row.toString().split(" ")).iterator();
1819
}
19-
20-
}
20+
21+
}

project4/src/main/java/com/jobreadyprogrammer/pojos/House.java

Lines changed: 8 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,14 @@
11
package com.jobreadyprogrammer.pojos;
22

3+
import java.io.Serializable;
34
import java.util.Date;
45

5-
public class House {
6+
public class House implements Serializable {
7+
8+
/**
9+
*
10+
*/
11+
private static final long serialVersionUID = 1L;
612

713
private int id;
814
private String address;
@@ -50,9 +56,5 @@ public Date getVacantBy() {
5056
public void setVacantBy(Date vacantBy) {
5157
this.vacantBy = vacantBy;
5258
}
53-
54-
55-
56-
5759

58-
}
60+
}

project4/src/main/java/com/jobreadyprogrammer/pojos/Line.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

project4/src/main/java/com/jobreadyprogrammer/spark/Application.java

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -7,11 +7,12 @@ public static void main(String[] args) {
77
// ArrayToDataset app = new ArrayToDataset();
88
// app.start();
99

10-
// CsvToDatasetHouseToDataframe app = new CsvToDatasetHouseToDataframe();
11-
// app.start();
10+
CsvToDatasetHouseToDataframe app = new CsvToDatasetHouseToDataframe();
11+
app.start();
1212

13-
WordCount wc = new WordCount();
14-
wc.start();
13+
// WordCount wc = new WordCount();
14+
// wc.start();
15+
1516
}
1617

1718

Lines changed: 28 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,13 @@
11
package com.jobreadyprogrammer.spark;
22

3+
import java.io.Serializable;
34
import java.util.Arrays;
45
import java.util.List;
56

6-
import org.apache.spark.sql.Encoders;
7-
import org.apache.spark.sql.Row;
7+
import org.apache.spark.api.java.function.MapFunction;
8+
import org.apache.spark.api.java.function.ReduceFunction;
89
import org.apache.spark.sql.Dataset;
10+
import org.apache.spark.sql.Encoders;
911
import org.apache.spark.sql.SparkSession;
1012

1113
public class ArrayToDataset {
@@ -16,14 +18,34 @@ public void start() {
1618
.master("local")
1719
.getOrCreate();
1820

19-
String [] stringList = new String[] {"Banana", "Glass", "Computer", "Car"};
21+
String [] stringList = new String[] {"Banana", "Car", "Glass", "Banana", "Computer", "Car"};
2022

2123
List<String> data = Arrays.asList(stringList);
2224

23-
Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
24-
25-
Dataset<Row> df = ds.toDF();
25+
Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
26+
27+
ds = ds.map((MapFunction<String, String>) row -> "word: " + row, Encoders.STRING());
28+
ds.show(10);
29+
30+
String stringValue = ds.reduce(new StringReducer());
31+
32+
System.out.println(stringValue);
2633

2734
}
2835

36+
37+
static class StringReducer implements ReduceFunction<String>, Serializable {
38+
39+
/**
40+
*
41+
*/
42+
private static final long serialVersionUID = 1L;
43+
44+
@Override
45+
public String call(String v1, String v2) throws Exception {
46+
return v1 + v2;
47+
}
48+
49+
}
50+
2951
}

project4/src/main/java/com/jobreadyprogrammer/spark/CsvToDatasetHouseToDataframe.java

Lines changed: 12 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -3,11 +3,13 @@
33
import org.apache.spark.sql.Dataset;
44
import org.apache.spark.sql.Encoders;
55
import org.apache.spark.sql.Row;
6+
import static org.apache.spark.sql.functions.*;
67
import org.apache.spark.sql.SparkSession;
78

89
import com.jobreadyprogrammer.mappers.HouseMapper;
910
import com.jobreadyprogrammer.pojos.House;
1011

12+
1113
public class CsvToDatasetHouseToDataframe {
1214

1315
public void start() {
@@ -25,28 +27,21 @@ public void start() {
2527
.option("header", true)
2628
.option("sep", ";")
2729
.load(filename);
28-
29-
System.out.println("*** Houses ingested in a dataframe");
3030

31-
df.show(5);
32-
df.printSchema();
33-
34-
35-
Dataset<House> houseDS = df.map(
36-
new HouseMapper(), Encoders.bean(House.class));
31+
System.out.println("House ingested in a dataframe: ");
32+
// df.show(5);
33+
// df.printSchema();
34+
35+
Dataset<House> houseDS = df.map(new HouseMapper(), Encoders.bean(House.class));
3736

38-
System.out.println("below schema is for house dataset");
39-
houseDS.show(5, 17);
37+
System.out.println("*****House ingested in a dataset: *****");
38+
39+
houseDS.show(5);
4040
houseDS.printSchema();
4141

42-
System.out.println("below is the houses dataframe");
43-
4442
Dataset<Row> df2 = houseDS.toDF();
45-
46-
// df2.printSchema();
47-
// df2.show(5, 17);
48-
49-
43+
df2 = df2.withColumn("formatedDate", concat(df2.col("vacantBy.date"), lit("_"), df2.col("vacantBy.year")));
44+
df2.show(10);
5045
}
5146

5247

project4/src/main/java/com/jobreadyprogrammer/spark/WordCount.java

Lines changed: 21 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -7,14 +7,20 @@
77

88
import com.jobreadyprogrammer.mappers.LineMapper;
99

10-
import breeze.linalg.Options.Value;
11-
1210
public class WordCount {
1311

1412
public void start() {
1513

14+
String boringWords = " ('a', 'an', 'and', 'are', 'as', 'at', 'be', 'but', 'by',\r\n" +
15+
"'for', 'if', 'in', 'into', 'is', 'it',\r\n" +
16+
"'no', 'not', 'of', 'on', 'or', 'such',\r\n" +
17+
"'that', 'the', 'their', 'then', 'there', 'these',\r\n" +
18+
"'they', 'this', 'to', 'was', 'will', 'with', 'he', 'she'," +
19+
"'your', 'you', 'I', "
20+
+ " 'i','[',']', '[]', 'his', 'him', 'our', 'we') ";
21+
1622
SparkSession spark = SparkSession.builder()
17-
.appName("CSV to dataframe to Dataset<House> and back")
23+
.appName("unstructured text to flatmap")
1824
.master("local")
1925
.getOrCreate();
2026

@@ -23,31 +29,20 @@ public void start() {
2329
Dataset<Row> df = spark.read().format("text")
2430
.load(filename);
2531

26-
df.show(5);
27-
df.printSchema();
28-
29-
Dataset<String> lineDS = df.flatMap(
30-
new LineMapper(), Encoders.STRING());
32+
// df.printSchema();
33+
// df.show(10);
34+
35+
Dataset<String> wordsDS = df.flatMap(new LineMapper(), Encoders.STRING());
36+
37+
Dataset<Row> df2 = wordsDS.toDF();
38+
39+
df2 = df2.groupBy("value").count();
40+
df2 = df2.orderBy(df2.col("count").desc());
41+
df2 = df2.filter("lower(value) NOT IN " + boringWords);
42+
43+
df2.show(500);
3144

32-
33-
lineDS.printSchema();
34-
lineDS.show(10, 200);
35-
36-
String boringWords = "( 'a', 'an', 'and', 'are', 'as', 'at', 'be', 'but', 'by',\r\n" +
37-
" 'for', 'if', 'in', 'into', 'is', 'it',\r\n" +
38-
" 'no', 'not', 'of', 'on', 'or', 'such',\r\n" +
39-
" 'that', 'the', 'their', 'then', 'there', 'these',\r\n" +
40-
" 'they', 'this', 'to', 'was', 'will', 'with', 'he', 'she', 'your', 'you', 'I', "
41-
+ " 'i','[',']', '[]', 'his', 'him', 'our', 'we')";
42-
43-
Dataset<Row> df2 = lineDS.toDF();
44-
df2 = df2.groupBy("value").count();
45-
df2 = df2.filter("lower(value) NOT IN" + boringWords);
46-
df2 = df2.orderBy(df2.col("count").desc());
47-
4845

49-
df2.printSchema();
50-
df2.show(100);
5146
}
5247

5348

0 commit comments

Comments (0)