Skip to content

Commit e8f941d

Browse files
Spark map & mapPartitions example
1 parent 4383b89 commit e8f941d

File tree

4 files changed

+107
-2
lines changed

4 files changed

+107
-2
lines changed

src/main/scala/com/sparkbyexamples/spark/dataframe/FlattenNestedStruct.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ object FlattenNestedStruct extends App {
4343
val df2 = df.select(col("name.*"),
4444
col("address.current.*"),
4545
col("address.previous.*"))
46-
df2.toDF("fname","mename","lname","currAddState",
46+
47+
val df2Flatten = df2.toDF("fname","mename","lname","currAddState",
4748
"currAddCity","prevAddState","prevAddCity")
48-
.show(false)
49+
df2Flatten.printSchema()
50+
df2Flatten.show(false)
4951

5052

5153

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.sparkbyexamples.spark.dataframe.examples
2+
3+
import org.apache.spark.sql.{Row, SparkSession}
4+
import org.apache.spark.sql.types._
5+
6+
/**
 * Example: building a DataFrame whose schema mixes complex column types —
 * a nested struct (`name`), an array (`languagesKnown`) and a map (`properties`).
 * Prints the schema and the full, untruncated contents.
 */
object DataFrameComplex extends App {

  // Local Spark session backed by 5 threads; enough for a small demo dataset.
  val spark: SparkSession =
    SparkSession.builder()
      .master("local[5]")
      .appName("SparkByExamples.com")
      .getOrCreate()

  // Sample rows: nested Row for the name struct, then id, location, salary,
  // a list of languages and a map of physical properties.
  val structureData = Seq(
    Row(Row("James", "", "Smith"), "36636", "NewYork", 3100, List("Java", "Scala"), Map("hair" -> "black", "eye" -> "brown")),
    Row(Row("Michael", "Rose", ""), "40288", "California", 4300, List("Python", "PHP"), Map("hair" -> "black", "eye" -> "brown")),
    Row(Row("Robert", "", "Williams"), "42114", "Florida", 1400, List("C++", "C#"), Map("hair" -> "black", "eye" -> "brown")),
    Row(Row("Maria", "Anne", "Jones"), "39192", "Florida", 5500, List("Python", "Scala"), Map("hair" -> "black", "eye" -> "brown")),
    Row(Row("Jen", "Mary", "Brown"), "34561", "NewYork", 3000, List("R", "Scala"), Map("hair" -> "black", "eye" -> "brown"))
  )

  // Explicit schema matching the row layout above; `name` is a three-field struct.
  val structureSchema =
    new StructType()
      .add("name", new StructType()
        .add("firstname", StringType)
        .add("middlename", StringType)
        .add("lastname", StringType))
      .add("id", StringType)
      .add("location", StringType)
      .add("salary", IntegerType)
      .add("languagesKnown", ArrayType(StringType))
      .add("properties", MapType(StringType, StringType))

  // Parallelize the rows into an RDD first, then pair it with the schema.
  val rowRDD = spark.sparkContext.parallelize(structureData)
  val df2 = spark.createDataFrame(rowRDD, structureSchema)

  df2.printSchema()
  // show(false) keeps long cell values untruncated in the console output.
  df2.show(false)

}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.sparkbyexamples.spark.dataframe.examples
2+
3+
import org.apache.spark.sql.{Row, SparkSession}
4+
import org.apache.spark.sql.types.{IntegerType, StringType, StructType,ArrayType,MapType}
5+
6+
/**
 * Example: transforming DataFrame rows with `map` and `mapPartitions`.
 * Both variants combine the three name columns into one `fullName` column
 * via the serializable [[Util]] helper, keeping `id` and `salary` alongside.
 */
object MapTransformation extends App {

  // Local Spark session backed by 5 threads for this small demo.
  val spark: SparkSession =
    SparkSession.builder()
      .master("local[5]")
      .appName("SparkByExamples.com")
      .getOrCreate()

  // Flat sample rows: first/middle/last name, id, location, salary.
  val structureData = Seq(
    Row("James", "", "Smith", "36636", "NewYork", 3100),
    Row("Michael", "Rose", "", "40288", "California", 4300),
    Row("Robert", "", "Williams", "42114", "Florida", 1400),
    Row("Maria", "Anne", "Jones", "39192", "Florida", 5500),
    Row("Jen", "Mary", "Brown", "34561", "NewYork", 3000)
  )

  // Schema matching the row layout above.
  val structureSchema =
    new StructType()
      .add("firstname", StringType)
      .add("middlename", StringType)
      .add("lastname", StringType)
      .add("id", StringType)
      .add("location", StringType)
      .add("salary", IntegerType)

  val df2 = spark.createDataFrame(
    spark.sparkContext.parallelize(structureData), structureSchema)
  df2.printSchema()
  df2.show(false)

  // Encoders for the tuple result type produced by map/mapPartitions below.
  import spark.implicits._

  // One helper instance captured from the driver; Util is Serializable,
  // so it ships to the executors with the map closure.
  val util = new Util()
  val df3 = df2.map { row =>
    (util.combine(row.getString(0), row.getString(1), row.getString(2)),
      row.getString(3),
      row.getInt(5))
  }
  val df3Map = df3.toDF("fullName", "id", "salary")

  df3Map.printSchema()
  df3Map.show(false)

  // mapPartitions variant: construct the helper once PER PARTITION instead of
  // once on the driver — the pattern to use when the helper is expensive to build.
  val df4 = df2.mapPartitions { iterator =>
    val partitionUtil = new Util()
    iterator.map { row =>
      (partitionUtil.combine(row.getString(0), row.getString(1), row.getString(2)),
        row.getString(3),
        row.getInt(5))
    }
  }
  val df4part = df4.toDF("fullName", "id", "salary")
  df4part.printSchema()
  df4part.show(false)

}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.sparkbyexamples.spark.dataframe.examples
2+
3+
/**
 * Small serializable helper shipped to Spark executors inside task closures.
 * Serializable is required so instances created on the driver can be captured
 * by map/mapPartitions lambdas.
 */
class Util extends Serializable {

  /** Joins the three name parts into a single comma-separated string. */
  def combine(fname: String, mname: String, lname: String): String =
    Seq(fname, mname, lname).mkString(",")
}

0 commit comments

Comments
 (0)