Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validation to movie-lens benchmark #441

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import org.renaissance.Benchmark
import org.renaissance.Benchmark._
import org.renaissance.BenchmarkContext
import org.renaissance.BenchmarkResult
import org.renaissance.BenchmarkResult.Validators
import org.renaissance.BenchmarkResult.Assert
import org.renaissance.BenchmarkResult.ValidationException
import org.renaissance.License
import org.renaissance.apache.spark.ResourceUtil.linesFromUrl
import org.renaissance.apache.spark.ResourceUtil.writeResourceToFile
Expand All @@ -32,24 +33,42 @@ import scala.collection.Map
@Parameter(
name = "als_configs",
defaultValue =
"rmse,rank,lambda,iterations;" +
"3.622, 8,5.00,20;" +
"2.134,10,2.00,20;" +
"1.311,12,1.00,20;" +
"0.992, 8,0.05,20;" +
"1.207,10,0.01,10;" +
"1.115, 8,0.02,10;" +
"0.923,12,0.10,10;" +
"0.898, 8,0.20,10",
summary = "A table of ALS configuration parameters and expected RMSE values."
"rank,lambda,iterations;" +
" 8,5.00,20;" +
"10,2.00,20;" +
"12,1.00,20;" +
" 8,0.05,20;" +
"10,0.01,10;" +
" 8,0.02,10;" +
"12,0.10,10;" +
" 8,0.20,10",
summary = "A table of ALS configuration parameters to try."
)
@Parameter(
name = "top_recommended_movie_count",
defaultValue = "5",
summary = "Number of top recommended movies to check for expected movies during validation."
)
@Parameter(
name = "expected_movie_ids",
defaultValue = "67504,83318,83359,83411,8530",
summary = "Movie identifiers that must (all) be found among the top recommended movies."
)
@Parameter(
name = "expected_best_validation_rmse",
defaultValue = "0.898",
summary = "The expected RMSE achieved by the best model on the validation subset."
)
@Configuration(
name = "test",
settings = Array(
"input_file = /ratings-small.csv",
"als_configs = " +
"rmse,rank,lambda,iterations;" +
"1.086,8,0.20,10"
"rank,lambda,iterations;" +
"8,0.20,10",
"top_recommended_movie_count = 2",
"expected_movie_ids = 1254",
"expected_best_validation_rmse = 1.086"
)
)
@Configuration(name = "jmh")
Expand All @@ -70,14 +89,21 @@ final class MovieLens extends Benchmark with SparkUtil {

private val helper = new MovieLensHelper

/** Holds ALS parameters and expected RMSE on validation data. */
case class AlsConfig(rank: Int, lambda: Double, iterations: Int, rmse: Double)
private var topRecommendedMovieCount: Int = _

private var expectedMovieIds: Seq[Int] = _

private var expectedBestValidationRmse: Double = _

/**
 * Holds the parameters of a single ALS training run: the model rank,
 * the lambda (regularization) value, and the number of training iterations.
 * Instances are parsed from the `als_configs` benchmark parameter table.
 */
case class AlsConfig(rank: Int, lambda: Double, iterations: Int)

class MovieLensHelper {
var movies: Map[Int, String] = _
var ratings: RDD[(Long, Rating)] = _
var personalRatings: Seq[Rating] = _
var personalRatingsRDD: RDD[Rating] = _
var personalRatingsUserId: Int = _
var training: RDD[Rating] = _
var validation: RDD[Rating] = _
var test: RDD[Rating] = _
Expand Down Expand Up @@ -108,15 +134,15 @@ final class MovieLens extends Benchmark with SparkUtil {
// Get only entries with positive rating.
val lines = sparkContext.parallelize(linesFromUrl(url))
val ratings = parseRatingsCsvLines(lines).values.filter { _.rating > 0.0 }
assume(!ratings.isEmpty(), "collection of personal ratings is not empty!")

if (ratings.isEmpty()) {
// TODO Fail the benchmark here.
sys.error("No ratings provided.")
} else {
personalRatings = ratings.collect().toSeq
}
val positiveRatings = ratings.collect().toSeq
val userIds = positiveRatings.map(_.user).distinct
assume(userIds.length == 1, "personal ratings come from a single user!")

personalRatings = positiveRatings
personalRatingsRDD = ensureCached(ratings)
personalRatingsUserId = userIds.head
}

def loadRatings(file: Path) = {
Expand All @@ -141,6 +167,7 @@ final class MovieLens extends Benchmark with SparkUtil {
}

def splitRatings(trainingThreshold: Int, validationThreshold: Int) = {
// Merge personal ratings into training data set and cache them.
training = ensureCached(
ratings
.filter(x => x._1 < trainingThreshold)
Expand All @@ -150,7 +177,9 @@ final class MovieLens extends Benchmark with SparkUtil {
numTraining = training.count()

validation = ensureCached(
ratings.filter(x => x._1 >= trainingThreshold && x._1 < validationThreshold).values
ratings
.filter(x => x._1 >= trainingThreshold && x._1 < validationThreshold)
.values
)
numValidation = validation.count()

Expand All @@ -159,10 +188,7 @@ final class MovieLens extends Benchmark with SparkUtil {
)
numTest = test.count()

println(
"Training: " + numTraining + ", validation: " + numValidation + ", test: "
+ numTest
)
println(s"Training: $numTraining, validation: $numValidation, test: $numTest")
}

def trainModels(configs: Iterable[AlsConfig]) = {
Expand Down Expand Up @@ -211,28 +237,16 @@ final class MovieLens extends Benchmark with SparkUtil {
)

val improvement = (baselineRmse - testRmse) / baselineRmse * 100
println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

// Make personalized recommendations.
println(f"The best model improves the baseline by $improvement%.2f%%.")

val myRatedMovieIds = personalRatings.map(_.product).toSet
val candidates =
sparkContext.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
// Make personalized recommendations for movies not rated by the user.

val recommendations = bestModel.get
.predict(candidates.map((0, _)))
.collect()
.sortBy(-_.rating)
.take(50)

var i = 1
println("Movies recommended for you:")
recommendations.foreach { r =>
println("%2d".format(i) + ": " + movies(r.product))
i += 1
}
val ratedMovieIds = personalRatings.map(_.product).toSet
val candidates = sparkContext.parallelize(
movies.keys.filter(!ratedMovieIds.contains(_)).toSeq.map((personalRatingsUserId, _))
)

recommendations
bestModel.get.predict(candidates).collect()
}

/** Compute RMSE (Root Mean Squared Error). */
Expand All @@ -249,6 +263,11 @@ final class MovieLens extends Benchmark with SparkUtil {
override def setUpBeforeAll(bc: BenchmarkContext): Unit = {
import scala.jdk.CollectionConverters._

// Validation parameters.
topRecommendedMovieCount = bc.parameter("top_recommended_movie_count").toInteger
expectedMovieIds = bc.parameter("expected_movie_ids").toList(_.toInt).asScala.toSeq
expectedBestValidationRmse = bc.parameter("expected_best_validation_rmse").toDouble

//
// Without a checkpoint directory set, JMH runs of this
// benchmark in Travis CI tend to crash with stack overflow.
Expand All @@ -265,16 +284,15 @@ final class MovieLens extends Benchmark with SparkUtil {
AlsConfig(
m.get("rank").toInt,
m.get("lambda").toDouble,
m.get("iterations").toInt,
m.get("rmse").toDouble
m.get("iterations").toInt
)
)
.asScala

loadData(bc.scratchDirectory())

// Split ratings into train (60%), validation (20%), and test (20%) based on the
// last digit of the timestamp, add myRatings to train, and cache them.
// Split ratings into training (~60%), validation (~20%), and test (~20%)
// data sets based on the last digit of a rating's timestamp.
helper.splitRatings(6, 8)
}

Expand All @@ -291,10 +309,39 @@ final class MovieLens extends Benchmark with SparkUtil {

override def run(bc: BenchmarkContext): BenchmarkResult = {
  helper.trainModels(alsConfigurations)

  // Rank recommendations by descending rating, breaking ties by movie id,
  // and keep only the configured number of top entries.
  val allRecommendations = helper.recommendMovies()
  val topRecommended = allRecommendations
    .sortBy(r => (-r.rating, r.product))
    .take(topRecommendedMovieCount)

  println(s"Top recommended movies for user id ${helper.personalRatingsUserId}:")
  for ((r, i) <- topRecommended.zipWithIndex) {
    println(
      f"${i + 1}%2d: ${helper.movies(r.product)}%s (rating: ${r.rating}%.3f, id: ${r.product}%d)"
    )
  }

  // Defer validation: the returned lambda is the BenchmarkResult.
  () => validate(topRecommended)
}

/**
 * Validates the benchmark result: every expected movie id must appear among
 * the top recommended movies, and the best model's RMSE on the validation
 * subset must match the expected value within a small tolerance.
 *
 * @throws ValidationException if any expected movie id is missing, or the
 *                             RMSE assertion fails.
 */
private def validate(recommendedMovies: Array[Rating]): Unit = {
  // Use a Set for O(1) membership checks instead of scanning the array
  // once per expected id.
  val recommendedMovieIds = recommendedMovies.map(_.product).toSet

  // Collect ALL missing ids before failing, so a single failure message
  // reports the complete picture rather than just the first missing movie.
  val missingIds = expectedMovieIds.filterNot(recommendedMovieIds.contains)
  if (missingIds.nonEmpty) {
    throw new ValidationException(
      s"Expected ${recommendedMovies.length} top-rated movies to contain " +
        s"movies with ids ${missingIds.mkString(", ")}"
    )
  }

  Assert.assertEquals(
    expectedBestValidationRmse,
    helper.bestValidationRmse,
    0.005,
    "Best model RMSE on the validation set"
  )
}

override def tearDownAfterAll(bc: BenchmarkContext): Unit = {
Expand Down