Only SQL-kind statements run in the thread pool
WeiWenda committed Jan 15, 2019
1 parent 01a9e0a commit c9ef950
Showing 7 changed files with 63 additions and 7 deletions.
2 changes: 1 addition & 1 deletion bin/livy-server
@@ -21,7 +21,7 @@
 usage="Usage: livy-server (start|stop|status)"
 
 export LIVY_HOME=$(cd $(dirname $0)/.. && pwd)
-LIVY_CONF_DIR=${LIVY_CONF_DIR:-"$LIVY_HOME/conf"}
+export LIVY_CONF_DIR=${LIVY_CONF_DIR:-"$LIVY_HOME/conf"}
 
 if [ -f "${LIVY_CONF_DIR}/livy-env.sh" ]; then
   # Promote all variable declarations to environment (exported) variables
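Why the added `export` matters (an inference from this diff, not stated in it): the RSC launcher further down reads LIVY_CONF_DIR from the process environment when it ships the fair-scheduler file, so a shell-local assignment would leave the JVM seeing nothing. A minimal Scala sketch of that lookup:

// Sketch: mirrors the launcher's environment lookup further down. Without
// `export`, the variable stays local to the shell and this yields None.
val allocationFile = sys.env.get("LIVY_CONF_DIR").map(dir => s"$dir/livy-fairscheduler.xml")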
31 changes: 31 additions & 0 deletions conf/livy-fairscheduler.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<allocations>
+  <pool name="default">
+    <schedulingMode>FIFO</schedulingMode>
+    <weight>1</weight>
+    <minShare>0</minShare>
+  </pool>
+  <pool name="fair">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>1</weight>
+    <minShare>0</minShare>
+  </pool>
+</allocations>
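How these pools get used at runtime (standard Spark behavior, not code from this commit): with spark.scheduler.mode=FAIR, a job is routed to a named pool by setting a thread-local property before submitting work, which is exactly what the SQLInterpreter change below does. A minimal sketch, assuming an existing SparkContext `sc` and the two pools defined above:

// The property is thread-local, so it only affects jobs submitted
// from the calling thread.
sc.setLocalProperty("spark.scheduler.pool", "fair") // route to the FAIR pool
sc.parallelize(1 to 100).count()                    // scheduled in "fair"
sc.setLocalProperty("spark.scheduler.pool", null)   // back to the default pool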
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/livy/sessions/Kind.scala
@@ -35,6 +35,8 @@ object Shared extends Kind("shared")
 
 object SQL extends Kind("sql")
 
+object SerialSQL extends Kind("serial_sql")
+
 object Kind {
 
   def apply(kind: String): Kind = kind match {
@@ -43,6 +45,7 @@ object Kind {
     case "sparkr" | "r" => SparkR
     case "shared" => Shared
     case "sql" => SQL
+    case "serial_sql" => SerialSQL
     case other => throw new IllegalArgumentException(s"Invalid kind: $other")
   }
 }
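A quick usage sketch for the new kind (hypothetical, assuming Kind's usual name-based toString):

// Round-trip through the factory above; Kind("serial_sql") should return
// the SerialSQL singleton, and an unknown string still throws.
val k = Kind("serial_sql")         // SerialSQL
assert(k.toString == "serial_sql") // assumes Kind.toString returns the name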
5 changes: 4 additions & 1 deletion repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
@@ -64,7 +64,8 @@ import org.apache.livy.rsc.driver.SparkEntries
 class SQLInterpreter(
     sparkConf: SparkConf,
     rscConf: RSCConf,
-    sparkEntries: SparkEntries) extends Interpreter with Logging {
+    sparkEntries: SparkEntries,
+    pool: String) extends Interpreter with Logging {
 
   private implicit def formats = DefaultFormats
 
@@ -81,6 +82,7 @@
 
   override protected[repl] def execute(code: String): Interpreter.ExecuteResponse = {
     try {
+      spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
       val result = spark.sql(code)
       val schema = parse(result.schema.json)
 
@@ -94,6 +96,7 @@
           case e => e
         }
       }
+      spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
 
       val jRows = Extraction.decompose(rows)
 
19 changes: 15 additions & 4 deletions repl/src/main/scala/org/apache/livy/repl/Session.scala
@@ -58,7 +58,10 @@ class Session(
   import Session._
 
   private val interpreterExecutor = ExecutionContext.fromExecutorService(
-    Executors.newFixedThreadPool(livyConf.getInt(RSCConf.Entry.SESSION_INTERPRETER_THREADS)))
+    Executors.newSingleThreadExecutor())
+
+  private lazy val sqlExecutor = ExecutionContext.fromExecutorService(
+    Executors.newFixedThreadPool(livyConf.getInt(RSCConf.Entry.SQL_INTERPRETER_THREADS)))
 
   private val cancelExecutor = ExecutionContext.fromExecutorService(
     Executors.newSingleThreadExecutor())
@@ -105,7 +108,9 @@ class Session(
         throw new IllegalStateException("SparkInterpreter should not be lazily created.")
       case PySpark => PythonInterpreter(sparkConf, entries)
       case SparkR => SparkRInterpreter(sparkConf, entries)
-      case SQL => new SQLInterpreter(sparkConf, livyConf, entries)
+      case SQL => new SQLInterpreter(sparkConf, livyConf, entries,
+        livyConf.get(RSCConf.Entry.SQL_DEFAULT_SCHEDULER_POOL))
+      case SerialSQL => new SQLInterpreter(sparkConf, livyConf, entries, "default")
     }
     interp.start()
     interpGroup(kind) = interp
@@ -160,6 +165,12 @@
     val statement = new Statement(statementId, code, StatementState.Waiting, null)
     _statements.synchronized { _statements(statementId) = statement }
 
+    val threadPool = if (tpe == SQL) {
+      sqlExecutor
+    } else {
+      interpreterExecutor
+    }
+
     Future {
       this.synchronized { setJobGroup(tpe, statementId) }
       statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
@@ -171,7 +182,7 @@
       statement.compareAndTransit(StatementState.Running, StatementState.Available)
      statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
       statement.updateProgress(1.0)
-    }(interpreterExecutor)
+    }(threadPool)
 
     statementId
   }
@@ -333,7 +344,7 @@
   private def setJobGroup(codeType: Kind, statementId: Int): String = {
     val jobGroup = statementIdToJobGroup(statementId)
     val (cmd, tpe) = codeType match {
-      case Spark | SQL =>
+      case Spark | SQL | SerialSQL =>
         // A dummy value to avoid automatic value binding in scala REPL.
         (s"""val _livyJobGroup$jobGroup = sc.setJobGroup("$jobGroup",""" +
           s""""Job group for statement $jobGroup")""",
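The dispatch pattern above in isolation (a standalone sketch with illustrative names, not the commit's code): SQL statements share a sized thread pool while every other kind stays on a single-thread executor, so a long-running Scala statement can no longer block concurrent SQL.

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object DispatchSketch {
  // Single thread for regular interpreters (REPL state is not thread-safe).
  val interpreterExecutor = ExecutionContext.fromExecutorService(
    Executors.newSingleThreadExecutor())
  // Sized pool for SQL; 4 stands in for sql.interpreter.threadPool.size.
  val sqlExecutor = ExecutionContext.fromExecutorService(
    Executors.newFixedThreadPool(4))

  def submit(isSql: Boolean)(body: => Unit): Future[Unit] =
    Future(body)(if (isSql) sqlExecutor else interpreterExecutor)
}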
7 changes: 7 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
@@ -45,6 +45,7 @@
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.Promise;
 import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.scheduler.SchedulingMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +70,7 @@ class ContextLauncher {
   private static final String SPARK_JARS_KEY = "spark.jars";
   private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
+  private static final String FAIR_SCHEDULER_FILENAME = "livy-fairscheduler.xml";
 
   static DriverProcessInfo create(RSCClientFactory factory, RSCConf conf)
       throws IOException {
@@ -180,6 +182,9 @@ private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
     // connections for the same registered app.
     conf.set("spark.yarn.maxAppAttempts", "1");
 
+    conf.set("spark.scheduler.mode", SchedulingMode.FAIR().toString());
+    conf.set("spark.scheduler.allocation.file", FAIR_SCHEDULER_FILENAME);
+
     // Let the launcher go away when launcher in yarn cluster mode. This avoids keeping lots
     // of "small" Java processes lingering on the Livy server node.
     conf.set("spark.yarn.submit.waitAppCompletion", "false");
@@ -219,6 +224,8 @@ public void run() {
       } else {
         final SparkLauncher launcher = new SparkLauncher();
         launcher.setSparkHome(System.getenv(SPARK_HOME_ENV));
+        launcher.addFile(
+            System.getenv().get("LIVY_CONF_DIR") + File.separator + FAIR_SCHEDULER_FILENAME);
         launcher.setAppResource(SparkLauncher.NO_RESOURCE);
         launcher.setPropertiesFile(confFile.getAbsolutePath());
         launcher.setMainClass(RSCDriverBootstrapper.class.getName());
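The launcher wiring above, condensed (an illustrative sketch: the method names are real SparkLauncher API, the values mirror this diff). Shipping the allocation file with the application is what lets the bare filename in spark.scheduler.allocation.file resolve in the driver's working directory, including in yarn-cluster mode.

import org.apache.spark.launcher.SparkLauncher

// Sketch only: assumes SPARK_HOME and LIVY_CONF_DIR are set in the environment.
val launcher = new SparkLauncher()
  .setSparkHome(sys.env("SPARK_HOME"))
  .setConf("spark.scheduler.mode", "FAIR")
  .setConf("spark.scheduler.allocation.file", "livy-fairscheduler.xml")
  .addFile(sys.env("LIVY_CONF_DIR") + "/livy-fairscheduler.xml")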
3 changes: 2 additions & 1 deletion rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
@@ -44,7 +44,6 @@ public static enum Entry implements ConfEntry {
     CLIENT_SHUTDOWN_TIMEOUT("client.shutdown-timeout", "10s"),
     DRIVER_CLASS("driver-class", null),
     SESSION_KIND("session.kind", null),
-    SESSION_INTERPRETER_THREADS("session.interpreter.threadPool.size", 1),
 
     LIVY_JARS("jars", null),
     SPARKR_PACKAGE("sparkr.package", null),
@@ -81,6 +80,8 @@ public static enum Entry implements ConfEntry {
     RETAINED_STATEMENTS("retained-statements", 100),
     RETAINED_SHARE_VARIABLES("retained.share-variables", 100),
 
+    SQL_DEFAULT_SCHEDULER_POOL("sql.default.scheduler.pool", "fair"),
+    SQL_INTERPRETER_THREADS("sql.interpreter.threadPool.size", 1),
     // Number of result rows to get for SQL Interpreters.
     SQL_NUM_ROWS("sql.num-rows", 1000);
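For reference, a sketch of reading the new entries, mirroring the Session change above (the full config keys would presumably carry RSCConf's usual livy.rsc. prefix, i.e. livy.rsc.sql.interpreter.threadPool.size and livy.rsc.sql.default.scheduler.pool — an assumption, not shown in this diff):

// Hypothetical: read the new entries from an RSCConf instance `livyConf`.
val threads = livyConf.getInt(RSCConf.Entry.SQL_INTERPRETER_THREADS)  // default 1
val pool    = livyConf.get(RSCConf.Entry.SQL_DEFAULT_SCHEDULER_POOL)  // default "fair"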
