Skip to content

Commit 46dc2c9

Browse files
Spark Join operations
1 parent ee4ba07 commit 46dc2c9

File tree

5 files changed

+192
-3
lines changed

5 files changed

+192
-3
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.sparkbyexamples.spark.dataframe.join
2+
3+
// Placeholder for a cross (cartesian product) join example.
// No logic has been implemented yet — presumably to be filled in by a
// later commit; confirm before relying on this class.
class CrossJoinExample {

}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.sparkbyexamples.spark.dataframe.join
2+
3+
import org.apache.spark.sql.SparkSession
4+
import org.apache.spark.sql.catalyst.plans.Inner
5+
6+
/**
 * Demonstrates an inner join between an employee DataFrame and a
 * department DataFrame, joining on `emp_dept_id === dept_id`.
 *
 * Rows without a match on either side (emp_dept_id "50", and any
 * department id absent from emp) are dropped by the inner join.
 */
object InnerJoinExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  // Keep console output readable: suppress INFO/WARN logging.
  spark.sparkContext.setLogLevel("ERROR")

  // (emp_id, name, superior_emp_id, year_joined, emp_dept_id, gender, salary)
  val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
    (6,"Brown",2,"2010","50","",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")

  // Use the SparkSession's implicits directly; `spark.sqlContext.implicits._`
  // goes through the legacy SQLContext entry point.
  import spark.implicits._
  val empDF = emp.toDF(empColumns: _*)
  empDF.show(false)

  // (dept_name, dept_id)
  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )

  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns: _*)
  deptDF.show(false)

  println("Inner join")
  empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")
    .show(false)

  // Same join expressed with the canonical SQL join-type name. The public
  // string literal replaces the previous use of the Catalyst-internal
  // `org.apache.spark.sql.catalyst.plans.Inner.sql`, which is not a stable API.
  empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "INNER")
    .show(false)
}
Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,60 @@
11
package com.sparkbyexamples.spark.dataframe.join
22

3-
object JoinMultipleColumns {
3+
import org.apache.spark.sql.SparkSession
44

5+
/**
 * Demonstrates joining two DataFrames on MULTIPLE columns
 * (`dept_id` AND `branch_id`) in four equivalent ways:
 * join expression, `where`, `filter`, and raw Spark SQL.
 */
object JoinMultipleColumns extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  // Keep console output readable: suppress INFO/WARN logging.
  spark.sparkContext.setLogLevel("ERROR")

  // (emp_id, name, superior_emp_id, branch_id, dept_id, gender, salary)
  val emp = Seq((1,"Smith",-1,"2018",10,"M",3000),
    (2,"Rose",1,"2010",20,"M",4000),
    (3,"Williams",1,"2010",10,"M",1000),
    (4,"Jones",2,"2005",10,"F",2000),
    (5,"Brown",2,"2010",30,"",-1),
    (6,"Brown",2,"2010",50,"",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","branch_id","dept_id","gender","salary")

  // Use the SparkSession's implicits directly; `spark.sqlContext.implicits._`
  // goes through the legacy SQLContext entry point.
  import spark.implicits._
  val empDF = emp.toDF(empColumns: _*)
  empDF.show(false)

  // (dept_name, dept_id, branch_id) — dept_id repeats across branches,
  // which is why the join needs both columns to be unambiguous.
  val dept = Seq(("Finance",10,"2018"),
    ("Marketing",20,"2010"),
    ("Marketing",20,"2018"),
    ("Sales",30,"2005"),
    ("Sales",30,"2010"),
    ("IT",50,"2010")
  )

  val deptColumns = Seq("dept_name","dept_id","branch_id")
  val deptDF = dept.toDF(deptColumns: _*)
  deptDF.show(false)

  // Using multiple columns on the join expression
  empDF.join(deptDF, empDF("dept_id") === deptDF("dept_id") &&
      empDF("branch_id") === deptDF("branch_id"), "inner")
    .show(false)

  // Using join with multiple columns on a where clause
  empDF.join(deptDF).where(empDF("dept_id") === deptDF("dept_id") &&
      empDF("branch_id") === deptDF("branch_id"))
    .show(false)

  // Using join with multiple columns on a filter clause
  empDF.join(deptDF).filter(empDF("dept_id") === deptDF("dept_id") &&
      empDF("branch_id") === deptDF("branch_id"))
    .show(false)

  // Using SQL & multiple columns on the join condition.
  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")

  // Standard SQL equality `=` instead of the non-standard `==`
  // (Spark SQL treats both identically).
  val resultDF = spark.sql("select e.* from EMP e, DEPT d " +
    "where e.dept_id = d.dept_id and e.branch_id = d.branch_id")
  resultDF.show(false)
}
Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,35 @@
11
package com.sparkbyexamples.spark.dataframe.join
22

3-
object JoinMultipleDataFrames {
3+
import org.apache.spark.sql.SparkSession
44

5+
/**
 * Sets up the employee and department DataFrames used to demonstrate
 * joining multiple DataFrames.
 *
 * NOTE(review): despite the name, this example currently only builds and
 * shows the two DataFrames — no join is performed yet. Presumably the
 * chained multi-DataFrame join is still to be added; confirm intent.
 */
object JoinMultipleDataFrames extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  // Keep console output readable: suppress INFO/WARN logging.
  spark.sparkContext.setLogLevel("ERROR")

  // (emp_id, name, superior_emp_id, year_joined, emp_dept_id, gender, salary)
  val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
    (6,"Brown",2,"2010","50","",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")

  // Use the SparkSession's implicits directly; `spark.sqlContext.implicits._`
  // goes through the legacy SQLContext entry point.
  import spark.implicits._
  val empDF = emp.toDF(empColumns: _*)
  empDF.show(false)

  // (dept_name, dept_id)
  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )

  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns: _*)
  deptDF.show(false)
}
Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,60 @@
11
package com.sparkbyexamples.spark.dataframe.join
22

3-
object SelfJoinExample {
3+
import org.apache.spark.sql.SparkSession
4+
import org.apache.spark.sql.functions.col
5+
6+
/**
 * Demonstrates a SELF join: the employee DataFrame is joined to itself
 * (aliased "emp1"/"emp2") to resolve each employee's superior by matching
 * `emp1.superior_emp_id` against `emp2.emp_id`. Shown four ways:
 * join expression, `where`, `filter`, and raw Spark SQL.
 */
object SelfJoinExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  // Keep console output readable: suppress INFO/WARN logging.
  spark.sparkContext.setLogLevel("ERROR")

  // (emp_id, name, superior_emp_id, emp_dept_id, salary)
  val emp = Seq((1,"Smith",1,"10",3000),
    (2,"Rose",1,"20",4000),
    (3,"Williams",1,"10",1000),
    (4,"Jones",2,"10",2000),
    (5,"Brown",2,"40",-1),
    (6,"Brown",2,"50",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","emp_dept_id","salary")

  // Use the SparkSession's implicits directly; `spark.sqlContext.implicits._`
  // goes through the legacy SQLContext entry point.
  import spark.implicits._
  val empDF = emp.toDF(empColumns: _*)
  empDF.show(false)

  println("self join")
  // Aliases disambiguate the two sides of the self join.
  val selfDF = empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"), "inner")
  selfDF.show(false)

  selfDF.select(col("emp1.emp_id"), col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
    .show(false)

  // Self join expressed with a where clause
  empDF.as("emp1").join(empDF.as("emp2")).where(
      col("emp1.superior_emp_id") === col("emp2.emp_id"))
    .select(col("emp1.emp_id"), col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
    .show(false)

  // Self join expressed with a filter clause
  empDF.as("emp1").join(empDF.as("emp2")).filter(
      col("emp1.superior_emp_id") === col("emp2.emp_id"))
    .select(col("emp1.emp_id"), col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
    .show(false)

  // Same self join in Spark SQL. Standard SQL equality `=` instead of the
  // non-standard `==` (Spark SQL treats both identically).
  empDF.createOrReplaceTempView("EMP")
  spark.sql("select emp1.emp_id,emp1.name," +
      "emp2.emp_id as superior_emp_id, emp2.name as superior_emp_name " +
      "from EMP emp1 INNER JOIN EMP emp2 on emp1.superior_emp_id = emp2.emp_id")
    .show(false)

}

0 commit comments

Comments
 (0)