Skip to content

Commit 334ba4d

Browse files
Spark collect column
1 parent 52548bd commit 334ba4d

File tree

1 file changed

+52
-0
lines changed

1 file changed

+52
-0
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.sparkbyexamples.spark.dataframe.examples
2+
3+
import org.apache.spark.sql.{Row, SparkSession}
4+
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
5+
6+
/**
 * Example showing how to bring a DataFrame's rows back to the driver with
 * `collect()` and read individual column values from the resulting `Row`s,
 * including fields of a nested struct column.
 *
 * The program builds a small in-memory DataFrame, prints its schema and
 * contents, prints every salary, and then prints
 * `firstname,middlename,lastname,salary` for every row.
 */
object CollectExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  // Each outer Row is (name struct, id, gender, salary); the inner Row is
  // (firstname, middlename, lastname), matching `schema` below.
  val data = Seq(
    Row(Row("James ", "", "Smith"), "36636", "M", 3000),
    Row(Row("Michael ", "Rose", ""), "40288", "M", 4000),
    Row(Row("Robert ", "", "Williams"), "42114", "M", 4000),
    Row(Row("Maria ", "Anne", "Jones"), "39192", "F", 4000),
    Row(Row("Jen", "Mary", "Brown"), "", "F", -1)
  )

  val schema = new StructType()
    .add("name", new StructType()
      .add("firstname", StringType)
      .add("middlename", StringType)
      .add("lastname", StringType))
    .add("id", StringType)
    .add("gender", StringType)
    .add("salary", IntegerType)

  val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
  df.printSchema()
  df.show(false)

  // collect() materialises all rows of the DataFrame as an array on the driver.
  val colData = df.collect()

  // Reading a top-level column by position (indexes start from zero).
  colData.foreach { row =>
    val salary = row.getInt(3)
    println(salary)
  }

  // Retrieving data from the struct column: three ways to read a nested field.
  colData.foreach { row =>
    val salary = row.getInt(3)
    val fullName: Row = row.getStruct(0) // index starts from zero
    // Typed access by position; inside the struct row indexes start from zero again.
    val firstName = fullName.getString(0)
    // Untyped access by position. `get` returns null for a null field, so guard
    // before calling toString instead of risking a NullPointerException.
    val middleName = Option(fullName.get(1)).map(_.toString).getOrElse("")
    // Typed access by field name.
    val lastName = fullName.getAs[String]("lastname")
    println(s"$firstName,$middleName,$lastName,$salary")
  }

  // Release the local Spark resources once the example has finished.
  spark.stop()
}

0 commit comments

Comments
 (0)