
Commit 1b83cee

afzals2000@yahoo.com authored and committed
Initial Commit
0 parents  commit 1b83cee

File tree

9 files changed: +343 −0 lines changed

.gitignore

Lines changed: 11 additions & 0 deletions
.idea/
spark-warehouse/
target/
*.class
*.iml
*.ipr
*.iws
.idea
out
tmp/

README.md

Lines changed: 68 additions & 0 deletions
## MovieLens

This is an example of how to write Scala Spark/Spark SQL code using Test-Driven Development.

## Prerequisites

1. JDK 1.8
2. Scala 2.11.8
3. SBT
   * Tool for library dependency management.
4. I've used IntelliJ IDEA for development and testing. If using IntelliJ, install the following plugins (see build.sbt for component versions):
   * SBT
   * Scala

#### Data Files

Create the following "::"-delimited files in the /tmp directory. A sketch that generates them appears after the samples.

* movies.dat
```
21::Toy Story (1995)::Animation|Children's|Comedy
22::Jumanji (1995)::Adventure|Children's|Fantasy
23::Grumpier Old Men (1995)::Comedy|Romance
24::Waiting to Exhale (1995)::Comedy|Drama
25::Father of the Bride Part II (1995)::Comedy
```

* users.dat
```
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
6::F::50::9::55117
```

* ratings.dat
```
1::21::5::978300760
1::22::3::978300760
2::22::4::978299026
2::23::5::978299026
2::24::4::978299026
2::25::3::978299026
3::23::2::978297837
4::654321::5::978294008
5::25::3::978245037
```
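
If you'd rather not create them by hand, here is a minimal sketch that writes the samples above to /tmp. `SampleDataWriter` is a hypothetical helper for convenience, not part of this commit:

```scala
import java.io.PrintWriter

// Hypothetical helper (not part of this commit): writes the sample
// "::"-delimited files above to the paths MovieLensJob reads from.
object SampleDataWriter {

  private def write(path: String, lines: Seq[String]): Unit = {
    val pw = new PrintWriter(path)
    try lines.foreach(pw.println) finally pw.close()
  }

  def main(args: Array[String]): Unit = {
    write("/tmp/movies.dat", Seq(
      "21::Toy Story (1995)::Animation|Children's|Comedy",
      "22::Jumanji (1995)::Adventure|Children's|Fantasy",
      "23::Grumpier Old Men (1995)::Comedy|Romance",
      "24::Waiting to Exhale (1995)::Comedy|Drama",
      "25::Father of the Bride Part II (1995)::Comedy"))
    write("/tmp/users.dat", Seq(
      "1::F::1::10::48067",
      "2::M::56::16::70072",
      "3::M::25::15::55117",
      "4::M::45::7::02460",
      "6::F::50::9::55117"))
    write("/tmp/ratings.dat", Seq(
      "1::21::5::978300760",
      "1::22::3::978300760",
      "2::22::4::978299026",
      "2::23::5::978299026",
      "2::24::4::978299026",
      "2::25::3::978299026",
      "3::23::2::978297837",
      "4::654321::5::978294008",
      "5::25::3::978245037"))
  }
}
```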

#### Running Tests

* `sbt clean package test`
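
To run just the service tests, the standard sbt task should work: `sbt "testOnly com.movie.MovieLensServiceTest"`.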

#### Known Limitations

* Exceptions, such as bad rows, are not handled.

## Change Log

* 0.0.1
  * Initial Commit

## Meta

https://github.com/afzals2000/SparkUsingTDD

## Contributing

1. Fork it (https://github.com/afzals2000/SparkUsingTDD)
2. Create your feature branch (`git checkout -b feature/fooBar`)
3. Commit your changes (`git commit -am 'Add some fooBar'`)
4. Push to the branch (`git push origin feature/fooBar`)
5. Create a new Pull Request

build.sbt

Lines changed: 25 additions & 0 deletions
name := "movielens"

version := "1.0"

scalaVersion := "2.11.8"

lazy val spark = "2.0.2"

artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
  artifact.name + "." + artifact.extension
}

resolvers ++= Seq(
  "apache-snapshots" at "http://repository.apache.org/snapshots/"
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % spark,
  "org.apache.spark" %% "spark-sql" % spark,
  "org.apache.spark" %% "spark-hive" % spark % "test",
  "org.apache.spark" %% "spark-streaming" % spark,
  "log4j" % "log4j" % "1.2.14",
  "org.scalatest" %% "scalatest" % "2.2.1" % "test",
  "com.holdenkarau" %% "spark-testing-base" % "2.0.2_0.4.7"
)
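
A note on the test dependency: spark-testing-base artifacts are versioned as `<sparkVersion>_<libraryVersion>`, so `2.0.2_0.4.7` is spark-testing-base 0.4.7 built against Spark 2.0.2, matching the `spark` value above.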

project/build.properties

Lines changed: 1 addition & 0 deletions
sbt.version = 1.2.1

project/plugins.sbt

Whitespace-only changes.

src/main/scala/com/movie/DataLoader.scala

Lines changed: 57 additions & 0 deletions
package com.movie

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

object DataLoader {

  val userSchema = StructType(
    Seq(
      StructField(name = "USERID", dataType = IntegerType, nullable = true),
      StructField(name = "GENDER", dataType = StringType, nullable = true),
      StructField(name = "AGE", dataType = IntegerType, nullable = true),
      StructField(name = "OCCUPATION", dataType = IntegerType, nullable = true),
      StructField(name = "ZIPCODE", dataType = StringType, nullable = true)
    )
  )

  val movieSchema = StructType(
    Seq(
      StructField(name = "MOVIEID", dataType = IntegerType, nullable = true),
      StructField(name = "TITLE", dataType = StringType, nullable = true),
      StructField(name = "GENRES", dataType = StringType, nullable = true)
    )
  )

  val ratingSchema = StructType(
    Seq(
      StructField(name = "USERID", dataType = IntegerType, nullable = true),
      StructField(name = "MOVIEID", dataType = IntegerType, nullable = true),
      StructField(name = "RATINGS", dataType = IntegerType, nullable = true),
      StructField(name = "TS", dataType = LongType, nullable = true)
    )
  )

  def loadUsers(file: String)(implicit spark: SparkSession): DataFrame = {
    def row(line: Array[String]): Row = Row(line(0).toInt, line(1), line(2).toInt, line(3).toInt, line(4))
    getDataFrame(file, row, userSchema)
  }

  def loadMovies(file: String)(implicit spark: SparkSession): DataFrame = {
    def row(line: Array[String]): Row = Row(line(0).toInt, line(1), line(2))
    getDataFrame(file, row, movieSchema)
  }

  def loadRatings(file: String)(implicit spark: SparkSession): DataFrame = {
    def row(line: Array[String]): Row = Row(line(0).toInt, line(1).toInt, line(2).toInt, line(3).toLong)
    getDataFrame(file, row, ratingSchema)
  }

  // Reads a "::"-delimited text file, converts each line to a Row, and applies the given schema.
  def getDataFrame(file: String, row: (Array[String]) => Row, schema: StructType)(implicit spark: SparkSession): DataFrame = {
    val rawData = spark.sparkContext.textFile(file)
    val rowRDD = rawData.map(line => line.split("::")).map(line => row(line))
    spark.createDataFrame(rowRDD, schema)
  }
}
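
As a quick sanity check outside the tests, the loaders can be driven from a tiny standalone object. A minimal sketch, assuming the sample file from the README exists at /tmp/movies.dat (`DataLoaderDemo` is not part of this commit):

```scala
import org.apache.spark.sql.SparkSession
import com.movie.DataLoader

object DataLoaderDemo {
  def main(args: Array[String]): Unit = {
    // Local session just for exercising the loaders; MovieLensJob builds its own.
    implicit val spark: SparkSession =
      SparkSession.builder().appName("DataLoaderDemo").master("local[*]").getOrCreate()

    val movies = DataLoader.loadMovies("/tmp/movies.dat")
    movies.printSchema() // MOVIEID: int, TITLE: string, GENRES: string
    movies.show()

    spark.stop()
  }
}
```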

src/main/scala/com/movie/MovieLensJob.scala

Lines changed: 40 additions & 0 deletions
package com.movie

import org.apache.spark.sql.SparkSession

object MovieLensJob {

  def main(args: Array[String]): Unit = {

    implicit val spark: SparkSession = getSparkSession("MovieJob")

    // Loaded but not used by the aggregations below.
    val users = DataLoader.loadUsers("/tmp/users.dat")
    val movies = DataLoader.loadMovies("/tmp/movies.dat")
    val ratings = DataLoader.loadRatings("/tmp/ratings.dat")

    val userMovieDF = MovieLensService.userMovieRatings(ratings)
    userMovieDF.show(5)
    userMovieDF.coalesce(1).write.option("header", "true").csv("/tmp/user_output")

    val movieGenres = MovieLensService.movieCountByGenre(movies)
    movieGenres.show(5)
    movieGenres.coalesce(1).write.option("header", "true").csv("/tmp/movie_genres")

    val top100 = MovieLensService.top100Movie(movies, ratings)
    top100.show()
    top100.coalesce(1).write.parquet("/tmp/top100")
  }

  def getSparkSession(appName: String): SparkSession = {
    val spark = SparkSession
      .builder()
      .appName(appName)
      .config("spark.master", "local")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")
    spark
  }
}
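
During development the job can be launched straight from sbt with the standard `runMain` task: `sbt "runMain com.movie.MovieLensJob"`. This assumes the three .dat files from the README are already in /tmp.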

src/main/scala/com/movie/MovieLensService.scala

Lines changed: 34 additions & 0 deletions
package com.movie

import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

object MovieLensService {

  // Per user: number of movies rated and average rating, rounded to two decimal places.
  def userMovieRatings(ratingDF: DataFrame)(implicit spark: SparkSession): DataFrame = {
    val groupedByUser = ratingDF.groupBy(col("USERID")).agg(count("MOVIEID").as("NO_OF_MOVIE"), avg("RATINGS").as("AVG_RATINGS"))
    groupedByUser.withColumn("AVG_RATINGS", bround(col("AVG_RATINGS"), 2))
  }

  // Explodes the pipe-delimited GENRES column and counts distinct movies per genre.
  def movieCountByGenre(movieDF: DataFrame)(implicit spark: SparkSession): DataFrame = {
    val explodedGenres = movieDF.withColumn("GENRES", explode(split(col("GENRES"), "[|]")))
    explodedGenres.groupBy(col("GENRES")).agg(countDistinct(col("MOVIEID")).as("TOTAL_MOVIES"))
  }

  // Averages ratings per movie, keeps up to 100 movies, and ranks them by average rating.
  def top100Movie(movieDF: DataFrame, ratingDF: DataFrame)(implicit spark: SparkSession): DataFrame = {
    val groupByRatings = ratingDF.groupBy("MOVIEID").agg(avg("RATINGS").cast(IntegerType).as("AVG_RATINGS"))
    val renameMovieID = groupByRatings.withColumnRenamed("MOVIEID", "GROUPED_MOVIEID")
    val window: WindowSpec = Window.orderBy(col("AVG_RATINGS"), col("MOVIEID"))

    val movieAndRatings = renameMovieID.join(movieDF, col("GROUPED_MOVIEID") === col("MOVIEID"))
      .select(
        col("MOVIEID"),
        col("TITLE"),
        col("AVG_RATINGS")
      ).limit(100)
    movieAndRatings.select(row_number().over(window).as("RANKING"), movieAndRatings.col("*"))
  }
}
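
A subtlety worth noting before the tests: `Window.orderBy(col("AVG_RATINGS"), col("MOVIEID"))` sorts ascending, so RANKING 1 goes to the lowest average rating among the (at most) 100 joined movies, and `avg(...).cast(IntegerType)` truncates the average (e.g. 10/3 becomes 3). The expectations in the "Top 100 movies" test below rely on both behaviors.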

src/test/scala/com/movie/MovieLensServiceTest.scala

Lines changed: 107 additions & 0 deletions
package com.movie

import com.holdenkarau.spark.testing.DatasetSuiteBase
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StructField, StructType}
import org.scalatest.FunSuite

class MovieLensServiceTest extends FunSuite with DatasetSuiteBase {

  lazy implicit val mySpark: SparkSession = spark
  import spark.implicits._

  test("No of Movies and Avg Rating by User") {
    spark.sparkContext.setLogLevel("ERROR")
    lazy val ratingDF: DataFrame =
      Seq((1, 21, 5), (1, 22, 4), (1, 23, 5), (2, 22, 4), (2, 23, 3), (3, 23, 2)
      ).toDF("USERID", "MOVIEID", "RATINGS")

    val expected = Seq(
      (1, 3L, 4.67), (2, 2L, 3.5), (3, 1L, 2.0)
    ).toDF("USERID", "NO_OF_MOVIE", "AVG_RATINGS")

    val actual = MovieLensService.userMovieRatings(ratingDF)

    actual.show()
    expected.show()

    assertEqualsNullable(actual, expected)
  }

  test("No of Movies by Genre") {
    spark.sparkContext.setLogLevel("ERROR")
    lazy val movieDF: DataFrame =
      Seq((21, "Toy Story (1995)", "Animation|Children's|Comedy")
        , (22, "Jumanji (1995)", "Adventure|Children's|Fantasy")
        , (23, "Grumpier Old Men (1995)", "Comedy|Romance")
      ).toDF("MOVIEID", "TITLE", "GENRES")

    val expected = Seq(
      ("Romance", 1L), ("Adventure", 1L), ("Children's", 2L), ("Fantasy", 1L), ("Animation", 1L), ("Comedy", 2L)
    ).toDF("GENRES", "TOTAL_MOVIES")

    val actual = MovieLensService.movieCountByGenre(movieDF)

    actual.show()
    expected.show()

    assertEqualsNullable(actual, expected)
  }

  test("Top 100 movies") {
    spark.sparkContext.setLogLevel("ERROR")

    lazy val movieDF: DataFrame =
      Seq((21, "Toy Story (1995)", "Animation|Children's|Comedy")
        , (22, "Jumanji (1995)", "Adventure|Children's|Fantasy")
        , (23, "Grumpier Old Men (1995)", "Comedy|Romance")
      ).toDF("MOVIEID", "TITLE", "GENRES")

    lazy val ratingDF: DataFrame =
      Seq((1, 21, 5)
        , (1, 22, 4)
        , (1, 23, 5)
        , (2, 22, 4)
        , (2, 23, 3)
        , (3, 23, 2)
      ).toDF("USERID", "MOVIEID", "RATINGS")

    val expected = Seq(
      (1, 23, "Grumpier Old Men (1995)", 3),
      (2, 22, "Jumanji (1995)", 4),
      (3, 21, "Toy Story (1995)", 5)
    ).toDF("RANKING", "MOVIEID", "TITLE", "AVG_RATINGS")

    val actual = MovieLensService.top100Movie(movieDF, ratingDF)
    actual.show(false)
    expected.show()

    assertEqualsNullable(actual, expected)
  }

  // Sorts columns alphabetically and rows by every column so DataFrames can be
  // compared without depending on partitioning or row order.
  private[this] def sortAndOrderDataFrame(inputDataFrame: DataFrame): DataFrame = {
    val listColNames = inputDataFrame.schema.fieldNames
    scala.util.Sorting.quickSort(listColNames)
    val orderedDF = inputDataFrame.select(listColNames.map(name => col(name)): _*)
    val keys = orderedDF.schema.fieldNames.map(col(_))
    orderedDF.sort(keys: _*)
  }

  // Compares two DataFrames while ignoring nullability differences in their schemas.
  private[this] def assertEqualsNullable(expected: DataFrame, actual: DataFrame): Unit = {
    val left = setNullableTrueForAllColumns(expected, true)
    val right = setNullableTrueForAllColumns(actual, true)
    assertDataFrameEquals(sortAndOrderDataFrame(left), sortAndOrderDataFrame(right))
  }

  // Rebuilds the schema with the given nullability on every column.
  private[this] def setNullableTrueForAllColumns(df: DataFrame, nullable: Boolean)(implicit spark: SparkSession): DataFrame = {
    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField(c, t, _, m) => StructField(c, t, nullable = nullable, m)
    })
    spark.createDataFrame(df.rdd, newSchema)
  }
}
