Skip to content

[SPARK-18821][SparkR]: Bisecting k-means wrapper in SparkR #16566

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ exportMethods("glm",
"spark.kstest",
"spark.logit",
"spark.randomForest",
"spark.gbt")
"spark.gbt",
"spark.bisectingKmeans")

# Job group lifecycle management methods
export("setJobGroup",
Expand Down
5 changes: 5 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,11 @@ setGeneric("rbind", signature = "...")
#' @export
setGeneric("spark.als", function(data, ...) { standardGeneric("spark.als") })

#' @rdname spark.bisectingKmeans
#' @export
setGeneric(name = "spark.bisectingKmeans",
           def = function(data, formula, ...) {
             standardGeneric("spark.bisectingKmeans")
           })

#' @rdname spark.gaussianMixture
#' @export
setGeneric("spark.gaussianMixture",
Expand Down
149 changes: 149 additions & 0 deletions R/pkg/R/mllib_clustering.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

# mllib_clustering.R: Provides methods for MLlib clustering algorithms integration

#' S4 class for a fitted bisecting k-means model
#'
#' @param jobj a Java object reference to the backing Scala BisectingKMeansModel
#' @export
#' @note BisectingKMeansModel since 2.2.0
setClass(Class = "BisectingKMeansModel",
         representation = representation(jobj = "jobj"))

#' S4 class that represents a GaussianMixtureModel
#'
#' @param jobj a Java object reference to the backing Scala GaussianMixtureModel
Expand All @@ -38,6 +45,148 @@ setClass("KMeansModel", representation(jobj = "jobj"))
#' @note LDAModel since 2.1.0
setClass("LDAModel", representation(jobj = "jobj"))

#' Bisecting K-Means Clustering Model
#'
#' Fits a bisecting k-means clustering model against a Spark DataFrame.
#' After fitting, users can call \code{summary} to get a summary of the model, \code{predict} to
#' make predictions on new data, \code{fitted} to get the fitted result, and
#' \code{write.ml}/\code{read.ml} to save/load the fitted model.
#'
#' @param data a SparkDataFrame for training.
#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
#'                operators are supported, including '~', '.', ':', '+', and '-'.
#'                Note that the response variable of formula is empty in spark.bisectingKmeans.
#' @param k the desired number of leaf clusters. Must be > 1.
#'          The actual number could be smaller if there are no divisible leaf clusters.
#' @param maxIter maximum iteration number.
#' @param seed the random seed. If not \code{NULL}, it is coerced to an integer before being
#'             passed to the backend.
#' @param minDivisibleClusterSize The minimum number of points (if greater than or equal to 1.0)
#'                                or the minimum proportion of points (if less than 1.0) of a
#'                                divisible cluster. Note that it is an expert parameter. The
#'                                default value should be good enough for most cases.
#' @param ... additional argument(s) passed to the method.
#' @return \code{spark.bisectingKmeans} returns a fitted bisecting k-means model.
#' @rdname spark.bisectingKmeans
#' @aliases spark.bisectingKmeans,SparkDataFrame,formula-method
#' @name spark.bisectingKmeans
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- createDataFrame(iris)
#' # the formula has no response variable: only the features are listed
#' model <- spark.bisectingKmeans(df, ~ Sepal_Length + Sepal_Width, k = 4)
#' summary(model)
#'
#' # get fitted result from a bisecting k-means model
#' fitted.model <- fitted(model, "centers")
#' showDF(fitted.model)
#'
#' # fitted values on training data
#' fitted <- predict(model, df)
#' head(select(fitted, "Sepal_Length", "prediction"))
#'
#' # save fitted model to input path
#' path <- "path/to/model"
#' write.ml(model, path)
#'
#' # can also read back the saved model and print
#' savedModel <- read.ml(path)
#' summary(savedModel)
#' }
#' @note spark.bisectingKmeans since 2.2.0
#' @seealso \link{predict}, \link{read.ml}, \link{write.ml}
setMethod("spark.bisectingKmeans", signature(data = "SparkDataFrame", formula = "formula"),
          function(data, formula, k = 4, maxIter = 20, seed = NULL, minDivisibleClusterSize = 1.0) {
            # deparse() can split a long formula over several strings; the backend call below
            # receives it as a single string.
            formulaString <- paste0(deparse(formula), collapse = "")
            # The seed is handed over as a string, or as NULL when it was not supplied.
            seedString <- if (is.null(seed)) NULL else as.character(as.integer(seed))
            wrapper <- callJStatic("org.apache.spark.ml.r.BisectingKMeansWrapper", "fit",
                                   data@sdf, formulaString, as.integer(k), as.integer(maxIter),
                                   seedString, as.numeric(minDivisibleClusterSize))
            new("BisectingKMeansModel", jobj = wrapper)
          })

# Summarize a fitted bisecting k-means model

#' @param object a fitted bisecting k-means model.
#' @return \code{summary} returns summary information of the fitted model, which is a list.
#'         The list includes the model's \code{k} (number of cluster centers),
#'         \code{coefficients} (model cluster centers, one row per cluster and one column
#'         per feature),
#'         \code{size} (number of data points in each cluster), \code{cluster}
#'         (cluster centers of the transformed data; cluster is NULL if is.loaded is TRUE),
#'         and \code{is.loaded} (whether the model is loaded from a saved file).
#' @rdname spark.bisectingKmeans
#' @export
#' @note summary(BisectingKMeansModel) since 2.2.0
setMethod("summary", signature(object = "BisectingKMeansModel"),
          function(object) {
            wrapper <- object@jobj
            loaded <- callJMethod(wrapper, "isLoaded")
            featureNames <- callJMethod(wrapper, "features")
            centers <- callJMethod(wrapper, "coefficients")
            k <- callJMethod(wrapper, "k")
            size <- callJMethod(wrapper, "size")

            # Fill a matrix with k columns, then transpose so that each row is one cluster.
            centers <- t(matrix(centers, ncol = k))
            colnames(centers) <- unlist(featureNames)
            rownames(centers) <- 1:k

            # The "cluster" data is only fetched for a model that was not loaded from disk.
            clusterData <- NULL
            if (!loaded) {
              clusterData <- dataFrame(callJMethod(wrapper, "cluster"))
            }

            list(k = k, coefficients = centers, size = size,
                 cluster = clusterData, is.loaded = loaded)
          })

# Make predictions with a fitted bisecting k-means model

#' @param newData a SparkDataFrame for testing.
#' @return \code{predict} returns the predicted values based on a bisecting k-means model.
#' @rdname spark.bisectingKmeans
#' @export
#' @note predict(BisectingKMeansModel) since 2.2.0
setMethod("predict", signature(object = "BisectingKMeansModel"),
          # Delegates to the shared prediction helper.
          function(object, newData) predict_internal(object, newData))

#' Get fitted result from a bisecting k-means model
#'
#' Get fitted result from a bisecting k-means model.
#' Note: A saved-loaded model does not support this method.
#'
#' @param method type of fitted results, \code{"centers"} for cluster centers
#' or \code{"classes"} for assigned classes.
#' @return \code{fitted} returns a SparkDataFrame containing fitted values.
#' @rdname spark.bisectingKmeans
#' @export
#' @note fitted(BisectingKMeansModel) since 2.2.0
setMethod("fitted", signature(object = "BisectingKMeansModel"),
function(object, method = c("centers", "classes")) {
method <- match.arg(method)
jobj <- object@jobj
is.loaded <- callJMethod(jobj, "isLoaded")
if (is.loaded) {
stop("Saved-loaded bisecting k-means model does not support 'fitted' method")
} else {
dataFrame(callJMethod(jobj, "fitted", method))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much is returned from `fitted`? Should this be a list (like in `summary`) instead of a DataFrame?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fitted in bisectingKmeans is quite similar to fitted in Kmeans. I followed that style to return a dataframe.

}
})

# Persist a fitted bisecting k-means model to the given path

#' @param path the directory where the model is saved.
#' @param overwrite overwrites or not if the output path already exists. Default is FALSE
#'                  which means throw exception if the output path exists.
#'
#' @rdname spark.bisectingKmeans
#' @export
#' @note write.ml(BisectingKMeansModel, character) since 2.2.0
setMethod("write.ml", signature(object = "BisectingKMeansModel", path = "character"),
          # Delegates to the shared model-writing helper.
          function(object, path, overwrite = FALSE) write_internal(object, path, overwrite))

#' Multivariate Gaussian Mixture Model (GMM)
#'
#' Fits multivariate gaussian mixture model against a Spark DataFrame, similarly to R's
Expand Down
10 changes: 6 additions & 4 deletions R/pkg/R/mllib_utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
#' @rdname write.ml
#' @name write.ml
#' @export
#' @seealso \link{spark.glm}, \link{glm},
#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg},
#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
#' @seealso \link{spark.kmeans},
#' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
#' @seealso \link{spark.randomForest}, \link{spark.survreg},
Expand All @@ -47,8 +47,8 @@ NULL
#' @rdname predict
#' @name predict
#' @export
#' @seealso \link{spark.glm}, \link{glm},
#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg},
#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
#' @seealso \link{spark.kmeans},
#' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
#' @seealso \link{spark.randomForest}, \link{spark.survreg}
Expand Down Expand Up @@ -113,6 +113,8 @@ read.ml <- function(path) {
new("GBTRegressionModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GBTClassifierWrapper")) {
new("GBTClassificationModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.BisectingKMeansWrapper")) {
new("BisectingKMeansModel", jobj = jobj)
} else {
stop("Unsupported model: ", jobj)
}
Expand Down
40 changes: 40 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_clustering.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,46 @@ absoluteSparkPath <- function(x) {
file.path(sparkHome, x)
}

# End-to-end test of the bisecting k-means wrapper: fit, predict, fitted, summary and
# save/load round trip, using the numeric columns of the iris data set.
test_that("spark.bisectingKmeans", {
  newIris <- iris
  newIris$Species <- NULL
  training <- suppressWarnings(createDataFrame(newIris))

  # NOTE(review): the result is discarded; presumably this only forces evaluation of the
  # training data before fitting - confirm whether it is still needed.
  take(training, 1)

  model <- spark.bisectingKmeans(data = training, ~ .)
  firstPrediction <- take(select(predict(model, training), "prediction"), 1)
  expect_equal(typeof(firstPrediction$prediction), "integer")
  expect_equal(firstPrediction$prediction, 1)

  # Test fitted works on Bisecting KMeans
  fitted.model <- fitted(model)
  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction),
               c(0, 1, 2, 3))

  # Test summary works on Bisecting KMeans
  summary.model <- summary(model)
  cluster <- summary.model$cluster
  k <- summary.model$k
  expect_equal(k, 4)
  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction),
               c(0, 1, 2, 3))

  # Test model save/load
  modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp")
  write.ml(model, modelPath)
  # Writing to an existing path without overwrite = TRUE must fail
  expect_error(write.ml(model, modelPath))
  write.ml(model, modelPath, overwrite = TRUE)
  model2 <- read.ml(modelPath)
  summary2 <- summary(model2)
  expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
  expect_equal(summary.model$coefficients, summary2$coefficients)
  expect_false(summary.model$is.loaded)
  expect_true(summary2$is.loaded)

  # The model is saved as a directory; unlink() does not delete directories unless
  # recursive = TRUE, so without it the temporary model would be left behind.
  unlink(modelPath, recursive = TRUE)
})

test_that("spark.gaussianMixture", {
# R code to reproduce the result.
# nolint start
Expand Down
Loading