Skip to content

Commit

Permalink
Allow interpreterExecutor run in ThreadPool
Browse files Browse the repository at this point in the history
  • Loading branch information
WeiWenda committed Dec 21, 2018
1 parent 4cfb6bc commit 01a9e0a
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
4 changes: 2 additions & 2 deletions repl/src/main/scala/org/apache/livy/repl/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class Session(
import Session._

private val interpreterExecutor = ExecutionContext.fromExecutorService(
Executors.newSingleThreadExecutor())
Executors.newFixedThreadPool(livyConf.getInt(RSCConf.Entry.SESSION_INTERPRETER_THREADS)))

private val cancelExecutor = ExecutionContext.fromExecutorService(
Executors.newSingleThreadExecutor())
Expand Down Expand Up @@ -161,7 +161,7 @@ class Session(
_statements.synchronized { _statements(statementId) = statement }

Future {
setJobGroup(tpe, statementId)
this.synchronized { setJobGroup(tpe, statementId) }
statement.compareAndTransit(StatementState.Waiting, StatementState.Running)

if (statement.state.get() == StatementState.Running) {
Expand Down
1 change: 1 addition & 0 deletions rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public static enum Entry implements ConfEntry {
CLIENT_SHUTDOWN_TIMEOUT("client.shutdown-timeout", "10s"),
DRIVER_CLASS("driver-class", null),
SESSION_KIND("session.kind", null),
SESSION_INTERPRETER_THREADS("session.interpreter.threadPool.size", 1),

LIVY_JARS("jars", null),
SPARKR_PACKAGE("sparkr.package", null),
Expand Down

0 comments on commit 01a9e0a

Please sign in to comment.