Skip to content

Commit 35ae687

Browse files
author
Agarwal
committed
LIVY-246 Support multiple Spark home in runtime
1 parent 7212e3f commit 35ae687

File tree

4 files changed

+18
-6
lines changed

4 files changed

+18
-6
lines changed

server/src/main/scala/com/cloudera/livy/LivyConf.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,14 +246,20 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
246246
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)
247247

248248
/** Return the location of the spark home directory */
249-
def sparkHome(): Option[String] = Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))
249+
def sparkHome(version: Option[String] = None): Option[String] = {
250+
version.map {version => Option(get(s"livy.server.spark-home_$version"))
251+
.orElse(throw new IllegalArgumentException(
252+
s"Spark version: $version is not supported"))
253+
}.getOrElse(Option(get(s"livy.server.spark-home")).orElse(sys.env.get("SPARK_HOME")))
254+
}
250255

251256
/** Return the spark master Livy sessions should use. */
252257
def sparkMaster(): String = get(LIVY_SPARK_MASTER)
253258

254259
/** Return the path to the spark-submit executable. */
255-
def sparkSubmit(): String = {
256-
sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get
260+
def sparkSubmit(version: Option[String] = None): String = {
261+
sparkHome(version).map { _ + File.separator + "bin" + File.separator +
262+
"spark-submit" }.get
257263
}
258264

259265
/** Return the list of superusers. */

server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.cloudera.livy.server.batch
2020

21+
import java.io.File
2122
import java.lang.ProcessBuilder.Redirect
2223

2324
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
@@ -62,7 +63,11 @@ object BatchSession {
6263
request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
6364
require(request.file != null, "File is required.")
6465

65-
val builder = new SparkProcessBuilder(livyConf)
66+
val builder = new SparkProcessBuilder(livyConf, request.sparkVersion)
67+
request.sparkVersion.map({ value =>
68+
builder.env("SPARK_CONF_DIR", livyConf.sparkHome(request.sparkVersion)
69+
+ File.separator + "conf")
70+
})
6671
builder.conf(conf)
6772

6873
proxyUser.foreach(builder.proxyUser)

server/src/main/scala/com/cloudera/livy/server/batch/CreateBatchRequest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,6 @@ class CreateBatchRequest {
3636
var queue: Option[String] = None
3737
var name: Option[String] = None
3838
var conf: Map[String, String] = Map()
39+
var sparkVersion: Option[String] = None
3940

4041
}

server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ import scala.collection.mutable.ArrayBuffer
2525
import com.cloudera.livy.{LivyConf, Logging}
2626
import com.cloudera.livy.util.LineBufferedProcess
2727

28-
class SparkProcessBuilder(livyConf: LivyConf) extends Logging {
28+
class SparkProcessBuilder(livyConf: LivyConf, version: Option[String]) extends Logging {
2929

30-
private[this] var _executable: String = livyConf.sparkSubmit()
30+
private[this] var _executable: String = livyConf.sparkSubmit(version)
3131
private[this] var _master: Option[String] = None
3232
private[this] var _deployMode: Option[String] = None
3333
private[this] var _className: Option[String] = None

0 commit comments

Comments
 (0)