Skip to content

Commit 52356b6

Browse files
committed
Merge pull request apache#139 from shivaram/fix-backend-exit
[SPARKR-185] Fix SparkR backend to exit in more cases
2 parents a9f8e8e + 125ae43 commit 52356b6

File tree

2 files changed

+39
-16
lines changed

2 files changed

+39
-16
lines changed

pkg/R/sparkR.R

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,42 @@ sparkR.onLoad <- function(libname, pkgname) {
1111
.sparkREnv$assemblyJarPath <- assemblyJarPath
1212
}
1313

14-
#' Stop this Spark context
15-
#' Also terminates the backend this R session is connected to
16-
sparkR.stop <- function(sparkREnv) {
17-
if (exists(".sparkRjsc", envir = .sparkREnv)) {
18-
sc <- get(".sparkRjsc", envir = .sparkREnv)
19-
callJMethod(sc, "stop")
14+
# Utility function that returns TRUE if we have an active connection to the
15+
# backend and FALSE otherwise
16+
connExists <- function(env) {
17+
tryCatch({
18+
exists(".sparkRCon", envir = env) && isOpen(env[[".sparkRCon"]])
19+
}, error = function(err) {
20+
return(FALSE)
21+
})
22+
}
23+
24+
# Stop the Spark context.
25+
# Also terminates the backend this R session is connected to
26+
sparkR.stop <- function(env) {
27+
cat("Stopping SparkR\n")
28+
29+
if (!connExists(env)) {
30+
# When the workspace is saved in R, the connections are closed
31+
# *before* the finalizer is run. In these cases, we reconnect
32+
# to the backend, so we can shut it down.
33+
connectBackend("localhost", .sparkREnv$sparkRBackendPort)
2034
}
2135

22-
if (exists(".sparkRCon", envir = .sparkREnv)) {
23-
callJStatic("SparkRHandler", "stopBackend")
24-
# Also close the connection and remove it from our env
25-
conn <- get(".sparkRCon", .sparkREnv)
26-
close(conn)
27-
rm(".sparkRCon", envir = .sparkREnv)
36+
if (exists(".sparkRjsc", envir = env)) {
37+
sc <- get(".sparkRjsc", envir = env)
38+
callJMethod(sc, "stop")
2839
}
40+
41+
callJStatic("SparkRHandler", "stopBackend")
42+
# Also close the connection and remove it from our env
43+
conn <- get(".sparkRCon", env)
44+
close(conn)
45+
rm(".sparkRCon", envir = env)
46+
47+
# Finally, sleep for 1 sec to let backend finish exiting.
48+
# Without this we get port conflicts in RStudio when we try to 'Restart R'.
49+
Sys.sleep(1)
2950
}
3051

3152
#' Initialize a new Spark Context.
@@ -79,7 +100,8 @@ sparkR.init <- function(
79100
args = as.character(sparkRBackendPort),
80101
javaOpts = paste("-Xmx", sparkMem, sep = ""))
81102
Sys.sleep(2) # Wait for backend to come up
82-
connectBackend("localhost", 12345) # Connect to it
103+
.sparkREnv$sparkRBackendPort <- sparkRBackendPort
104+
connectBackend("localhost", sparkRBackendPort) # Connect to it
83105

84106
if (nchar(sparkHome) != 0) {
85107
sparkHome <- normalizePath(sparkHome)

pkg/R/sparkRBackend.R

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ isRemoveMethod <- function(isStatic, objId, methodName) {
4747
# methodName - name of method to be invoked
4848
invokeJava <- function(isStatic, objId, methodName, ...) {
4949
if (!exists(".sparkRCon", .sparkREnv)) {
50-
stop("No connection to backend found")
50+
stop("No connection to backend found. Please re-run sparkR.init")
5151
}
5252

5353
# If this isn't a removeJObject call
@@ -62,6 +62,7 @@ invokeJava <- function(isStatic, objId, methodName, ...) {
6262
}
6363
}
6464

65+
6566
rc <- rawConnection(raw(0), "r+")
6667

6768
writeBoolean(rc, isStatic)
@@ -78,12 +79,12 @@ invokeJava <- function(isStatic, objId, methodName, ...) {
7879
writeInt(conn, length(bytesToSend))
7980
writeBin(bytesToSend, conn)
8081

82+
close(rc) # TODO: Can we close this before ?
83+
8184
# TODO: check the status code to output error information
8285
returnStatus <- readInt(conn)
8386
stopifnot(returnStatus == 0)
8487
ret <- readObject(conn)
8588

86-
close(rc) # TODO: Can we close this before ?
87-
8889
ret
8990
}

0 commit comments

Comments
 (0)