
MapGroupsWithState, and FlatMapGroupsWithState #34

Closed
wants to merge 124 commits
Changes from all commits
124 commits
f8db894
[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset fr…
CodingCat Jan 17, 2017
18ee55d
[SPARK-19148][SQL] do not expose the external table concept in Catalog
cloud-fan Jan 17, 2017
84f0b64
[MINOR][YARN] Move YarnSchedulerBackendSuite to resource-managers/yar…
yanboliang Jan 17, 2017
a774bca
[SPARK-19240][SQL][TEST] add test for setting location for managed table
cloud-fan Jan 17, 2017
0019005
[SPARK-19219][SQL] Fix Parquet log output defaults
nicklavers Jan 17, 2017
6c00c06
[SPARK-3249][DOC] Fix links in ScalaDoc that cause warning messages i…
HyukjinKwon Jan 17, 2017
b79cc7c
[SPARK-19179][YARN] Change spark.yarn.access.namenodes config and upd…
jerryshao Jan 17, 2017
20e6280
[SPARK-19019] [PYTHON] Fix hijacked `collections.namedtuple` and port…
HyukjinKwon Jan 17, 2017
a83accf
[SPARK-19065][SQL] Don't inherit expression id in dropDuplicates
zsxwing Jan 17, 2017
a23debd
[SPARK-19129][SQL] SessionCatalog: Disallow empty part col values in …
gatorsmile Jan 17, 2017
843ec8e
[SPARK-19239][PYSPARK] Check parameters whether equals None when spec…
Jan 17, 2017
fee20df
[MINOR][SQL] Remove duplicate call of reset() function in CurrentOrig…
jiangxb1987 Jan 17, 2017
83dff87
[SPARK-18917][SQL] Remove schema check in appending data
rxin Jan 17, 2017
2992a0e
[SPARK-13721][SQL] Support outer generators in DataFrame API
Jan 17, 2017
e7f982b
[SPARK-18206][ML] Add instrumentation for MLP,NB,LDA,AFT,GLM,Isotonic…
zhengruifeng Jan 17, 2017
d10da27
Prototype - almost working
tdas Jan 18, 2017
78cd185
Renamed to mapGroupsWithState
tdas Jan 18, 2017
0c22e08
Fixed bugs
tdas Jan 18, 2017
4494cd9
[SPARK-18243][SQL] Port Hive writing to use FileFormat interface
cloud-fan Jan 18, 2017
eefdf9f
[SPARK-19227][SPARK-19251] remove unused imports and outdated comments
uncleGen Jan 18, 2017
17ce0b5
[SPARK-18782][BUILD] Bump Hadoop 2.6 version to use Hadoop 2.6.5
a-roberts Jan 18, 2017
52e14e4
Removed prints
tdas Jan 18, 2017
529aefe
Test state remove
tdas Jan 18, 2017
f85f296
[SPARK-19024][SQL] Implement new approach to write a permanent view
jiangxb1987 Jan 18, 2017
d06172b
[SPARK-19223][SQL][PYSPARK] Fix InputFileBlockHolder for datasources …
viirya Jan 18, 2017
278fa1e
[SPARK-19231][SPARKR] add error handling for download and untar for S…
felixcheung Jan 18, 2017
33791a8
[SPARK-18113] Use ask to replace askWithRetry in canCommit and make r…
Jan 18, 2017
c050c12
[SPARK-19113][SS][TESTS] Ignore StreamingQueryException thrown from a…
zsxwing Jan 18, 2017
569e506
[SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon…
lw-lin Jan 18, 2017
a81e336
[SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListe…
uncleGen Jan 18, 2017
57f5e8d
Test restart, and test with metrics
tdas Jan 18, 2017
fe409f3
[SPARK-14975][ML] Fixed GBTClassifier to predict probability per trai…
imatiach-msft Jan 18, 2017
0c92318
Update known_translations for contributor names
yhuai Jan 19, 2017
2e62560
[SPARK-19265][SQL] make table relation cache general and does not dep…
cloud-fan Jan 19, 2017
8ccca91
[SPARK-14272][ML] Add Loglikelihood in GaussianMixtureSummary
zhengruifeng Jan 19, 2017
064fadd
[SPARK-19059][SQL] Unable to retrieve data from parquet table whose n…
jayadevanmurali Jan 19, 2017
640f942
[SPARK-16654][CORE] Add UI coverage for Application Level Blacklisting
Jan 19, 2017
63d8390
[SPARK-19295][SQL] IsolatedClientLoader's downloadVersion should log …
yhuai Jan 19, 2017
148a84b
[SPARK-17912] [SQL] Refactor code generation to get data for ColumnVe…
kiszk Jan 19, 2017
0bf605c
[SPARK-19292][SQL] filter with partition columns should be case-insen…
cloud-fan Jan 20, 2017
039ed9f
[SPARK-19271][SQL] Change non-cbo estimation of aggregate
wzhfy Jan 20, 2017
d50d12b
[SPARK-19302][DOC][MINOR] Fix the wrong item format in security.md
sarutak Jan 20, 2017
e20d9b1
[SPARK-19069][CORE] Expose task 'status' and 'duration' in spark hist…
Jan 20, 2017
552e5f0
[SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in S…
tdas Jan 20, 2017
9b7a03f
[SPARK-18589][SQL] Fix Python UDF accessing attributes from both side…
Jan 21, 2017
ea31f92
[SPARK-19267][SS] Fix a race condition when stopping StateStore
zsxwing Jan 21, 2017
3e0d8dc
Fixed everything
tdas Jan 21, 2017
54268b4
[SPARK-16101][SQL] Refactoring CSV write path to be consistent with J…
HyukjinKwon Jan 21, 2017
f174cdc
[SPARK-14536][SQL] fix to handle null value in array type column for …
sureshthalamati Jan 21, 2017
3c2ba9f
[SPARK-19305][SQL] partitioned table should always put partition colu…
cloud-fan Jan 21, 2017
bcdabaa
[SPARK-17724][STREAMING][WEBUI] Unevaluated new lines in tooltip in D…
keypointt Jan 21, 2017
6113fe7
[SPARK-19117][SPARK-18922][TESTS] Fix the rest of flaky, newly introd…
HyukjinKwon Jan 21, 2017
aa014eb
[SPARK-19153][SQL] DataFrameWriter.saveAsTable work with create parti…
windpiger Jan 22, 2017
3dcad9f
[SPARK-19155][ML] MLlib GeneralizedLinearRegression family and link s…
yanboliang Jan 22, 2017
0c589e3
[SPARK-19291][SPARKR][ML] spark.gaussianMixture supports output log-l…
yanboliang Jan 22, 2017
74e65cb
[SPARK-16101][SQL] Refactoring CSV read path to be consistent with JS…
HyukjinKwon Jan 23, 2017
772035e
[SPARK-19229][SQL] Disallow Creating Hive Source Tables when Hive Sup…
gatorsmile Jan 23, 2017
de6ad3d
[SPARK-19309][SQL] disable common subexpression elimination for condi…
cloud-fan Jan 23, 2017
f067ace
[SPARK-19155][ML] Make family case insensitive in GLM
actuaryzhang Jan 23, 2017
c4a6519
[SPARK-19218][SQL] Fix SET command to show a result correctly and in …
dongjoon-hyun Jan 23, 2017
c994921
[SPARK-19146][CORE] Drop more elements when stageData.taskData.size >…
wangyum Jan 23, 2017
0ef1421
[SPARK-19284][SQL] append to partitioned datasource table should with…
windpiger Jan 23, 2017
4a11d02
[SPARK-14709][ML] spark.ml API for linear SVM
YY-OnCall Jan 23, 2017
c8aea74
[SPARK-17455][MLLIB] Improve PAVA implementation in IsotonicRegression
neggert Jan 23, 2017
5b258b8
[SPARK-16473][MLLIB] Fix BisectingKMeans Algorithm failing in edge case
imatiach-msft Jan 23, 2017
e497472
[SPARK-19306][CORE] Fix inconsistent state in DiskBlockObject when ex…
jerryshao Jan 23, 2017
49f5b0a
[SPARK-17747][ML] WeightCol support non-double numeric datatypes
zhengruifeng Jan 24, 2017
fcfd5d0
[SPARK-19290][SQL] add a new extending interface in Analyzer for post…
cloud-fan Jan 24, 2017
3bdf3ee
[SPARK-19272][SQL] Remove the param `viewOriginalText` from `CatalogT…
jiangxb1987 Jan 24, 2017
e576c1e
[SPARK-9435][SQL] Reuse function in Java UDF to correctly support exp…
HyukjinKwon Jan 24, 2017
60bd91a
[SPARK-19268][SS] Disallow adaptive query execution for streaming que…
zsxwing Jan 24, 2017
ec9493b
[SPARK-16101][HOTFIX] Fix the build with Scala 2.10 by explicit typed…
HyukjinKwon Jan 24, 2017
f27e024
[SPARK-18823][SPARKR] add support for assigning to column
felixcheung Jan 24, 2017
7c61c2a
[DOCS] Fix typo in docs
uncleGen Jan 24, 2017
cca8680
delete useless var “j”
xiaoyesoso Jan 24, 2017
3c86fdd
[SPARK-19152][SQL] DataFrameWriter.saveAsTable support hive append
windpiger Jan 24, 2017
752502b
[SPARK-19246][SQL] CataLogTable's partitionSchema order and exist check
windpiger Jan 24, 2017
0ff67a1
[SPARK-14049][CORE] Add functionality in spark history sever API to q…
Jan 24, 2017
59c184e
[SPARK-17913][SQL] compare atomic and string type column may return c…
cloud-fan Jan 24, 2017
d978338
[SPARK-18036][ML][MLLIB] Fixing decision trees handling edge cases
imatiach-msft Jan 24, 2017
8f3f73a
[SPARK-19139][CORE] New auth mechanism for transport library.
Jan 24, 2017
cdb691e
[SPARK-19017][SQL] NOT IN subquery with more than one column may retu…
nsyca Jan 24, 2017
15ef374
[SPARK-19334][SQL] Fix the code injection vulnerability related to Ge…
sarutak Jan 24, 2017
40a4cfc
[SPARK-19330][DSTREAMS] Also show tooltip for successful batches
lw-lin Jan 25, 2017
3fdce81
[SPARK-16046][DOCS] Aggregations in the Spark SQL programming guide
Jan 25, 2017
76db394
[SPARK-18750][YARN] Avoid using "mapValues" when allocating containers.
Jan 25, 2017
0e821ec
[SPARK-19313][ML][MLLIB] GaussianMixture should limit the number of f…
sethah Jan 25, 2017
f1ddca5
[SPARK-18863][SQL] Output non-aggregate expressions without GROUP BY …
nsyca Jan 25, 2017
f6480b1
[SPARK-19311][SQL] fix UDT hierarchy issue
Jan 25, 2017
92afaa9
[SPARK-19307][PYSPARK] Make sure user conf is propagated to SparkCont…
Jan 25, 2017
965c82d
[SPARK-19064][PYSPARK] Fix pip installing of sub components
holdenk Jan 25, 2017
47d5d0d
[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD
tdas Jan 26, 2017
2338451
[SPARK-18495][UI] Document meaning of green dot in DAG visualization
uncleGen Jan 26, 2017
256a3a8
[SPARK-18020][STREAMING][KINESIS] Checkpoint SHARD_END to finish read…
maropu Jan 26, 2017
9effc2c
[TESTS][SQL] Setup testdata at the beginning for tests to run indepen…
dilipbiswal Jan 26, 2017
d3dcb63
[SPARK-19220][UI] Make redirection to HTTPS apply to all URIs.
Jan 26, 2017
7045b8b
[CORE][DOCS] Update a help message for --files in spark-submit
maropu Jan 26, 2017
2969fb4
[SPARK-18936][SQL] Infrastructure for session local timezone support.
ueshin Jan 26, 2017
9f523d3
[SPARK-19338][SQL] Add UDF names in explain
maropu Jan 26, 2017
1191fe2
[SPARK-18218][ML][MLLIB] Reduce shuffled data size of BlockMatrix mul…
WeichenXu123 Jan 27, 2017
c0ba284
[SPARK-18821][SPARKR] Bisecting k-means wrapper in SparkR
wangmiao1981 Jan 27, 2017
90817a6
[SPARK-18788][SPARKR] Add API for getNumPartitions
felixcheung Jan 27, 2017
4172ff8
[SPARK-18929][ML] Add Tweedie distribution in GLM
actuaryzhang Jan 27, 2017
4e35c5a
[SPARK-12970][DOCS] Fix the example in SturctType APIs for Scala and …
HyukjinKwon Jan 27, 2017
385d738
[SPARK-19333][SPARKR] Add Apache License headers to R files
felixcheung Jan 27, 2017
a7ab6f9
[SPARK-19324][SPARKR] Spark VJM stdout output is getting dropped in S…
felixcheung Jan 27, 2017
21aa8c3
[SPARK-19365][CORE] Optimize RequestMessage serialization
zsxwing Jan 27, 2017
bb1a1fe
[SPARK-19336][ML][PYSPARK] LinearSVC Python API
wangmiao1981 Jan 28, 2017
1b5ee20
[SPARK-19359][SQL] clear useless path after rename a partition with u…
windpiger Jan 28, 2017
42ad93b
[SPARK-19384][ML] forget unpersist input dataset in IsotonicRegression
zhengruifeng Jan 28, 2017
cfcfc92
[SPARK-19359][SQL] Revert Clear useless path after rename a partition…
gatorsmile Jan 28, 2017
f7c07db
[SPARK-19152][SQL][FOLLOWUP] simplify CreateHiveTableAsSelectCommand
cloud-fan Jan 29, 2017
e2e7b12
[SPARK-18872][SQL][TESTS] New test cases for EXISTS subquery
dilipbiswal Jan 29, 2017
ade075a
[SPARK-19385][SQL] During canonicalization, `NOT(...(l, r))` should n…
lw-lin Jan 29, 2017
b54fa23
Refactored, added java APIs and tests
tdas Jan 30, 2017
ab3cb6c
Refactored
tdas Jan 30, 2017
06fbc35
[SPARK-19403][PYTHON][SQL] Correct pyspark.sql.column.__all__ list.
zero323 Jan 30, 2017
ddf4550
Added more test
tdas Jan 30, 2017
c0eda7e
[SPARK-19396][DOC] JDBC Options are Case In-sensitive
gatorsmile Jan 30, 2017
f9156d2
[SPARK-19406][SQL] Fix function to_json to respect user-provided options
gatorsmile Jan 31, 2017
be7425e
[SPARKR][DOCS] update R API doc for subset/extract
felixcheung Jan 31, 2017
26a4cba
[SPARK-19409][BUILD] Bump parquet version to 1.8.2
dongjoon-hyun Jan 31, 2017
3133f83
Merge remote-tracking branch 'apache-github/master' into mapWithState
tdas Jan 31, 2017
6fab7a5
Added docs
tdas Jan 31, 2017
21 changes: 20 additions & 1 deletion R/pkg/NAMESPACE
@@ -1,3 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Imports from base R
# Do not include stats:: "rpois", "runif" - causes error at runtime
importFrom("methods", "setGeneric", "setMethod", "setOldClass")
@@ -47,7 +64,8 @@ exportMethods("glm",
"spark.kstest",
"spark.logit",
"spark.randomForest",
"spark.gbt")
"spark.gbt",
"spark.bisectingKmeans")

# Job group lifecycle management methods
export("setJobGroup",
@@ -94,6 +112,7 @@ exportMethods("arrange",
"freqItems",
"gapply",
"gapplyCollect",
"getNumPartitions",
"group_by",
"groupBy",
"head",
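The NAMESPACE changes above export two user-facing additions picked up from master: spark.bisectingKmeans and getNumPartitions. A minimal sketch of the new bisecting k-means wrapper follows; the k argument and the summary() accessor are assumptions based on the other spark.* wrappers, not something this diff spells out.

# Hedged sketch: fit the newly exported bisecting k-means wrapper (illustrative only)
library(SparkR)
sparkR.session()
df <- createDataFrame(mtcars)
# the generic added in R/pkg/R/generics.R is spark.bisectingKmeans(data, formula, ...)
model <- spark.bisectingKmeans(df, ~ mpg + hp, k = 2)  # k is an assumed tuning argument
summary(model)                                         # assumed accessor, mirroring other ML wrappers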
84 changes: 70 additions & 14 deletions R/pkg/R/DataFrame.R
@@ -1717,6 +1717,23 @@ getColumn <- function(x, c) {
column(callJMethod(x@sdf, "col", c))
}

setColumn <- function(x, c, value) {
if (class(value) != "Column" && !is.null(value)) {
if (isAtomicLengthOne(value)) {
value <- lit(value)
} else {
stop("value must be a Column, literal value as atomic in length of 1, or NULL")
}
}

if (is.null(value)) {
nx <- drop(x, c)
} else {
nx <- withColumn(x, c, value)
}
nx
}

#' @param name name of a Column (without being wrapped by \code{""}).
#' @rdname select
#' @name $
@@ -1735,19 +1752,7 @@ setMethod("$", signature(x = "SparkDataFrame"),
#' @note $<- since 1.4.0
setMethod("$<-", signature(x = "SparkDataFrame"),
function(x, name, value) {
if (class(value) != "Column" && !is.null(value)) {
if (isAtomicLengthOne(value)) {
value <- lit(value)
} else {
stop("value must be a Column, literal value as atomic in length of 1, or NULL")
}
}

if (is.null(value)) {
nx <- drop(x, name)
} else {
nx <- withColumn(x, name, value)
}
nx <- setColumn(x, name, value)
x@sdf <- nx@sdf
x
})
@@ -1767,6 +1772,21 @@ setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
getColumn(x, i)
})

#' @rdname subset
#' @name [[<-
#' @aliases [[<-,SparkDataFrame,numericOrcharacter-method
#' @note [[<- since 2.1.1
setMethod("[[<-", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
function(x, i, value) {
if (is.numeric(i)) {
cols <- columns(x)
i <- cols[[i]]
}
nx <- setColumn(x, i, value)
x@sdf <- nx@sdf
x
})

#' @rdname subset
#' @name [
#' @aliases [,SparkDataFrame-method
@@ -1811,14 +1831,19 @@ setMethod("[", signature(x = "SparkDataFrame"),
#' Return subsets of SparkDataFrame according to given conditions
#' @param x a SparkDataFrame.
#' @param i,subset (Optional) a logical expression to filter on rows.
#' For extract operator [[ and replacement operator [[<-, the indexing parameter for
#' a single Column.
#' @param j,select expression for the single Column or a list of columns to select from the SparkDataFrame.
#' @param drop if TRUE, a Column will be returned if the resulting dataset has only one column.
#' Otherwise, a SparkDataFrame will always be returned.
#' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}.
#' If \code{NULL}, the specified Column is dropped.
#' @param ... currently not used.
#' @return A new SparkDataFrame containing only the rows that meet the condition with selected columns.
#' @export
#' @family SparkDataFrame functions
#' @aliases subset,SparkDataFrame-method
#' @seealso \link{withColumn}
#' @rdname subset
#' @name subset
#' @family subsetting functions
@@ -1836,6 +1861,10 @@
#' subset(df, df$age %in% c(19, 30), 1:2)
#' subset(df, df$age %in% c(19), select = c(1,2))
#' subset(df, select = c(1,2))
#' # Columns can be selected and set
#' df[["age"]] <- 23
#' df[[1]] <- df$age
#' df[[2]] <- NULL # drop column
#' }
#' @note subset since 1.5.0
setMethod("subset", signature(x = "SparkDataFrame"),
@@ -1960,7 +1989,7 @@ setMethod("selectExpr",
#' @aliases withColumn,SparkDataFrame,character-method
#' @rdname withColumn
#' @name withColumn
#' @seealso \link{rename} \link{mutate}
#' @seealso \link{rename} \link{mutate} \link{subset}
#' @export
#' @examples
#'\dontrun{
@@ -1971,6 +2000,10 @@ setMethod("selectExpr",
#' # Replace an existing column
#' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
#' newDF3 <- withColumn(newDF, "newCol", 42)
#' # Use extract operator to set an existing or new column
#' df[["age"]] <- 23
#' df[[2]] <- df$col1
#' df[[2]] <- NULL # drop column
#' }
#' @note withColumn since 1.4.0
setMethod("withColumn",
@@ -3406,3 +3439,26 @@ setMethod("randomSplit",
}
sapply(sdfs, dataFrame)
})

#' getNumPartitions
#'
#' Return the number of partitions
#'
#' @param x A SparkDataFrame
#' @family SparkDataFrame functions
#' @aliases getNumPartitions,SparkDataFrame-method
#' @rdname getNumPartitions
#' @name getNumPartitions
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- createDataFrame(cars, numPartitions = 2)
#' getNumPartitions(df)
#' }
#' @note getNumPartitions since 2.1.1
setMethod("getNumPartitions",
signature(x = "SparkDataFrame"),
function(x) {
callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
})
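Taken together, the DataFrame.R changes route both $<- and the new [[<- replacement operator through the shared setColumn() helper and add a SparkDataFrame method for getNumPartitions. A short consolidated sketch of the resulting behavior, assuming a running SparkR session:

# Illustrative only; column names come from the built-in faithful dataset
df <- createDataFrame(faithful, numPartitions = 2)
df[["delay"]] <- 0            # an atomic value of length 1 is wrapped with lit() by setColumn()
df[[1]] <- df$waiting * 60    # a numeric index is resolved to a column name, then withColumn() runs
df[["delay"]] <- NULL         # a NULL value drops the column, per the documented semantics
getNumPartitions(df)          # new SparkDataFrame method; returns 2 here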
30 changes: 15 additions & 15 deletions R/pkg/R/RDD.R
@@ -313,7 +313,7 @@ setMethod("checkpoint",
#' @rdname getNumPartitions
#' @aliases getNumPartitions,RDD-method
#' @noRd
setMethod("getNumPartitions",
setMethod("getNumPartitionsRDD",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "getNumPartitions")
@@ -329,7 +329,7 @@ setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
.Deprecated("getNumPartitions")
getNumPartitions(x)
getNumPartitionsRDD(x)
})

#' Collect elements of an RDD
@@ -460,7 +460,7 @@ setMethod("countByValue",
signature(x = "RDD"),
function(x) {
ones <- lapply(x, function(item) { list(item, 1L) })
collectRDD(reduceByKey(ones, `+`, getNumPartitions(x)))
collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
})

#' Apply a function to all elements
@@ -780,7 +780,7 @@ setMethod("takeRDD",
resList <- list()
index <- -1
jrdd <- getJRDD(x)
numPartitions <- getNumPartitions(x)
numPartitions <- getNumPartitionsRDD(x)
serializedModeRDD <- getSerializedMode(x)

# TODO(shivaram): Collect more than one partition based on size
@@ -846,7 +846,7 @@ setMethod("firstRDD",
#' @noRd
setMethod("distinctRDD",
signature(x = "RDD"),
function(x, numPartitions = SparkR:::getNumPartitions(x)) {
function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
reduced <- reduceByKey(identical.mapped,
function(x, y) { x },
@@ -1053,7 +1053,7 @@ setMethod("coalesce",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1143,7 +1143,7 @@ setMethod("saveAsTextFile",
#' @noRd
setMethod("sortBy",
signature(x = "RDD", func = "function"),
function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
values(sortByKey(keyBy(x, func), ascending, numPartitions))
})

@@ -1175,7 +1175,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
numPartitions <- getNumPartitions(newRdd)
numPartitions <- getNumPartitionsRDD(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)

while (TRUE) {
@@ -1407,7 +1407,7 @@ setMethod("setName",
setMethod("zipWithUniqueId",
signature(x = "RDD"),
function(x) {
n <- getNumPartitions(x)
n <- getNumPartitionsRDD(x)

partitionFunc <- function(partIndex, part) {
mapply(
@@ -1450,7 +1450,7 @@ setMethod("zipWithUniqueId",
setMethod("zipWithIndex",
signature(x = "RDD"),
function(x) {
n <- getNumPartitions(x)
n <- getNumPartitionsRDD(x)
if (n > 1) {
nums <- collectRDD(lapplyPartition(x,
function(part) {
@@ -1566,8 +1566,8 @@ setMethod("unionRDD",
setMethod("zipRDD",
signature(x = "RDD", other = "RDD"),
function(x, other) {
n1 <- getNumPartitions(x)
n2 <- getNumPartitions(other)
n1 <- getNumPartitionsRDD(x)
n2 <- getNumPartitionsRDD(other)
if (n1 != n2) {
stop("Can only zip RDDs which have the same number of partitions.")
}
@@ -1637,7 +1637,7 @@ setMethod("cartesian",
#' @noRd
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)
@@ -1671,7 +1671,7 @@ setMethod("subtract",
#' @noRd
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })

@@ -1714,7 +1714,7 @@ setMethod("zipPartitions",
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
nPart <- sapply(rrdds, getNumPartitions)
nPart <- sapply(rrdds, getNumPartitionsRDD)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}
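The RDD.R edits rename the non-exported RDD-level method from getNumPartitions to getNumPartitionsRDD so that the exported getNumPartitions generic can be reused for SparkDataFrame, as declared in generics.R below. A brief sketch of how the two coexist, using SparkR internals purely for illustration:

df <- createDataFrame(cars, numPartitions = 2)
getNumPartitions(df)                 # exported method, dispatches on SparkDataFrame
rdd <- SparkR:::toRDD(df)            # internal helper, assumed available; the RDD API is not exported
SparkR:::getNumPartitionsRDD(rdd)    # renamed RDD-level method
SparkR:::numPartitions(rdd)          # still present but now emits a deprecation warning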
13 changes: 11 additions & 2 deletions R/pkg/R/generics.R
@@ -138,9 +138,9 @@ setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
# @export
setGeneric("name", function(x) { standardGeneric("name") })

# @rdname getNumPartitions
# @rdname getNumPartitionsRDD
# @export
setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
setGeneric("getNumPartitionsRDD", function(x) { standardGeneric("getNumPartitionsRDD") })

# @rdname getNumPartitions
# @export
@@ -492,6 +492,10 @@ setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
#' @export
setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") })

# @rdname getNumPartitions
# @export
setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })

#' @rdname summary
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
@@ -1338,6 +1342,11 @@ setGeneric("rbind", signature = "...")
#' @export
setGeneric("spark.als", function(data, ...) { standardGeneric("spark.als") })

#' @rdname spark.bisectingKmeans
#' @export
setGeneric("spark.bisectingKmeans",
function(data, formula, ...) { standardGeneric("spark.bisectingKmeans") })

#' @rdname spark.gaussianMixture
#' @export
setGeneric("spark.gaussianMixture",