Commit 5b5159e

Merge remote-tracking branch 'origin/branch-2.1' into branch-2.1
2 parents: 171e653 + a7364a8

46 files changed: 827 additions, 264 deletions


R/pkg/DESCRIPTION

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.1.0
+Version: 2.1.1
 Title: R Frontend for Apache Spark
 Description: The SparkR package provides an R Frontend for Apache Spark.
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),

R/pkg/R/mllib.R

Lines changed: 2 additions & 2 deletions
@@ -1595,14 +1595,14 @@ setMethod("write.ml", signature(object = "ALSModel", path = "character"),
 #' \dontrun{
 #' data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25))
 #' df <- createDataFrame(data)
-#' test <- spark.ktest(df, "test", "norm", c(0, 1))
+#' test <- spark.kstest(df, "test", "norm", c(0, 1))
 #'
 #' # get a summary of the test result
 #' testSummary <- summary(test)
 #' testSummary
 #'
 #' # print out the summary in an organized way
-#' print.summary.KSTest(test)
+#' print.summary.KSTest(testSummary)
 #' }
 #' @note spark.kstest since 2.1.0
 setMethod("spark.kstest", signature(data = "SparkDataFrame"),

R/pkg/inst/tests/testthat/test_mllib.R

Lines changed: 6 additions & 0 deletions
@@ -986,6 +986,12 @@ test_that("spark.kstest", {
   expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4)
   expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4)
   expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:")
+
+  # Test print.summary.KSTest
+  printStats <- capture.output(print.summary.KSTest(stats))
+  expect_match(printStats[1], "Kolmogorov-Smirnov test summary:")
+  expect_match(printStats[5],
+               "Low presumption against null hypothesis: Sample follows theoretical distribution. ")
 })
 
 test_that("spark.randomForest", {

R/pkg/vignettes/sparkr-vignettes.Rmd

Lines changed: 120 additions & 48 deletions
@@ -447,25 +447,31 @@ head(teenagers)
 
 SparkR supports the following machine learning models and algorithms.
 
+* Accelerated Failure Time (AFT) Survival Model
+
+* Collaborative Filtering with Alternating Least Squares (ALS)
+
+* Gaussian Mixture Model (GMM)
+
 * Generalized Linear Model (GLM)
 
-* Naive Bayes Model
+* Gradient-Boosted Trees (GBT)
+
+* Isotonic Regression Model
 
 * $k$-means Clustering
 
-* Accelerated Failure Time (AFT) Survival Model
-
-* Gaussian Mixture Model (GMM)
+* Kolmogorov-Smirnov Test
 
 * Latent Dirichlet Allocation (LDA)
 
-* Multilayer Perceptron Model
+* Logistic Regression Model
 
-* Collaborative Filtering with Alternating Least Squares (ALS)
+* Multilayer Perceptron Model
 
-* Isotonic Regression Model
+* Naive Bayes Model
 
-More will be added in the future.
+* Random Forest
 
 ### R Formula
 
@@ -526,6 +532,34 @@ gaussianFitted <- predict(gaussianGLM, carsDF)
 head(select(gaussianFitted, "model", "prediction", "mpg", "wt", "hp"))
 ```
 
+#### Random Forest
+
+`spark.randomForest` fits a [random forest](https://en.wikipedia.org/wiki/Random_forest) classification or regression model on a `SparkDataFrame`.
+Users can call `summary` to get a summary of the fitted model, `predict` to make predictions, and `write.ml`/`read.ml` to save/load fitted models.
+
+In the following example, we use the `longley` dataset to train a random forest and make predictions:
+
+```{r, warning=FALSE}
+df <- createDataFrame(longley)
+rfModel <- spark.randomForest(df, Employed ~ ., type = "regression", maxDepth = 2, numTrees = 2)
+summary(rfModel)
+predictions <- predict(rfModel, df)
+```
+
+#### Gradient-Boosted Trees
+
+`spark.gbt` fits a [gradient-boosted tree](https://en.wikipedia.org/wiki/Gradient_boosting) classification or regression model on a `SparkDataFrame`.
+Users can call `summary` to get a summary of the fitted model, `predict` to make predictions, and `write.ml`/`read.ml` to save/load fitted models.
+
+Similar to the random forest example above, we use the `longley` dataset to train a gradient-boosted tree and make predictions:
+
+```{r, warning=FALSE}
+df <- createDataFrame(longley)
+gbtModel <- spark.gbt(df, Employed ~ ., type = "regression", maxDepth = 2, maxIter = 2)
+summary(gbtModel)
+predictions <- predict(gbtModel, df)
+```
+
 #### Naive Bayes Model
 
 Naive Bayes model assumes independence among the features. `spark.naiveBayes` fits a [Bernoulli naive Bayes model](https://en.wikipedia.org/wiki/Naive_Bayes_classifier#Bernoulli_naive_Bayes) against a SparkDataFrame. The data should be all categorical. These models are often used for document classification.
@@ -565,8 +599,6 @@ head(aftPredictions)
 
 #### Gaussian Mixture Model
 
-(Coming in 2.1.0)
-
 `spark.gaussianMixture` fits multivariate [Gaussian Mixture Model](https://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model) (GMM) against a `SparkDataFrame`. [Expectation-Maximization](https://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm) (EM) is used to approximate the maximum likelihood estimator (MLE) of the model.
 
 We use a simulated example to demostrate the usage.
@@ -584,8 +616,6 @@ head(select(gmmFitted, "V1", "V2", "prediction"))
 
 #### Latent Dirichlet Allocation
 
-(Coming in 2.1.0)
-
 `spark.lda` fits a [Latent Dirichlet Allocation](https://en.wikipedia.org/wiki/Latent_Dirichlet_allocation) model on a `SparkDataFrame`. It is often used in topic modeling in which topics are inferred from a collection of text documents. LDA can be thought of as a clustering algorithm as follows:
 
 * Topics correspond to cluster centers, and documents correspond to examples (rows) in a dataset.
@@ -600,22 +630,6 @@ To use LDA, we need to specify a `features` column in `data` where each entry re
 
 * libSVM: Each entry is a collection of words and will be processed directly.
 
-There are several parameters LDA takes for fitting the model.
-
-* `k`: number of topics (default 10).
-
-* `maxIter`: maximum iterations (default 20).
-
-* `optimizer`: optimizer to train an LDA model, "online" (default) uses [online variational inference](https://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf). "em" uses [expectation-maximization](https://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm).
-
-* `subsamplingRate`: For `optimizer = "online"`. Fraction of the corpus to be sampled and used in each iteration of mini-batch gradient descent, in range (0, 1] (default 0.05).
-
-* `topicConcentration`: concentration parameter (commonly named beta or eta) for the prior placed on topic distributions over terms, default -1 to set automatically on the Spark side. Use `summary` to retrieve the effective topicConcentration. Only 1-size numeric is accepted.
-
-* `docConcentration`: concentration parameter (commonly named alpha) for the prior placed on documents distributions over topics (theta), default -1 to set automatically on the Spark side. Use `summary` to retrieve the effective docConcentration. Only 1-size or k-size numeric is accepted.
-
-* `maxVocabSize`: maximum vocabulary size, default 1 << 18.
-
 Two more functions are provided for the fitted model.
 
 * `spark.posterior` returns a `SparkDataFrame` containing a column of posterior probabilities vectors named "topicDistribution".
@@ -654,11 +668,8 @@ perplexity <- spark.perplexity(model, corpusDF)
 perplexity
 ```
 
-
 #### Multilayer Perceptron
 
-(Coming in 2.1.0)
-
 Multilayer perceptron classifier (MLPC) is a classifier based on the [feedforward artificial neural network](https://en.wikipedia.org/wiki/Feedforward_neural_network). MLPC consists of multiple layers of nodes. Each layer is fully connected to the next layer in the network. Nodes in the input layer represent the input data. All other nodes map inputs to outputs by a linear combination of the inputs with the node’s weights $w$ and bias $b$ and applying an activation function. This can be written in matrix form for MLPC with $K+1$ layers as follows:
 $$
 y(x)=f_K(\ldots f_2(w_2^T f_1(w_1^T x + b_1) + b_2) \ldots + b_K).
@@ -678,24 +689,35 @@ The number of nodes $N$ in the output layer corresponds to the number of classes
 
 MLPC employs backpropagation for learning the model. We use the logistic loss function for optimization and L-BFGS as an optimization routine.
 
-`spark.mlp` requires at least two columns in `data`: one named `"label"` and the other one `"features"`. The `"features"` column should be in libSVM-format. According to the description above, there are several additional parameters that can be set:
+`spark.mlp` requires at least two columns in `data`: one named `"label"` and the other one `"features"`. The `"features"` column should be in libSVM-format.
 
-* `layers`: integer vector containing the number of nodes for each layer.
-
-* `solver`: solver parameter, supported options: `"gd"` (minibatch gradient descent) or `"l-bfgs"`.
-
-* `maxIter`: maximum iteration number.
-
-* `tol`: convergence tolerance of iterations.
-
-* `stepSize`: step size for `"gd"`.
+We use iris data set to show how to use `spark.mlp` in classification.
+```{r, warning=FALSE}
+df <- createDataFrame(iris)
+# fit a Multilayer Perceptron Classification Model
+model <- spark.mlp(df, Species ~ ., blockSize = 128, layers = c(4, 3), solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1, initialWeights = c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
+```
 
-* `seed`: seed parameter for weights initialization.
+To avoid lengthy display, we only present partial results of the model summary. You can check the full result from your sparkR shell.
+```{r, include=FALSE}
+ops <- options()
+options(max.print=5)
+```
+```{r}
+# check the summary of the fitted model
+summary(model)
+```
+```{r, include=FALSE}
+options(ops)
+```
+```{r}
+# make predictions use the fitted model
+predictions <- predict(model, df)
+head(select(predictions, predictions$prediction))
+```
 
 #### Collaborative Filtering
 
-(Coming in 2.1.0)
-
 `spark.als` learns latent factors in [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) via [alternating least squares](http://dl.acm.org/citation.cfm?id=1608614).
 
 There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, `nonnegative`. For a complete list, refer to the help file.
@@ -725,8 +747,6 @@ head(predicted)
 
 #### Isotonic Regression Model
 
-(Coming in 2.1.0)
-
 `spark.isoreg` fits an [Isotonic Regression](https://en.wikipedia.org/wiki/Isotonic_regression) model against a `SparkDataFrame`. It solves a weighted univariate a regression problem under a complete order constraint. Specifically, given a set of real observed responses $y_1, \ldots, y_n$, corresponding real features $x_1, \ldots, x_n$, and optionally positive weights $w_1, \ldots, w_n$, we want to find a monotone (piecewise linear) function $f$ to minimize
 $$
 \ell(f) = \sum_{i=1}^n w_i (y_i - f(x_i))^2.
@@ -768,8 +788,60 @@ newDF <- createDataFrame(data.frame(x = c(1.5, 3.2)))
 head(predict(isoregModel, newDF))
 ```
 
-#### What's More?
-We also expect Decision Tree, Random Forest, Kolmogorov-Smirnov Test coming in the next version 2.1.0.
+#### Logistic Regression Model
+
+[Logistic regression](https://en.wikipedia.org/wiki/Logistic_regression) is a widely-used model when the response is categorical. It can be seen as a special case of the [Generalized Linear Predictive Model](https://en.wikipedia.org/wiki/Generalized_linear_model).
+We provide `spark.logit` on top of `spark.glm` to support logistic regression with advanced hyper-parameters.
+It supports both binary and multiclass classification with elastic-net regularization and feature standardization, similar to `glmnet`.
+
+We use a simple example to demonstrate `spark.logit` usage. In general, there are three steps of using `spark.logit`:
+1). Create a dataframe from a proper data source; 2). Fit a logistic regression model using `spark.logit` with a proper parameter setting;
+and 3). Obtain the coefficient matrix of the fitted model using `summary` and use the model for prediction with `predict`.
+
+Binomial logistic regression
+```{r, warning=FALSE}
+df <- createDataFrame(iris)
+# Create a DataFrame containing two classes
+training <- df[df$Species %in% c("versicolor", "virginica"), ]
+model <- spark.logit(training, Species ~ ., regParam = 0.00042)
+summary(model)
+```
+
+Predict values on training data
+```{r}
+fitted <- predict(model, training)
+```
+
+Multinomial logistic regression against three classes
+```{r, warning=FALSE}
+df <- createDataFrame(iris)
+# Note in this case, Spark infers it is multinomial logistic regression, so family = "multinomial" is optional.
+model <- spark.logit(df, Species ~ ., regParam = 0.056)
+summary(model)
+```
+
+#### Kolmogorov-Smirnov Test
+
+`spark.kstest` runs a two-sided, one-sample [Kolmogorov-Smirnov (KS) test](https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test).
+Given a `SparkDataFrame`, the test compares continuous data in a given column `testCol` with the theoretical distribution
+specified by parameter `nullHypothesis`.
+Users can call `summary` to get a summary of the test results.
+
+In the following example, we test whether the `longley` dataset's `Armed_Forces` column
+follows a normal distribution. We set the parameters of the normal distribution using
+the mean and standard deviation of the sample.
+
+```{r, warning=FALSE}
+df <- createDataFrame(longley)
+afStats <- head(select(df, mean(df$Armed_Forces), sd(df$Armed_Forces)))
+afMean <- afStats[1]
+afStd <- afStats[2]
+
+test <- spark.kstest(df, "Armed_Forces", "norm", c(afMean, afStd))
+testSummary <- summary(test)
+testSummary
+```
+
 
 ### Model Persistence
 The following example shows how to save/load an ML model by SparkR.
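The vignette's own save/load example sits outside this hunk. As a rough cross-API illustration of the same persistence idea, here is a minimal PySpark sketch using the `pyspark.ml` writers and readers (the model, toy data, and path are hypothetical, not taken from the vignette):

```python
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()

# Tiny toy training set, for illustration only.
df = spark.createDataFrame(
    [(0.0, Vectors.dense(0.0, 1.1)), (1.0, Vectors.dense(2.0, 1.0))],
    ["label", "features"])
model = LogisticRegression(maxIter=5).fit(df)

# Save the fitted model and load it back, mirroring write.ml/read.ml in SparkR.
model.write().overwrite().save("/tmp/lr_model")            # hypothetical path
restored = LogisticRegressionModel.load("/tmp/lr_model")
```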

core/src/main/resources/org/apache/spark/ui/static/executorspage.js

Lines changed: 2 additions & 5 deletions
@@ -457,10 +457,6 @@ $(document).ready(function () {
                 }
             ],
             "columnDefs": [
-                {
-                    "targets": [ 15 ],
-                    "visible": logsExist(response)
-                },
                 {
                     "targets": [ 16 ],
                     "visible": getThreadDumpEnabled()
@@ -469,7 +465,8 @@ $(document).ready(function () {
             "order": [[0, "asc"]]
         };
 
-        $(selector).DataTable(conf);
+        var dt = $(selector).DataTable(conf);
+        dt.column(15).visible(logsExist(response));
         $('#active-executors [data-toggle="tooltip"]').tooltip();
 
         var sumSelector = "#summary-execs-table";

core/src/main/scala/org/apache/spark/util/ThreadUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -209,7 +209,7 @@ private[spark] object ThreadUtils {
       // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
       // See SPARK-13747.
       val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
-      awaitable.result(Duration.Inf)(awaitPermission)
+      awaitable.result(atMost)(awaitPermission)
     } catch {
       case NonFatal(t) =>
        throw new SparkException("Exception thrown in awaitResult: ", t)
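The one-line change above makes `awaitResult` honour its `atMost` argument instead of always blocking with `Duration.Inf`. Spark's helper is Scala, but the principle carries over to any future/await API; a minimal Python illustration of the same idea using `concurrent.futures` (names and timeout values are hypothetical):

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def slow_task():
    time.sleep(10)
    return 42

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_task)
    try:
        # Forwarding the caller's timeout (the analogue of `atMost`) bounds the wait;
        # omitting it would be the analogue of Duration.Inf and block until completion.
        future.result(timeout=1)
    except TimeoutError:
        print("timed out after 1 second instead of waiting forever")
    # Note: the executor still joins the worker thread when the `with` block exits.
```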

dev/run-tests-jenkins.py

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ def pr_message(build_display_name,
                 short_commit_hash,
                 commit_url,
                 str(' ' + post_msg + '.') if post_msg else '.')
-    return '**[Test build %s %s](%sconsoleFull)** for PR %s at commit [`%s`](%s)%s' % str_args
+    return '**[Test build %s %s](%stestReport)** for PR %s at commit [`%s`](%s)%s' % str_args
 
 
 def run_pr_checks(pr_tests, ghprb_actual_commit, sha1):
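The change above only swaps the `consoleFull` link target for `testReport` in the PR comment template. A quick way to see the resulting Markdown is to format the template with placeholder arguments (all values below are made up):

```python
# Placeholder values standing in for the real Jenkins/GitHub arguments.
str_args = ("#100",                                          # build display name
            "has finished",                                  # build result message
            "https://jenkins.example.org/job/SparkPullRequestBuilder/100/",  # build URL
            "9999",                                          # PR number
            "5b5159e",                                       # short commit hash
            "https://github.com/apache/spark/commit/5b5159e",
            ".")

# Same format string as the patched pr_message(); the link now points at .../testReport.
msg = '**[Test build %s %s](%stestReport)** for PR %s at commit [`%s`](%s)%s' % str_args
print(msg)
```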

docs/_plugins/copy_api_dirs.rb

Lines changed: 3 additions & 0 deletions
@@ -142,4 +142,7 @@
   puts "cp -r R/pkg/html/. docs/api/R"
   cp_r("R/pkg/html/.", "docs/api/R")
 
+  puts "cp R/pkg/DESCRIPTION docs/api"
+  cp("R/pkg/DESCRIPTION", "docs/api")
+
 end

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -845,7 +845,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
     }
   }
 
-  test("stress test for failOnDataLoss=false") {
+  ignore("stress test for failOnDataLoss=false") {
     val reader = spark
       .readStream
       .format("kafka")

python/pyspark/sql/streaming.py

Lines changed: 8 additions & 2 deletions
@@ -28,6 +28,7 @@
 
 from pyspark import since, keyword_only
 from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql.column import _to_seq
 from pyspark.sql.readwriter import OptionUtils, to_str
 from pyspark.sql.types import *
 from pyspark.sql.utils import StreamingQueryException
@@ -125,10 +126,15 @@ def recentProgress(self):
     @since(2.1)
     def lastProgress(self):
         """
-        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query.
+        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
+        None if there were no progress updates
         :return: a map
         """
-        return json.loads(self._jsq.lastProgress().json())
+        lastProgress = self._jsq.lastProgress()
+        if lastProgress:
+            return json.loads(lastProgress.json())
+        else:
+            return None
 
     @since(2.0)
     def processAllAvailable(self):
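With the change above, `lastProgress` returns `None` until the first progress update is reported, instead of failing on the missing Java object. A short usage sketch (the socket source host/port and query setup are hypothetical):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical streaming source; any readStream source behaves the same way here.
lines = spark.readStream.format("socket") \
    .option("host", "localhost").option("port", 9999).load()
query = lines.writeStream.format("console").start()

progress = query.lastProgress
if progress is None:
    # No micro-batch has reported progress yet.
    print("no progress reported yet")
else:
    print(progress["batchId"], progress["numInputRows"])

query.stop()
```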
