[SPARK-17838][SparkR] Check named arguments for options and use formatted R friendly message from JVM exception message #15608

Closed · wants to merge 3 commits

Changes from all commits

10 changes: 5 additions & 5 deletions R/pkg/R/DataFrame.R
@@ -761,7 +761,7 @@ setMethod("write.json",
           function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
             write <- setWriteOptions(write, mode = mode, ...)
-            invisible(callJMethod(write, "json", path))
+            invisible(handledCallJMethod(write, "json", path))
           })

#' Save the contents of SparkDataFrame as an ORC file, preserving the schema.
@@ -792,7 +792,7 @@ setMethod("write.orc",
           function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
             write <- setWriteOptions(write, mode = mode, ...)
-            invisible(callJMethod(write, "orc", path))
+            invisible(handledCallJMethod(write, "orc", path))
           })

#' Save the contents of SparkDataFrame as a Parquet file, preserving the schema.
@@ -824,7 +824,7 @@ setMethod("write.parquet",
           function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
             write <- setWriteOptions(write, mode = mode, ...)
-            invisible(callJMethod(write, "parquet", path))
+            invisible(handledCallJMethod(write, "parquet", path))
           })

#' @rdname write.parquet
@@ -868,7 +868,7 @@ setMethod("write.text",
           function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
             write <- setWriteOptions(write, mode = mode, ...)
-            invisible(callJMethod(write, "text", path))
+            invisible(handledCallJMethod(write, "text", path))
           })

#' Distinct
@@ -3315,7 +3315,7 @@ setMethod("write.jdbc",
             jprops <- varargsToJProperties(...)
             write <- callJMethod(x@sdf, "write")
             write <- callJMethod(write, "mode", jmode)
-            invisible(callJMethod(write, "jdbc", url, tableName, jprops))
+            invisible(handledCallJMethod(write, "jdbc", url, tableName, jprops))
           })

#' randomSplit
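
Every writer above now routes through handledCallJMethod instead of calling callJMethod directly. The wrapper itself is defined elsewhere in this PR (in R/pkg/R/utils.R, outside the hunks shown here); the sketch below is only an illustration of the idea, assuming SparkR's internal callJMethod and inventing the body: catch the raw JVM exception and rethrow it as the short, R-friendly message the new tests expect.

# Illustrative sketch only; the actual handledCallJMethod in the PR may differ.
handledCallJMethod <- function(obj, method, ...) {
  tryCatch(callJMethod(obj, method, ...),
           error = function(e) {
             msg <- conditionMessage(e)
             # Turn a raw JVM message such as
             #   "org.apache.spark.sql.AnalysisException: path ... already exists;"
             # into the formatted R form, e.g.
             #   "Error in json : analysis error - path ... already exists"
             prefix <- "org.apache.spark.sql.AnalysisException: "
             if (grepl(prefix, msg, fixed = TRUE)) {
               detail <- strsplit(msg, prefix, fixed = TRUE)[[1]][[2]]
               stop(paste0("Error in ", method, " : analysis error - ", detail),
                    call. = FALSE)
             }
             stop(msg, call. = FALSE)  # other exception types pass through unformatted
           })
}
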
17 changes: 9 additions & 8 deletions R/pkg/R/SQLContext.R
@@ -349,7 +349,7 @@ read.json.default <- function(path, ...) {
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
   read <- callJMethod(read, "options", options)
-  sdf <- callJMethod(read, "json", paths)
+  sdf <- handledCallJMethod(read, "json", paths)
   dataFrame(sdf)
 }

@@ -421,7 +421,7 @@ read.orc <- function(path, ...) {
   path <- suppressWarnings(normalizePath(path))
   read <- callJMethod(sparkSession, "read")
   read <- callJMethod(read, "options", options)
-  sdf <- callJMethod(read, "orc", path)
+  sdf <- handledCallJMethod(read, "orc", path)
   dataFrame(sdf)
 }

@@ -443,7 +443,7 @@ read.parquet.default <- function(path, ...) {
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
   read <- callJMethod(read, "options", options)
-  sdf <- callJMethod(read, "parquet", paths)
+  sdf <- handledCallJMethod(read, "parquet", paths)
   dataFrame(sdf)
 }

@@ -495,7 +495,7 @@ read.text.default <- function(path, ...) {
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
   read <- callJMethod(read, "options", options)
-  sdf <- callJMethod(read, "text", paths)
+  sdf <- handledCallJMethod(read, "text", paths)
   dataFrame(sdf)
 }

@@ -913,12 +913,13 @@ read.jdbc <- function(url, tableName,
     } else {
       numPartitions <- numToInt(numPartitions)
     }
-    sdf <- callJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
-                       numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
+    sdf <- handledCallJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
+                              numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
   } else if (length(predicates) > 0) {
-    sdf <- callJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)), jprops)
+    sdf <- handledCallJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)),
+                              jprops)
   } else {
-    sdf <- callJMethod(read, "jdbc", url, tableName, jprops)
+    sdf <- handledCallJMethod(read, "jdbc", url, tableName, jprops)
   }
   dataFrame(sdf)
 }
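
With the readers wrapped the same way, a failing call now surfaces as a concise R error rather than a Java stack trace. A quick illustration, assuming an active SparkR session and a jsonPath pointing at an existing JSON file, as in the tests; the expected messages are taken from the test changes in this PR:

# Hypothetical session; "arbitrary_path" does not exist.
read.json("arbitrary_path")
# Error in json : analysis error - Path does not exist: ...

read.json(jsonPath, a = 1, 2, 3, "a")
# Warning message:
# Unnamed arguments ignored: 2, 3, a.

The warning comes from the named-argument check added to varargsToStrEnv in R/pkg/R/utils.R below.
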
44 changes: 32 additions & 12 deletions R/pkg/R/utils.R
@@ -338,21 +338,41 @@ varargsToEnv <- function(...) {
 # into string.
 varargsToStrEnv <- function(...) {
   pairs <- list(...)
+  nameList <- names(pairs)
   env <- new.env()
-  for (name in names(pairs)) {
-    value <- pairs[[name]]
-    if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
-      stop(paste0("Unsupported type for ", name, " : ", class(value),
-                  ". Supported types are logical, numeric, character and NULL."))
-    }
-    if (is.logical(value)) {
-      env[[name]] <- tolower(as.character(value))
-    } else if (is.null(value)) {
-      env[[name]] <- value
-    } else {
-      env[[name]] <- as.character(value)
+  ignoredNames <- list()
+
+  if (is.null(nameList)) {
+    # When none of the arguments are named, names(...) returns NULL.
+    ignoredNames <- pairs
+  } else {
+    for (i in seq_along(pairs)) {
+      name <- nameList[i]
+      value <- pairs[i]
+      if (identical(name, "")) {
+        # When only some arguments are named, the unnamed ones have name "".
+        ignoredNames <- append(ignoredNames, value)
+      } else {
+        value <- pairs[[name]]
+        if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
+          stop(paste0("Unsupported type for ", name, " : ", class(value),
+                      ". Supported types are logical, numeric, character and NULL."), call. = FALSE)
+        }
+        if (is.logical(value)) {
+          env[[name]] <- tolower(as.character(value))
+        } else if (is.null(value)) {
+          env[[name]] <- value
+        } else {
+          env[[name]] <- as.character(value)
+        }
+      }
     }
   }
+
+  if (length(ignoredNames) != 0) {
+    warning(paste0("Unnamed arguments ignored: ", paste(ignoredNames, collapse = ", "), "."),
+            call. = FALSE)
+  }
   env
 }
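
In isolation, the new behavior of this helper looks like the following usage sketch (varargsToStrEnv is internal to SparkR; the option names here are made up):

# Named options are stringified into the environment; unnamed ones are
# collected and reported in a single warning.
env <- varargsToStrEnv(header = TRUE, nullValue = NULL, 42, "x")
# Warning message:
# Unnamed arguments ignored: 42, x.

ls(env)           # "header" "nullValue"
env[["header"]]   # "true" -- logicals are lowercased into strings for the JVM side
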

16 changes: 16 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2657,6 +2657,14 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
   # DataFrameWriter.save() without path.
   expect_error(write.df(df, source = "csv"),
                "Error in save : illegal argument - 'path' is not specified")
+  expect_error(write.json(df, jsonPath),
+               "Error in json : analysis error - path file:.*already exists")
+  expect_error(write.text(df, jsonPath),
+               "Error in text : analysis error - path file:.*already exists")
+  expect_error(write.orc(df, jsonPath),
+               "Error in orc : analysis error - path file:.*already exists")
+  expect_error(write.parquet(df, jsonPath),
+               "Error in parquet : analysis error - path file:.*already exists")

   # Arguments checking in R side.
   expect_error(write.df(df, "data.tmp", source = c(1, 2)),
@@ -2676,13 +2684,21 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume
                paste("Error in loadDF : analysis error - Unable to infer schema for JSON at .",
                      "It must be specified manually"))
   expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not exist")
+  expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist")
+  expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist")
+  expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist")
+  expect_error(read.parquet("arbitrary_path"),
+               "Error in parquet : analysis error - Path does not exist")

   # Arguments checking in R side.
   expect_error(read.df(path = c(3)),
                "path should be charactor, NULL or omitted.")
   expect_error(read.df(jsonPath, source = c(1, 2)),
                paste("source should be character, NULL or omitted. It is the datasource specified",
                      "in 'spark.sql.sources.default' configuration by default."))
+
+  expect_warning(read.json(jsonPath, a = 1, 2, 3, "a"),
+                 "Unnamed arguments ignored: 2, 3, a.")
 })

unlink(parquetPath)
2 changes: 2 additions & 0 deletions R/pkg/inst/tests/testthat/test_utils.R
@@ -224,6 +224,8 @@ test_that("varargsToStrEnv", {
   expect_error(varargsToStrEnv(a = list(1, "a")),
                paste0("Unsupported type for a : list. Supported types are logical, ",
                       "numeric, character and NULL."))
+  expect_warning(varargsToStrEnv(a = 1, 2, 3, 4), "Unnamed arguments ignored: 2, 3, 4.")
+  expect_warning(varargsToStrEnv(1, 2, 3, 4), "Unnamed arguments ignored: 1, 2, 3, 4.")
 })

sparkR.session.stop()