From 5bd379ddf99cdbb78b7ce6ba70241d3d887cec8c Mon Sep 17 00:00:00 2001
From: weiwenda
Date: Tue, 15 Jan 2019 10:56:56 +0800
Subject: [PATCH] Only SQL-kind statements run in the thread pool

---
 bin/livy-server                                |  2 +-
 conf/livy-fairscheduler.xml                    | 31 +++++++++++++++++++
 .../scala/org/apache/livy/sessions/Kind.scala  |  3 ++
 .../org/apache/livy/repl/SQLInterpreter.scala  |  5 ++-
 .../scala/org/apache/livy/repl/Session.scala   | 19 +++++++++---
 .../org/apache/livy/rsc/ContextLauncher.java   |  7 +++++
 .../java/org/apache/livy/rsc/RSCConf.java      |  3 +-
 7 files changed, 63 insertions(+), 7 deletions(-)
 create mode 100644 conf/livy-fairscheduler.xml

diff --git a/bin/livy-server b/bin/livy-server
index 8d27d4e61..70643eb8d 100755
--- a/bin/livy-server
+++ b/bin/livy-server
@@ -21,7 +21,7 @@ usage="Usage: livy-server (start|stop|status)"
 
 export LIVY_HOME=$(cd $(dirname $0)/.. && pwd)
 
-LIVY_CONF_DIR=${LIVY_CONF_DIR:-"$LIVY_HOME/conf"}
+export LIVY_CONF_DIR=${LIVY_CONF_DIR:-"$LIVY_HOME/conf"}
 
 if [ -f "${LIVY_CONF_DIR}/livy-env.sh" ]; then
   # Promote all variable declarations to environment (exported) variables
diff --git a/conf/livy-fairscheduler.xml b/conf/livy-fairscheduler.xml
new file mode 100644
index 000000000..eadcddc4a
--- /dev/null
+++ b/conf/livy-fairscheduler.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<allocations>
+  <pool name="default">
+    <schedulingMode>FIFO</schedulingMode>
+    <weight>1</weight>
+    <minShare>0</minShare>
+  </pool>
+
+  <pool name="fair">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>1</weight>
+    <minShare>0</minShare>
+  </pool>
+</allocations>
diff --git a/core/src/main/scala/org/apache/livy/sessions/Kind.scala b/core/src/main/scala/org/apache/livy/sessions/Kind.scala
index 0a05c8fcd..056318024 100644
--- a/core/src/main/scala/org/apache/livy/sessions/Kind.scala
+++ b/core/src/main/scala/org/apache/livy/sessions/Kind.scala
@@ -35,6 +35,8 @@ object Shared extends Kind("shared")
 
 object SQL extends Kind("sql")
 
+object SerialSQL extends Kind("serial_sql")
+
 object Kind {
 
   def apply(kind: String): Kind = kind match {
@@ -43,6 +45,7 @@ object Kind {
     case "sparkr" | "r" => SparkR
     case "shared" => Shared
     case "sql" => SQL
+    case "serial_sql" => SerialSQL
     case other => throw new IllegalArgumentException(s"Invalid kind: $other")
   }
 }
diff --git a/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
index 5a7b60638..479e97a6c 100644
--- a/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
@@ -64,7 +64,8 @@ import org.apache.livy.rsc.driver.SparkEntries
 class SQLInterpreter(
     sparkConf: SparkConf,
     rscConf: RSCConf,
-    sparkEntries: SparkEntries) extends Interpreter with Logging {
+    sparkEntries: SparkEntries,
+    pool: String = "default") extends Interpreter with Logging {
 
   private implicit def formats = DefaultFormats
 
@@ -81,6 +82,7 @@ class SQLInterpreter(
 
   override protected[repl] def execute(code: String): Interpreter.ExecuteResponse = {
     try {
+      spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
       val result = spark.sql(code)
       val schema = parse(result.schema.json)
 
@@ -94,6 +96,7 @@
           case e => e
         }
       }
+      spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
 
       val jRows = Extraction.decompose(rows)
 
diff --git a/repl/src/main/scala/org/apache/livy/repl/Session.scala b/repl/src/main/scala/org/apache/livy/repl/Session.scala
index 78df50041..3a833077d 100644
--- a/repl/src/main/scala/org/apache/livy/repl/Session.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/Session.scala
@@ -58,7 +58,10 @@ class Session(
   import Session._
 
   private val interpreterExecutor = ExecutionContext.fromExecutorService(
-    Executors.newFixedThreadPool(livyConf.getInt(RSCConf.Entry.SESSION_INTERPRETER_THREADS)))
+    Executors.newSingleThreadExecutor())
+
+  private lazy val sqlExecutor = ExecutionContext.fromExecutorService(
+    Executors.newFixedThreadPool(livyConf.getInt(RSCConf.Entry.SQL_INTERPRETER_THREADS)))
 
   private val cancelExecutor = ExecutionContext.fromExecutorService(
     Executors.newSingleThreadExecutor())
@@ -105,7 +108,9 @@ class Session(
         throw new IllegalStateException("SparkInterpreter should not be lazily created.")
       case PySpark => PythonInterpreter(sparkConf, entries)
       case SparkR => SparkRInterpreter(sparkConf, entries)
-      case SQL => new SQLInterpreter(sparkConf, livyConf, entries)
+      case SQL => new SQLInterpreter(sparkConf, livyConf, entries,
+        livyConf.get(RSCConf.Entry.SQL_DEFAULT_SCHEDULER_POOL))
+      case SerialSQL => new SQLInterpreter(sparkConf, livyConf, entries)
     }
     interp.start()
     interpGroup(kind) = interp
@@ -160,6 +165,12 @@
     val statement = new Statement(statementId, code, StatementState.Waiting, null)
     _statements.synchronized { _statements(statementId) = statement }
 
+    val threadPool = if (tpe == SQL) {
+      sqlExecutor
+    } else {
+      interpreterExecutor
+    }
+
     Future {
       this.synchronized { setJobGroup(tpe, statementId) }
       statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
@@ -171,7 +182,7 @@
       statement.compareAndTransit(StatementState.Running, StatementState.Available)
       statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
      statement.updateProgress(1.0)
-    }(interpreterExecutor)
+    }(threadPool)
 
     statementId
   }
@@ -333,7 +344,7 @@
   private def setJobGroup(codeType: Kind, statementId: Int): String = {
     val jobGroup = statementIdToJobGroup(statementId)
     val (cmd, tpe) = codeType match {
-      case Spark | SQL =>
+      case Spark | SQL | SerialSQL =>
         // A dummy value to avoid automatic value binding in scala REPL.
         (s"""val _livyJobGroup$jobGroup = sc.setJobGroup("$jobGroup",""" +
          s""""Job group for statement $jobGroup")""",
diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
index 790f912cf..b2a29049a 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
@@ -45,6 +45,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.Promise;
 
 import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.scheduler.SchedulingMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +70,7 @@ class ContextLauncher {
   private static final String SPARK_JARS_KEY = "spark.jars";
   private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
+  private static final String FAIR_SCHEDULER_FILENAME = "livy-fairscheduler.xml";
 
   static DriverProcessInfo create(RSCClientFactory factory, RSCConf conf)
       throws IOException {
@@ -180,6 +182,9 @@ private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
       // connections for the same registered app.
       conf.set("spark.yarn.maxAppAttempts", "1");
 
+      conf.set("spark.scheduler.mode", SchedulingMode.FAIR().toString());
+      conf.set("spark.scheduler.allocation.file", FAIR_SCHEDULER_FILENAME);
+
       // Let the launcher go away when launcher in yarn cluster mode. This avoids keeping lots
       // of "small" Java processes lingering on the Livy server node.
conf.set("spark.yarn.submit.waitAppCompletion", "false"); @@ -219,6 +224,8 @@ public void run() { } else { final SparkLauncher launcher = new SparkLauncher(); launcher.setSparkHome(System.getenv(SPARK_HOME_ENV)); + launcher.addFile( + System.getenv().get("LIVY_CONF_DIR") + File.separator + FAIR_SCHEDULER_FILENAME); launcher.setAppResource(SparkLauncher.NO_RESOURCE); launcher.setPropertiesFile(confFile.getAbsolutePath()); launcher.setMainClass(RSCDriverBootstrapper.class.getName()); diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java index fd645fd22..b6204e52f 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCConf.java @@ -44,7 +44,6 @@ public static enum Entry implements ConfEntry { CLIENT_SHUTDOWN_TIMEOUT("client.shutdown-timeout", "10s"), DRIVER_CLASS("driver-class", null), SESSION_KIND("session.kind", null), - SESSION_INTERPRETER_THREADS("session.interpreter.threadPool.size", 1), LIVY_JARS("jars", null), SPARKR_PACKAGE("sparkr.package", null), @@ -81,6 +80,8 @@ public static enum Entry implements ConfEntry { RETAINED_STATEMENTS("retained-statements", 100), RETAINED_SHARE_VARIABLES("retained.share-variables", 100), + SQL_DEFAULT_SCHEDULER_POOL("sql.default.scheduler.pool", "fair"), + SQL_INTERPRETER_THREADS("sql.interpreter.threadPool.size", 1), // Number of result rows to get for SQL Interpreters. SQL_NUM_ROWS("sql.num-rows", 1000);