Merge pull request #7 from mlverse/updates
Updates
edgararuiz authored Jul 31, 2023
2 parents 80bd2a2 + 8a70bdc commit a451faf
Showing 9 changed files with 129 additions and 32 deletions.
5 changes: 3 additions & 2 deletions DESCRIPTION
@@ -27,9 +27,10 @@ Imports:
fs,
magrittr,
tidyr,
vctrs
vctrs,
processx
URL: https://github.com/mlverse/pysparklyr
BugReports: https://github.com/mlverse/pysparklyr/issues
Remotes:
Remotes:
sparklyr/sparklyr,
rstudio/reticulate
8 changes: 7 additions & 1 deletion NAMESPACE
@@ -11,6 +11,7 @@ S3method(invoke,python.builtin.object)
S3method(invoke,spark_pyobj)
S3method(invoke_new,connect_spark)
S3method(pivot_longer,tbl_pyspark)
S3method(print,spark_pyobj)
S3method(same_src,pyspark_connection)
S3method(sample_frac,tbl_pyspark)
S3method(sample_n,tbl_pyspark)
@@ -26,7 +27,8 @@ S3method(spark_dataframe,tbl_pyspark)
S3method(spark_ide_columns,pyspark_connection)
S3method(spark_ide_objects,pyspark_connection)
S3method(spark_ide_preview,pyspark_connection)
S3method(spark_integ_test_skip,pyspark_connection)
S3method(spark_integ_test_skip,connect_databricks)
S3method(spark_integ_test_skip,connect_spark)
S3method(spark_log,pyspark_connection)
S3method(spark_read_csv,pyspark_connection)
S3method(spark_read_json,pyspark_connection)
@@ -47,6 +49,8 @@ S3method(tbl_ptype,tbl_pyspark)
S3method(tidyselect_data_has_predicates,tbl_pyspark)
export("%>%")
export(install_pyspark)
export(spark_connect_service_start)
export(spark_connect_service_stop)
import(DBI)
import(cli)
import(dbplyr)
@@ -70,6 +74,7 @@ importFrom(magrittr,"%>%")
importFrom(methods,is)
importFrom(methods,new)
importFrom(methods,setOldClass)
importFrom(processx,process)
importFrom(purrr,map)
importFrom(purrr,map_chr)
importFrom(purrr,map_lgl)
@@ -100,6 +105,7 @@ importFrom(sparklyr,spark_ide_columns)
importFrom(sparklyr,spark_ide_connection_updated)
importFrom(sparklyr,spark_ide_objects)
importFrom(sparklyr,spark_ide_preview)
importFrom(sparklyr,spark_install_find)
importFrom(sparklyr,spark_integ_test_skip)
importFrom(sparklyr,spark_log)
importFrom(sparklyr,spark_read_csv)
44 changes: 44 additions & 0 deletions R/connect-service.R
@@ -0,0 +1,44 @@
#' Starts Spark Connect locally
#' @param version Spark version to use (3.4 or above)
#' @param scala_version Acceptable Scala version of packages to be loaded
#' @param ... Optional arguments; currently unused
#' @export
spark_connect_service_start <- function(version = "3.4",
scala_version = "2.12",
...
) {
get_version <- spark_install_find(version = version)
cmd <- path(get_version$sparkVersionDir, "sbin", "start-connect-server.sh")
args <- c(
"--packages",
glue("org.apache.spark:spark-connect_{scala_version}:{get_version$sparkVersion}")
)
prs <- process$new(
command = cmd,
args = args,
stdout = "|",
stderr = "|",
stdin = "|"
)
cli_text("-- Starting Spark Connect locally ...")
output <- prs$read_all_output()
cli_text(output)
invisible()
}

#' @rdname spark_connect_service_start
#' @export
spark_connect_service_stop <- function(version = "3.4",
...
) {
get_version <- spark_install_find(version = version)
cmd <- path(get_version$sparkVersionDir, "sbin", "stop-connect-server.sh")
cli_text("-- Stopping Spark Connect")
prs <- process$new(
command = cmd,
stdout = "|",
stderr = "|",
stdin = "|"
)

}
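
For context, a minimal usage sketch of the two new exports (a sketch only, assuming Spark 3.4 was installed locally with sparklyr::spark_install("3.4"); the spark_connect() call in the middle is illustrative and not part of this commit):

```r
library(pysparklyr)

# Start the Spark Connect server shipped with the local Spark 3.4 install
spark_connect_service_start(version = "3.4", scala_version = "2.12")

# Illustrative: connect through Spark Connect, do some work, then disconnect
sc <- sparklyr::spark_connect(
  master = "sc://localhost",
  method = "spark_connect"
)
sparklyr::spark_disconnect(sc)

# Shut the Spark Connect server down again
spark_connect_service_stop(version = "3.4")
```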
52 changes: 31 additions & 21 deletions R/integ-test.R
@@ -1,28 +1,38 @@
#' @export
spark_integ_test_skip.pyspark_connection <- function(sc, test_name) {
spark_integ_test_skip.connect_databricks <- function(sc, test_name) {
spark_integ_generic(test_name)
}

supports <- function(x, test, default_out = FALSE) {
if (grepl(test, test_name)) {
x <- default_out
}
x
}
#' @export
spark_integ_test_skip.connect_spark <- function(sc, test_name) {
spark_integ_generic(test_name) %>%
supports("format-csv", test_name) %>%
supports("format-parquet", test_name) %>%
supports("format-orc", test_name) %>%
supports("format-json", test_name) %>%
supports("format-text", test_name)
}


spark_integ_generic <- function(test_name) {
out <- TRUE

out %>%
supports("dplyr") %>%
supports("dplyr-do", TRUE) %>%
supports("dplyr-hof", TRUE) %>%
supports("dplyr-cumprod", TRUE) %>%
supports("DBI") %>%
supports("format-csv") %>%
supports("format-parquet") %>%
supports("format-orc") %>%
supports("format-json") %>%
supports("format-text") %>%
supports("pivot-longer") %>%
supports("pivot-longer-names-repair", TRUE) %>%
supports("pivot-longer-values-transform", TRUE) %>%
supports("ml-", TRUE)
supports("dplyr", test_name) %>%
supports("dplyr-do", test_name, TRUE) %>%
supports("dplyr-hof", test_name, TRUE) %>%
supports("dplyr-cumprod", test_name, TRUE) %>%
supports("DBI", test_name) %>%
supports("pivot-longer", test_name) %>%
supports("pivot-longer-names-repair", test_name, TRUE) %>%
supports("pivot-longer-values-transform", test_name, TRUE) %>%
supports("ml-", test_name, TRUE) %>%
supports("sdf-distinct", test_name)
}

supports <- function(x, test, test_name, default_out = FALSE) {
if (grepl(test, test_name)) {
x <- default_out
}
x
}
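
To make the refactor easier to follow: spark_integ_generic() now starts with out <- TRUE (skip every test by default), and each piped supports() call flips that flag when test_name matches its pattern, with later, more specific patterns overriding earlier ones. A self-contained sketch with made-up test names:

```r
library(magrittr)

supports <- function(x, test, test_name, default_out = FALSE) {
  if (grepl(test, test_name)) {
    x <- default_out
  }
  x
}

# "dplyr-join" matches only "dplyr", so the test is not skipped
TRUE %>%
  supports("dplyr", "dplyr-join") %>%
  supports("dplyr-do", "dplyr-join", TRUE)
#> [1] FALSE

# "dplyr-do" matches both patterns; the later, more specific one re-enables the skip
TRUE %>%
  supports("dplyr", "dplyr-do") %>%
  supports("dplyr-do", "dplyr-do", TRUE)
#> [1] TRUE
```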
2 changes: 2 additions & 0 deletions R/package.R
@@ -8,6 +8,7 @@
#' @importFrom sparklyr spark_read_json spark_read_orc
#' @importFrom sparklyr spark_write_csv spark_write_parquet spark_write_text
#' @importFrom sparklyr spark_write_orc spark_write_json
#' @importFrom sparklyr spark_install_find
#' @importFrom tidyselect tidyselect_data_has_predicates
#' @importFrom dplyr tbl collect tibble same_src compute as_tibble group_vars
#' @importFrom dplyr sample_n sample_frac slice_sample select tbl_ptype group_by
@@ -19,6 +20,7 @@
#' @importFrom utils head type.convert
#' @importFrom tidyr pivot_longer
#' @importFrom vctrs vec_as_names
#' @importFrom processx process
#' @import reticulate
#' @import dbplyr
#' @import glue
15 changes: 12 additions & 3 deletions R/pyspark-connection.R
@@ -63,12 +63,21 @@ invoke.python.builtin.object <- function(jobj, method, ...) {

invoke_conn <- function(jobj, context, method, ...) {
x <- py_get_attr(context, method)
out <- NULL
if (inherits(x, "python.builtin.method")) {
run_x <- py_call(x, ...)
out <- as_spark_pyobj(run_x, jobj)
} else {
out <- py_to_r(x)

if(inherits(run_x, "numpy.number")) {
out <- py_to_r(run_x)
}

if(is.null(out)) {
out <- as_spark_pyobj(run_x, jobj)
}
}

if (is.null(out)) out <- py_to_r(x)

out
}

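The new branch above converts NumPy scalar results of invoke() calls into plain R numbers instead of wrapping them as spark_pyobj. A standalone reticulate illustration of the class check it relies on (assumes a Python environment with numpy available; not Spark-specific):

```r
library(reticulate)

np <- import("numpy", convert = FALSE)
x  <- np$float64(3.14)

inherits(x, "numpy.number")  # TRUE, so invoke_conn() would call py_to_r()
py_to_r(x)                   # a plain R double: 3.14
```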
5 changes: 5 additions & 0 deletions R/spark-pyobj.R
@@ -1,3 +1,8 @@
#' @export
print.spark_pyobj <- function(x, ...) {
print(x$pyspark_obj)
}

#' @export
sdf_read_column.spark_pyjobj <- function(x, column) {
sdf <- spark_dataframe(x)
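The effect of the new print method, sketched with an illustrative connection (the master/method arguments and the printed output are assumptions, not part of this commit): printing a wrapped object now delegates to the underlying PySpark object's own representation.

```r
# Illustrative only
sc  <- sparklyr::spark_connect(master = "sc://localhost", method = "spark_connect")
tbl <- dplyr::copy_to(sc, mtcars)
sdf <- sparklyr::spark_dataframe(tbl)

sdf   # now prints sdf$pyspark_obj, e.g. "DataFrame[mpg: double, cyl: double, ...]"
```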
21 changes: 21 additions & 0 deletions man/spark_connect_service_start.Rd

Some generated files are not rendered by default.

9 changes: 4 additions & 5 deletions progress.md
@@ -8,7 +8,7 @@

- [x] Initial connection routines
- [x] Spark Connect connectivity
- [ ] Initiate Spark Connect when creating new connection, similar to "local" (Maybe)
- [x] Initiate Spark Connect when creating new connection, similar to "local"
- [ ] Fail when user changes from one connection to another in the same R/Python session

### RStudio Integration
@@ -92,11 +92,10 @@

### SDF

- [ ] First successful run of an `sdf_` functions
- [ ] Run all `sdf_` functions, and have all/most pass tests
- [ ] Determine what to do with functions that will not run
- Individual functions:
- Individual functions - If not listed here, assume it is not supported:
- [ ] `sdf_broadcast()` **Blocked** Needs SparkContext to work
- [x] `sdf_dim()`
- [x] `sdf_distinct()`

### Data

