[SPARK-43789][R] Uses 'spark.sql.execution.arrow.maxRecordsPerBatch' in R createDataFrame with Arrow by default

### What changes were proposed in this pull request?

This PR proposes to pick a proper number of partitions when creating a DataFrame from an R DataFrame with Arrow.
Previously, the number of partitions was always `1` if not specified.
Now, it splits the input R DataFrame into chunks of `spark.sql.execution.arrow.maxRecordsPerBatch` records and picks the number of partitions accordingly (the number of batches).
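
As a rough illustration (not part of this PR), here is a minimal sketch of the new default behaviour. It assumes Arrow optimization for SparkR is enabled via `spark.sql.execution.arrow.sparkr.enabled`, the batch size is at its default of 10,000 records, and `large_rdf` is a hypothetical 25,000-row data frame:

```r
library(SparkR)

# Assumption for this sketch: Arrow optimization for SparkR is enabled, and
# spark.sql.execution.arrow.maxRecordsPerBatch is at its default of 10000.
sparkR.session(sparkConfig = list("spark.sql.execution.arrow.sparkr.enabled" = "true"))

# Hypothetical input: an R data.frame with 25,000 rows.
large_rdf <- data.frame(id = seq_len(25000), value = rnorm(25000))

df <- createDataFrame(large_rdf)
# Previously this was always 1 when numPartitions was not given; with this change
# it becomes ceiling(25000 / 10000) == 3 (one partition per Arrow batch).
getNumPartitions(df)
```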

This matches the PySpark code path:
https://github.com/apache/spark/blob/46949e692e863992f4c50bdd482d5216d4fd9221/python/pyspark/sql/pandas/conversion.py#L618C11-L626

### Why are the changes needed?

To avoid OOM when the R DataFrame is too large, and to enable proper distributed computing.

### Does this PR introduce _any_ user-facing change?

Yes, it changes the default number of partitions when users call `createDataFrame` with an R DataFrame and Arrow optimization is enabled.
The concept of a partition is largely internal, so by default this does not change user-facing behaviour.

### How was this patch tested?

Manually tested with a large CSV file (3 GB).
Also added a unit test.
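
For reference, the arithmetic behind the added test (see the diff below): `mtcars` has 32 rows, so with the batch size forced down to 10 records the writer should produce 4 batches, hence 4 partitions.

```r
# mtcars ships with base R and has 32 rows.
nrow(mtcars)                # 32
# With spark.sql.execution.arrow.maxRecordsPerBatch set to "10":
ceiling(nrow(mtcars) / 10)  # 4 -- the partition count the test asserts
```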

Closes apache#41307 from HyukjinKwon/default-batch-size.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon committed May 25, 2023
1 parent 6fc5f4e commit eca0bef
Showing 2 changed files with 18 additions and 1 deletion.
4 changes: 3 additions & 1 deletion R/pkg/R/SQLContext.R
@@ -153,7 +153,9 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
numPartitions <- if (!is.null(numPartitions)) {
numToInt(numPartitions)
} else {
1
# If numPartitions is not set, chunk the R DataFrame based on the batch size.
ceiling(
nrow(rdf) / as.numeric(sparkR.conf("spark.sql.execution.arrow.maxRecordsPerBatch")[[1]]))
}

rdf_slices <- if (numPartitions > 1) {
15 changes: 15 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL_arrow.R
@@ -249,4 +249,19 @@ test_that("SPARK-32478: gapply() Arrow optimization - error message for schema m
"expected IntegerType, IntegerType, got IntegerType, StringType")
})

test_that("SPARK-43789: Automatically pick the number of partitions based on Arrow batch size", {
skip_if_not_installed("arrow")

conf <- callJMethod(sparkSession, "conf")
maxRecordsPerBatch <- sparkR.conf("spark.sql.execution.arrow.maxRecordsPerBatch")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.maxRecordsPerBatch", "10")
tryCatch({
expect_equal(getNumPartitionsRDD(toRDD(createDataFrame(mtcars))), 4)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.maxRecordsPerBatch", maxRecordsPerBatch)
})
})

sparkR.session.stop()
