Commit 45b993c
spark examples
1 parent efc760a

9 files changed, +195 -66 lines

src/main/resources/address.csv

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+Id,Address Line1,City,State,Zipcode
+1,9182 Clear Water Rd,Fayetteville,AR,72704
+2,9724 E Landon Ln,Kennewick,WA,99338
+3,9509 Clay Creek Ln,Fort Worth,TX,76177
+4,98016 S Garnsey St,Santa Ana,CA,92707
+5,9920 State Highway 89,Ringling,OK,73456

src/main/scala/com/sparkbyexamples/spark/dataframe/CreateDataFrame.scala

Lines changed: 4 additions & 5 deletions
@@ -1,15 +1,14 @@
 package com.sparkbyexamples.spark.dataframe
 
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 
 object CreateDataFrame {
 
   def main(args:Array[String]):Unit={
 
     val spark:SparkSession = SparkSession.builder()
-      .master("local[1]")
-      .appName("SparkByExample")
+      .master("local[1]").appName("SparkByExamples.com")
       .getOrCreate()
 
     import spark.implicits._
@@ -20,10 +19,10 @@ object CreateDataFrame {
 
     //From RDD (USING toDF())
     val dfFromRDD1 = rdd.toDF("language","users")
-
+    dfFromRDD1.printSchema()
     //From RDD (USING createDataFrame)
     val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)
-
+    dfFromRDD2.printSchema()
     //From RDD (USING createDataFrame and Adding schema using StructType)
     //convert RDD[T] to RDD[Row]
     val schema = StructType( Array(StructField("language", StringType, true),
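The hunks above reference rdd and columns defined elsewhere in the file. For readers skimming the diff, a self-contained sketch of the three creation paths it exercises; the sample data and column names below are assumptions, since the diff omits them:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    object CreateDataFrameSketch extends App {
      val spark = SparkSession.builder().master("local[1]").appName("SparkByExamples.com").getOrCreate()
      import spark.implicits._

      // Assumed sample data; the commit defines rdd and columns outside this hunk.
      val rdd = spark.sparkContext.parallelize(Seq(("Java", 20000L), ("Python", 100000L)))
      val columns = Seq("language", "users")

      rdd.toDF("language", "users").printSchema()                 // toDF()
      spark.createDataFrame(rdd).toDF(columns: _*).printSchema()  // createDataFrame + toDF
      // explicit schema: convert RDD[T] to RDD[Row] first
      val schema = StructType(Array(
        StructField("language", StringType, true),
        StructField("users", LongType, true)))
      val rowRdd = rdd.map { case (lang, users) => Row(lang, users) }
      spark.createDataFrame(rowRdd, schema).printSchema()
    }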

src/main/scala/com/sparkbyexamples/spark/dataframe/CreateEmptyDatasetExample.scala

Lines changed: 20 additions & 7 deletions
@@ -10,7 +10,7 @@ object CreateEmptyDatasetExample extends App {
     .master("local[1]")
     .appName("SparkByExamples.com")
     .getOrCreate()
-
+  spark.sparkContext.setLogLevel("ERROR");
   import spark.implicits._
 
   val schema = StructType(
@@ -21,11 +21,24 @@ object CreateEmptyDatasetExample extends App {
   val colSeq = Seq("firstName","lastName","middleName")
 
   case class Name(firstName: String, lastName: String, middleName:String)
+  case class Empty()
+  val ds0 = spark.emptyDataset[Empty]
+  ds0.printSchema()
+
+  val ds1=spark.emptyDataset[Name]
+  ds1.printSchema()
+
+  val ds2 = spark.createDataset(Seq.empty[Name])
+  ds2.printSchema()
+
+  val ds4=spark.createDataset(spark.sparkContext.emptyRDD[Name])
+  ds4.printSchema()
+
+  val ds3=spark.createDataset(Seq.empty[(String,String,String)])
+  ds3.printSchema()
+  val ds5=Seq.empty[(String,String,String)].toDS()
+  ds5.printSchema()
 
-  spark.createDataset(Seq.empty[Name])
-  spark.createDataset(Seq.empty[(String,String,String)])
-  spark.createDataset(spark.sparkContext.emptyRDD[Name])
-  Seq.empty[(String,String,String)].toDS()
-  Seq.empty[Name].toDS()
-  spark.emptyDataset[Name]
+  val ds6=Seq.empty[Name].toDS()
+  ds6.printSchema()
 }
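For comparison, the empty-DataFrame counterparts of the Dataset variants above; a small sketch, assuming the same spark session and the schema value already defined in this file:

    import org.apache.spark.sql.Row

    val df0 = spark.emptyDataFrame                // no schema at all
    df0.printSchema()                             // prints just "root"
    val df1 = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    df1.printSchema()                             // zero rows, but the schema is kept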
src/main/scala/com/sparkbyexamples/spark/dataframe/FilterNullRowsExample.scala

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
+package com.sparkbyexamples.spark.dataframe
+
+import org.apache.spark.sql.{SparkSession}
+import org.apache.spark.sql.functions.col
+
+object FilterNullRowsExample extends App{
+
+  val spark: SparkSession = SparkSession.builder()
+    .master("local[1]")
+    .appName("SparkByExamples.com")
+    .getOrCreate()
+
+  spark.sparkContext.setLogLevel("ERROR")
+  val data = Seq(
+    ("James",null,"M"),
+    ("Anna","NY","F"),
+    ("Julia",null,null)
+  )
+  import spark.implicits._
+  val columns = Seq("name","state","gender")
+  val df = data.toDF(columns:_*)
+
+  df.printSchema()
+  df.show()
+
+  df.filter("state is NULL").show(false)
+  df.filter(df("state").isNull).show(false)
+  df.filter(col("state").isNull).show(false)
+
+  df.filter("state is not NULL").show(false)
+  df.filter("NOT state is NULL").show(false)
+  df.filter(df("state").isNotNull).show(false)
+
+  df.filter("state is NULL AND gender is NULL").show(false)
+  df.filter(df("state").isNull && df("gender").isNull).show(false)
+
+  df.createOrReplaceTempView("DATA")
+  spark.sql("SELECT * FROM DATA where STATE IS NULL").show(false)
+  spark.sql("SELECT * FROM DATA where STATE IS NULL AND GENDER IS NULL").show(false)
+  spark.sql("SELECT * FROM DATA where STATE IS NOT NULL").show(false)
+
+
+}
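One pitfall this example routes around: SQL null uses three-valued logic, so comparing a column with === null never matches any row; isNull/isNotNull (or IS NULL in SQL) is the correct test. A quick sketch against the same df:

    df.filter(df("state") === null).show(false)  // always zero rows: NULL = NULL is not true
    df.filter(df("state").isNull).show(false)    // matches the James and Julia rows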

src/main/scala/com/sparkbyexamples/spark/dataframe/RemoveNullRowsExample.scala

Lines changed: 7 additions & 3 deletions
@@ -9,14 +9,18 @@ object RemoveNullRowsExample extends App{
     .appName("SparkByExamples.com")
     .getOrCreate()
 
+  spark.sparkContext.setLogLevel("ERROR")
   val filePath="src/main/resources/small_zipcode.csv"
 
   val df = spark.read.options(Map("inferSchema"->"true","delimiter"->",","header"->"true")).csv(filePath)
   df.printSchema()
   df.show(false)
 
-  df.na.drop()
-    .show(false)
+  df.na.drop().show(false)
+
+  //all/any
+  df.na.drop("any").show(false)
+
+  df.na.drop(Seq("population","type")).show(false)
 
-  // Array and map columns
 }
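The diff covers drop() (the default, same as "any"), drop("any"), and the column-restricted Seq variant. For completeness, the remaining overloads against the same df; a short sketch:

    df.na.drop("all").show(false)                             // drop a row only when every column is null
    df.na.drop("all", Seq("population","type")).show(false)   // ...considering just the listed columns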

src/main/scala/com/sparkbyexamples/spark/dataframe/WithColumn.scala

Lines changed: 14 additions & 18 deletions
@@ -1,7 +1,7 @@
 package com.sparkbyexamples.spark.dataframe
 
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.functions._
 object WithColumn {
 
@@ -12,27 +12,23 @@ object WithColumn {
     .appName("SparkByExamples.com")
     .getOrCreate()
 
-  val arrayStructureData = Seq(
-    Row(Row("James ","","Smith"),"1","M",3100,List("Cricket","Movies"),Map("hair"->"black","eye"->"brown")),
-    Row(Row("Michael ","Rose",""),"2","M",3100,List("Tennis"),Map("hair"->"brown","eye"->"black")),
-    Row(Row("Robert ","","Williams"),"3","M",3100,List("Cooking","Football"),Map("hair"->"red","eye"->"gray")),
-    Row(Row("Maria ","Anne","Jones"),"4","M",3100,null,Map("hair"->"blond","eye"->"red")),
-    Row(Row("Jen","Mary","Brown"),"5","M",3100,List("Blogging"),Map("white"->"black","eye"->"black"))
+  val dataRows = Seq(Row(Row("James;","","Smith"),"36636","M","3000"),
+    Row(Row("Michael","Rose",""),"40288","M","4000"),
+    Row(Row("Robert","","Williams"),"42114","M","4000"),
+    Row(Row("Maria","Anne","Jones"),"39192","F","4000"),
+    Row(Row("Jen","Mary","Brown"),"","F","-1")
   )
 
-  val arrayStructureSchema = new StructType()
+  val schema = new StructType()
     .add("name",new StructType()
       .add("firstname",StringType)
       .add("middlename",StringType)
       .add("lastname",StringType))
-    .add("id",StringType)
+    .add("dob",StringType)
     .add("gender",StringType)
-    .add("salary",IntegerType)
-    .add("Hobbies", ArrayType(StringType))
-    .add("properties", MapType(StringType,StringType))
+    .add("salary",StringType)
 
-  val df2 = spark.createDataFrame(
-    spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
+  val df2 = spark.createDataFrame(spark.sparkContext.parallelize(dataRows),schema)
 
   //Change the column data type
   df2.withColumn("salary",df2("salary").cast("Integer"))
@@ -62,10 +58,6 @@ object WithColumn {
   df2.select("name.firstname").show(false)
   df2.select("name.*").show(false)
 
-
-  val df8 = df2.select(col("*"),explode(col("hobbies")))
-  df8.show(false)
-
   import spark.implicits._
 
   val columns = Seq("name","address")
@@ -81,5 +73,9 @@ object WithColumn {
   val finalDF = newDF.toDF("First Name","Last Name","Address Line1","City","State","zipCode")
   finalDF.printSchema()
   finalDF.show(false)
+
+  df2.createOrReplaceTempView("PERSON")
+  spark.sql("SELECT salary*100 as salary, salary*-1 as CopiedColumn, 'USA' as country FROM PERSON").show()
 }
+
 }
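Worth noting when reading the second hunk: withColumn does not mutate its receiver, it returns a new DataFrame, so the unassigned cast call shown there has no lasting effect on df2. A sketch of the assigning form:

    // withColumn returns a new DataFrame; keep the result to keep the change
    val df3 = df2.withColumn("salary", df2("salary").cast("Integer"))
    df3.printSchema()   // salary: integer here; df2 itself still has salary: string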
src/main/scala/com/sparkbyexamples/spark/dataframe/examples/ForEachPartExample.scala

Lines changed: 33 additions & 33 deletions

@@ -1,33 +1,33 @@
-package com.sparkbyexamples.spark.dataframe.examples
-
-import org.apache.spark.sql.SparkSession
-
-object ForEachPartExample extends App {
-
-  val spark: SparkSession = SparkSession.builder()
-    .master("local[1]")
-    .appName("SparkByExamples.com")
-    .getOrCreate()
-
-  val data = Seq(("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"),
-    ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"),
-    ("Carrots",1200,"China"),("Beans",1500,"China"))
-
-  // foreachPartition DataFrame
-  val df = spark.createDataFrame(data).toDF("Product","Amount","Country")
-  df.foreachPartition(partition => {
-    //Initialize any database connection
-    partition.foreach(fun=>{
-      //apply the function
-    })
-  })
-
-  //rdd
-  val rdd = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))
-  rdd.foreachPartition(partition => {
-    //Initialize any database connection
-    partition.foreach(fun=>{
-      //apply the function
-    })
-  })
-}
+//package com.sparkbyexamples.spark.dataframe.examples
+//
+//import org.apache.spark.sql.SparkSession
+//
+//object ForEachPartExample extends App {
+//
+//  val spark: SparkSession = SparkSession.builder()
+//    .master("local[1]")
+//    .appName("SparkByExamples.com")
+//    .getOrCreate()
+//
+//  val data = Seq(("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"),
+//    ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"),
+//    ("Carrots",1200,"China"),("Beans",1500,"China"))
+//
+//  // foreachPartition DataFrame
+//  val df = spark.createDataFrame(data).toDF("Product","Amount","Country")
+//  df.foreachPartition(partition => {
+//    //Initialize any database connection
+//    partition.foreach(fun=>{
+//      //apply the function
+//    })
+//  })
+//
+//  //rdd
+//  val rdd = spark.sparkContext.parallelize(Seq(1,2,3,4,5,6,7,8,9))
+//  rdd.foreachPartition(partition => {
+//    //Initialize any database connection
+//    partition.foreach(fun=>{
+//      //apply the function
+//    })
+//  })
+//}
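The file is commented out wholesale rather than deleted. One plausible reason (an assumption; the commit message does not say) is the Dataset.foreachPartition overload ambiguity under Scala 2.12, where a bare lambda matches both the Scala-function and the Java ForeachPartitionFunction overload. Annotating the parameter type makes the DataFrame call compile, a sketch using the df from the commented-out body:

    import org.apache.spark.sql.Row

    df.foreachPartition((partition: Iterator[Row]) => {
      // initialize any per-partition resource (e.g. a database connection) here
      partition.foreach(row => {
        // apply the function to each row
      })
    })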
src/main/scala/com/sparkbyexamples/spark/dataframe/examples/SaveSingleFile.scala

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+package com.sparkbyexamples.spark.dataframe.examples
+
+import java.io.File
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+object SaveSingleFile extends App{
+
+  val spark:SparkSession = SparkSession.builder()
+    .master("local[3]")
+    .appName("SparkByExamples.com")
+    .getOrCreate()
+
+  val df = spark.read.option("header",true).csv("src/main/resources/address.csv")
+  df.repartition(1).write.mode(SaveMode.Overwrite).csv("/tmp/address")
+
+
+  val hadoopConfig = new Configuration()
+  val hdfs = FileSystem.get(hadoopConfig)
+
+  val srcPath=new Path("/tmp/address")
+  val destPath= new Path("/tmp/address_merged.csv")
+  val srcFile=FileUtil.listFiles(new File("c:/tmp/address")).filterNot(f=>f.getPath.endsWith(".csv"))(0)
+  //Copy the CSV file outside of Directory and rename
+  FileUtil.copy(srcFile,hdfs,destPath,true,hadoopConfig)
+  //Remove Directory created by df.write()
+  hdfs.delete(srcPath,true)
+  //Removes CRC File
+  hdfs.delete(new Path("/tmp/.address_merged.csv.crc"),true)
+
+  // Merge Using Hadoop API
+  df.repartition(1).write.mode(SaveMode.Overwrite).csv("/tmp/address-tmp")
+  val srcFilePath=new Path("/tmp/address-tmp")
+  val destFilePath= new Path("/tmp/address_merged2.csv")
+  FileUtil.copyMerge(hdfs, srcFilePath, hdfs, destFilePath, true, hadoopConfig, null)
+  //Remove hidden CRC file if not needed.
+  hdfs.delete(new Path("/tmp/.address_merged2.csv.crc"),true)
+}
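Two caveats on the merge code above. First, filterNot(f=>f.getPath.endsWith(".csv")) keeps the non-CSV entries (_SUCCESS, .crc); filter(...) is presumably what is intended for grabbing the single part file. Second, FileUtil.copyMerge was removed in Hadoop 3.x, so on those versions the second merge needs a manual equivalent. A sketch using the same hdfs, srcFilePath, destFilePath, and hadoopConfig values, assuming plain-text CSV parts:

    import org.apache.hadoop.io.IOUtils

    val out = hdfs.create(destFilePath)
    hdfs.listStatus(srcFilePath)
      .filter(_.getPath.getName.startsWith("part-"))
      .sortBy(_.getPath.getName)
      .foreach { status =>
        val in = hdfs.open(status.getPath)
        IOUtils.copyBytes(in, out, hadoopConfig, false) // false: do not close out after each part
        in.close()
      }
    out.close()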
src/main/scala/com/sparkbyexamples/spark/rdd/functions/SortByKeyExample.scala

Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
+package com.sparkbyexamples.spark.rdd.functions
+
+import org.apache.spark.sql.SparkSession
+
+object SortByKeyExample extends App{
+
+  val spark: SparkSession = SparkSession.builder()
+    .master("local[1]")
+    .appName("SparkByExamples.com")
+    .getOrCreate()
+
+  val data = Seq(("Project","A", 1),
+    ("Gutenberg’s", "X",3),
+    ("Alice’s", "C",5),
+    ("Adventures","B", 1)
+  )
+
+  val rdd=spark.sparkContext.parallelize(data)
+  rdd.foreach(println)
+  val rdd2=rdd.map(f=>{(f._2, (f._1,f._2,f._3))})
+  rdd2.foreach(println)
+  val rdd3= rdd2.sortByKey()
+  val rdd4= rdd2.sortByKey(false)
+  rdd4.foreach(println)
+
+  val rdd5 = rdd.sortBy(f=>(f._3,f._2),false)
+  rdd5.foreach(println)
+}
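Here sortByKey(false) sorts descending by the letter key, and sortBy(f=>(f._3,f._2),false) orders by the composite (number, letter) key descending. Note that foreach(println) gives no ordering guarantee once an RDD has more than one partition (local[1] happens to mask this); to see the sorted order reliably, collect to the driver first. A sketch:

    rdd3.collect().foreach(println) // ascending by key, printed in order
    rdd4.collect().foreach(println) // descending by key, printed in order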
