Skip to content

Commit 43e61ef

Browse files
committed
Merge branch 'master' into SPARK-24063
2 parents 96152da + 95bb012 commit 43e61ef

File tree

616 files changed

+13566
-5558
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

616 files changed

+13566
-5558
lines changed

R/pkg/DESCRIPTION

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
Package: SparkR
22
Type: Package
33
Version: 3.0.0
4-
Title: R Frontend for Apache Spark
5-
Description: Provides an R Frontend for Apache Spark.
4+
Title: R Front end for 'Apache Spark'
5+
Description: Provides an R Front end for 'Apache Spark' <https://spark.apache.org>.
66
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
77
email = "shivaram@cs.berkeley.edu"),
88
person("Xiangrui", "Meng", role = "aut",
@@ -11,8 +11,8 @@ Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
1111
email = "felixcheung@apache.org"),
1212
person(family = "The Apache Software Foundation", role = c("aut", "cph")))
1313
License: Apache License (== 2.0)
14-
URL: http://www.apache.org/ http://spark.apache.org/
15-
BugReports: http://spark.apache.org/contributing.html
14+
URL: https://www.apache.org/ https://spark.apache.org/
15+
BugReports: https://spark.apache.org/contributing.html
1616
SystemRequirements: Java (== 8)
1717
Depends:
1818
R (>= 3.1),

R/pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,10 @@ exportMethods("%<=>%",
312312
"lower",
313313
"lpad",
314314
"ltrim",
315+
"map_concat",
315316
"map_entries",
316317
"map_from_arrays",
318+
"map_from_entries",
317319
"map_keys",
318320
"map_values",
319321
"max",

R/pkg/R/DataFrame.R

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,11 +1177,67 @@ setMethod("dim",
11771177
setMethod("collect",
11781178
signature(x = "SparkDataFrame"),
11791179
function(x, stringsAsFactors = FALSE) {
1180+
connectionTimeout <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
1181+
useArrow <- FALSE
1182+
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
1183+
if (arrowEnabled) {
1184+
useArrow <- tryCatch({
1185+
requireNamespace1 <- requireNamespace
1186+
if (!requireNamespace1("arrow", quietly = TRUE)) {
1187+
stop("'arrow' package should be installed.")
1188+
}
1189+
# Currently, Arrow optimization does not support the raw type.
1190+
# Also, it does not support explicit float type set by users.
1191+
if (inherits(schema(x), "structType")) {
1192+
if (any(sapply(schema(x)$fields(),
1193+
function(x) x$dataType.toString() == "FloatType"))) {
1194+
stop(paste0("Arrow optimization in the conversion from Spark DataFrame to R ",
1195+
"DataFrame does not support FloatType yet."))
1196+
}
1197+
if (any(sapply(schema(x)$fields(),
1198+
function(x) x$dataType.toString() == "BinaryType"))) {
1199+
stop(paste0("Arrow optimization in the conversion from Spark DataFrame to R ",
1200+
"DataFrame does not support BinaryType yet."))
1201+
}
1202+
}
1203+
TRUE
1204+
}, error = function(e) {
1205+
warning(paste0("The conversion from Spark DataFrame to R DataFrame was attempted ",
1206+
"with Arrow optimization because ",
1207+
"'spark.sql.execution.arrow.enabled' is set to true; however, ",
1208+
"failed, attempting non-optimization. Reason: ",
1209+
e))
1210+
FALSE
1211+
})
1212+
}
1213+
11801214
dtypes <- dtypes(x)
11811215
ncol <- length(dtypes)
11821216
if (ncol <= 0) {
11831217
# empty data.frame with 0 columns and 0 rows
11841218
data.frame()
1219+
} else if (useArrow) {
1220+
requireNamespace1 <- requireNamespace
1221+
if (requireNamespace1("arrow", quietly = TRUE)) {
1222+
read_arrow <- get("read_arrow", envir = asNamespace("arrow"), inherits = FALSE)
1223+
as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
1224+
1225+
portAuth <- callJMethod(x@sdf, "collectAsArrowToR")
1226+
port <- portAuth[[1]]
1227+
authSecret <- portAuth[[2]]
1228+
conn <- socketConnection(
1229+
port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
1230+
output <- tryCatch({
1231+
doServerAuth(conn, authSecret)
1232+
arrowTable <- read_arrow(readRaw(conn))
1233+
as.data.frame(as_tibble(arrowTable), stringsAsFactors = stringsAsFactors)
1234+
}, finally = {
1235+
close(conn)
1236+
})
1237+
return(output)
1238+
} else {
1239+
stop("'arrow' package should be installed.")
1240+
}
11851241
} else {
11861242
# listCols is a list of columns
11871243
listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)

R/pkg/R/SQLContext.R

Lines changed: 136 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,70 @@ getDefaultSqlSource <- function() {
147147
l[["spark.sql.sources.default"]]
148148
}
149149

150+
writeToFileInArrow <- function(fileName, rdf, numPartitions) {
151+
requireNamespace1 <- requireNamespace
152+
153+
# The R API for Arrow is not yet released on CRAN. CRAN requires adding the
154+
# package in requireNamespace at DESCRIPTION. Later, CRAN checks if the package is available
155+
# or not. Therefore, it works around by avoiding direct requireNamespace.
156+
# Currently, as of Arrow 0.12.0, it can be installed by install_github. See ARROW-3204.
157+
if (requireNamespace1("arrow", quietly = TRUE)) {
158+
record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
159+
RecordBatchStreamWriter <- get(
160+
"RecordBatchStreamWriter", envir = asNamespace("arrow"), inherits = FALSE)
161+
FileOutputStream <- get(
162+
"FileOutputStream", envir = asNamespace("arrow"), inherits = FALSE)
163+
164+
numPartitions <- if (!is.null(numPartitions)) {
165+
numToInt(numPartitions)
166+
} else {
167+
1
168+
}
169+
170+
rdf_slices <- if (numPartitions > 1) {
171+
split(rdf, makeSplits(numPartitions, nrow(rdf)))
172+
} else {
173+
list(rdf)
174+
}
175+
176+
stream_writer <- NULL
177+
tryCatch({
178+
for (rdf_slice in rdf_slices) {
179+
batch <- record_batch(rdf_slice)
180+
if (is.null(stream_writer)) {
181+
stream <- FileOutputStream(fileName)
182+
schema <- batch$schema
183+
stream_writer <- RecordBatchStreamWriter(stream, schema)
184+
}
185+
186+
stream_writer$write_batch(batch)
187+
}
188+
},
189+
finally = {
190+
if (!is.null(stream_writer)) {
191+
stream_writer$close()
192+
}
193+
})
194+
195+
} else {
196+
stop("'arrow' package should be installed.")
197+
}
198+
}
199+
200+
checkTypeRequirementForArrow <- function(dataHead, schema) {
201+
# Currently, Arrow optimization does not support the raw type.
202+
# Also, it does not support explicit float type set by users. It leads to
203+
# incorrect conversion. We will fall back to the path without Arrow optimization.
204+
if (any(sapply(dataHead, is.raw))) {
205+
stop("Arrow optimization with R DataFrame does not support raw type yet.")
206+
}
207+
if (inherits(schema, "structType")) {
208+
if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
209+
stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
210+
}
211+
}
212+
}
213+
150214
#' Create a SparkDataFrame
151215
#'
152216
#' Converts R data.frame or list into SparkDataFrame.
@@ -172,36 +236,76 @@ getDefaultSqlSource <- function() {
172236
createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
173237
numPartitions = NULL) {
174238
sparkSession <- getSparkSession()
239+
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
240+
useArrow <- FALSE
241+
firstRow <- NULL
175242

176243
if (is.data.frame(data)) {
177-
# Convert data into a list of rows. Each row is a list.
178-
179-
# get the names of columns, they will be put into RDD
180-
if (is.null(schema)) {
181-
schema <- names(data)
182-
}
244+
# get the names of columns, they will be put into RDD
245+
if (is.null(schema)) {
246+
schema <- names(data)
247+
}
183248

184-
# get rid of factor type
185-
cleanCols <- function(x) {
186-
if (is.factor(x)) {
187-
as.character(x)
188-
} else {
189-
x
190-
}
249+
# get rid of factor type
250+
cleanCols <- function(x) {
251+
if (is.factor(x)) {
252+
as.character(x)
253+
} else {
254+
x
191255
}
256+
}
257+
data[] <- lapply(data, cleanCols)
258+
259+
args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
260+
if (arrowEnabled) {
261+
useArrow <- tryCatch({
262+
stopifnot(length(data) > 0)
263+
dataHead <- head(data, 1)
264+
checkTypeRequirementForArrow(data, schema)
265+
fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
266+
tryCatch({
267+
writeToFileInArrow(fileName, data, numPartitions)
268+
jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
269+
"readArrowStreamFromFile",
270+
sparkSession,
271+
fileName)
272+
},
273+
finally = {
274+
# File might not be created.
275+
suppressWarnings(file.remove(fileName))
276+
})
277+
278+
firstRow <- do.call(mapply, append(args, dataHead))[[1]]
279+
TRUE
280+
},
281+
error = function(e) {
282+
warning(paste0("createDataFrame attempted Arrow optimization because ",
283+
"'spark.sql.execution.arrow.enabled' is set to true; however, ",
284+
"failed, attempting non-optimization. Reason: ",
285+
e))
286+
FALSE
287+
})
288+
}
192289

290+
if (!useArrow) {
291+
# Convert data into a list of rows. Each row is a list.
193292
# drop factors and wrap lists
194-
data <- setNames(lapply(data, cleanCols), NULL)
293+
data <- setNames(as.list(data), NULL)
195294

196295
# check if all columns have supported type
197296
lapply(data, getInternalType)
198297

199298
# convert to rows
200-
args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
201299
data <- do.call(mapply, append(args, data))
300+
if (length(data) > 0) {
301+
firstRow <- data[[1]]
302+
}
303+
}
202304
}
203305

204-
if (is.list(data)) {
306+
if (useArrow) {
307+
rdd <- jrddInArrow
308+
} else if (is.list(data)) {
205309
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
206310
if (!is.null(numPartitions)) {
207311
rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions))
@@ -215,14 +319,16 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
215319
}
216320

217321
if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
218-
row <- firstRDD(rdd)
322+
if (is.null(firstRow)) {
323+
firstRow <- firstRDD(rdd)
324+
}
219325
names <- if (is.null(schema)) {
220-
names(row)
326+
names(firstRow)
221327
} else {
222328
as.list(schema)
223329
}
224330
if (is.null(names)) {
225-
names <- lapply(1:length(row), function(x) {
331+
names <- lapply(1:length(firstRow), function(x) {
226332
paste("_", as.character(x), sep = "")
227333
})
228334
}
@@ -237,19 +343,24 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
237343
nn
238344
})
239345

240-
types <- lapply(row, infer_type)
241-
fields <- lapply(1:length(row), function(i) {
346+
types <- lapply(firstRow, infer_type)
347+
fields <- lapply(1:length(firstRow), function(i) {
242348
structField(names[[i]], types[[i]], TRUE)
243349
})
244350
schema <- do.call(structType, fields)
245351
}
246352

247353
stopifnot(class(schema) == "structType")
248354

249-
jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
250-
srdd <- callJMethod(jrdd, "rdd")
251-
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
252-
srdd, schema$jobj, sparkSession)
355+
if (useArrow) {
356+
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
357+
"toDataFrame", rdd, schema$jobj, sparkSession)
358+
} else {
359+
jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
360+
srdd <- callJMethod(jrdd, "rdd")
361+
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
362+
srdd, schema$jobj, sparkSession)
363+
}
253364
dataFrame(sdf)
254365
}
255366

R/pkg/R/context.R

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,26 @@ objectFile <- function(sc, path, minPartitions = NULL) {
8181
RDD(jrdd, "byte")
8282
}
8383

84+
makeSplits <- function(numSerializedSlices, length) {
85+
# Generate the slice ids to put each row
86+
# For instance, for numSerializedSlices of 22, length of 50
87+
# [1] 0 0 2 2 4 4 6 6 6 9 9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
88+
# [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
89+
# Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced.
90+
# We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD
91+
if (numSerializedSlices > 0) {
92+
unlist(lapply(0: (numSerializedSlices - 1), function(x) {
93+
# nolint start
94+
start <- trunc((as.numeric(x) * length) / numSerializedSlices)
95+
end <- trunc(((as.numeric(x) + 1) * length) / numSerializedSlices)
96+
# nolint end
97+
rep(start, end - start)
98+
}))
99+
} else {
100+
1
101+
}
102+
}
103+
84104
#' Create an RDD from a homogeneous list or vector.
85105
#'
86106
#' This function creates an RDD from a local homogeneous list in R. The elements
@@ -143,25 +163,7 @@ parallelize <- function(sc, coll, numSlices = 1) {
143163
# For large objects we make sure the size of each slice is also smaller than sizeLimit
144164
numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / sizeLimit)))
145165

146-
# Generate the slice ids to put each row
147-
# For instance, for numSerializedSlices of 22, length of 50
148-
# [1] 0 0 2 2 4 4 6 6 6 9 9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
149-
# [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
150-
# Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced.
151-
# We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD
152-
splits <- if (numSerializedSlices > 0) {
153-
unlist(lapply(0: (numSerializedSlices - 1), function(x) {
154-
# nolint start
155-
start <- trunc((as.numeric(x) * len) / numSerializedSlices)
156-
end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices)
157-
# nolint end
158-
rep(start, end - start)
159-
}))
160-
} else {
161-
1
162-
}
163-
164-
slices <- split(coll, splits)
166+
slices <- split(coll, makeSplits(numSerializedSlices, len))
165167

166168
# Serialize each slice: obtain a list of raws, or a list of lists (slices) of
167169
# 2-tuples of raws

0 commit comments

Comments
 (0)