[LIVY-544] Allow interpreterExecutor run in ThreadPool #135
base: master
Conversation
Codecov Report
```diff
@@            Coverage Diff             @@
##           master     #135      +/-   ##
============================================
+ Coverage     68.38%   68.44%   +0.05%
  Complexity      891      891
============================================
  Files           100      100
  Lines          5596     5597       +1
  Branches        839      839
============================================
+ Hits           3827     3831       +4
+ Misses         1223     1220       -3
  Partials        546      546
```

Continue to review full report at Codecov.
Codecov Report
```diff
@@            Coverage Diff             @@
##           master     #135      +/-   ##
============================================
- Coverage     68.82%   68.56%   -0.27%
+ Complexity      906      905       -1
============================================
  Files           100      100
  Lines          5662     5688      +26
  Branches        848      856       +8
============================================
+ Hits           3897     3900       +3
- Misses         1214     1232      +18
- Partials        551      556       +5
```

Continue to review full report at Codecov.
Force-pushed from 78c1c90 to 07fdd14.
I have fixed all the errors. Could you merge this PR? Thank you.
@WeiWenda I am not a committer. Thanks for this great improvement; it would be awesome to have concurrentSQL. cc @mgaido91 @jerryshao @vanzin @ajbozarth @zjffdu, please help to review. Thank you.
I see no issues with the code, but since I don't have any experience with the SQL side of Livy I will leave final review and merging to a more knowledgeable committer.
```diff
@@ -35,6 +35,8 @@ object Shared extends Kind("shared")

 object SQL extends Kind("sql")
+
+object ConcurrentSQL extends Kind("concurrentSQL")
```
I don't think that it is a good idea to add a new session kind. I think we should rather have a configuration for enabling FAIR vs. FIFO, and I am wondering if this shouldn't be done cross-kind, i.e. for all the kinds: why was it done only for SQL?
@mgaido91 good point. There was some discussion on this at https://issues.apache.org/jira/browse/LIVY-544
For code paragraphs, chances are there are dependencies between the code items, so they can't run in parallel. For SQL there is often no dependency, e.g. when a statement is just a SELECT, but we let end users decide whether there is truly no dependency, and queries execute in parallel only when that is requested explicitly. This was also modeled after Zeppelin to some degree: Zeppelin only allows parallel execution for SQL cells, not for code cells like pyspark.
Running code blocks in parallel is very confusing. Spark's FAIR scheduler mode means that all runnable stages can receive executors right away, no matter when they were submitted, so concurrentSQL's stages need to run in a FAIR pool. But for Scala, Python, or Java, a code block usually can't be broken into independent stages, so it makes no sense to run those in a FAIR pool.
I reviewed `SparkSqlInterpreter` in Zeppelin; it does the same thing I have done here.
NewSparkInterpreter:

```java
if (entry.getKey().toString().equals("zeppelin.spark.concurrentSQL")
    && entry.getValue().toString().equals("true")) {
  conf.set("spark.scheduler.mode", "FAIR");
}
```
SparkSqlInterpreter:

```java
public InterpreterResult interpret(String st, InterpreterContext context)
    throws InterpreterException {
  ...
  sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
```

The InterpreterContext's localProperties come from RemoteInterpreterContext's localProperties, i.e. the RemoteInterpreter's InterpreterContext, created by Paragraph's getInterpreterContext and configured via `%spark.sql(pool=poolname,key=value)`:

```java
private static Pattern REPL_PATTERN =
    Pattern.compile("(\\s*)%([\\w\\.]+)(\\(.*?\\))?.*", Pattern.DOTALL);
```
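To make the pattern above concrete, here is a small, runnable illustration of how that regex splits a paragraph into an interpreter name and an optional property list. The class and helper names are illustrative, not Livy or Zeppelin API; only the regex itself is taken from the snippet above:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReplPatternDemo {
    // Same pattern as Zeppelin's REPL_PATTERN quoted above: optional leading
    // whitespace, %interpreter.name, and an optional (key=value,...) list.
    static final Pattern REPL_PATTERN =
        Pattern.compile("(\\s*)%([\\w\\.]+)(\\(.*?\\))?.*", Pattern.DOTALL);

    // Returns "name|props" on a match, or null when the paragraph has no
    // %interpreter prefix. Purely illustrative.
    static String parse(String paragraph) {
        Matcher m = REPL_PATTERN.matcher(paragraph);
        if (!m.matches()) {
            return null;
        }
        return m.group(2) + "|" + (m.group(3) == null ? "" : m.group(3));
    }

    public static void main(String[] args) {
        System.out.println(parse("%spark.sql(pool=etl) SELECT 1"));
        // prints: spark.sql|(pool=etl)
        System.out.println(parse("%pyspark print(1)"));
        // prints: pyspark|   (no property list, so group(3) is null)
    }
}
```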
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But unlike Zeppelin, I don't think users can provide Spark's fairscheduler.xml in a proper way, especially in cluster deploy mode, or keep in mind which SQL should run in which pool; that requires a good understanding of the Spark scheduler. So I do everything inside Livy to free users from configuring those items. Of course we could change the statement to one of:

```json
{"code":"", "kind":"sql", "pool":"xx"}
```

or

```json
{"code":"set pool=xx, query content", "kind":"sql"}
```

or

```json
{"code":"", "kind":"sql?pool=xx"}
```

I have to say that the extensibility of Livy is not as good as Zeppelin's; there should be a properties part in the statement body.
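For reference, the fairscheduler.xml mentioned above follows the format in Spark's job-scheduling documentation; a minimal sketch (the pool name is illustrative):

```xml
<?xml version="1.0"?>
<allocations>
  <pool name="concurrentSQL">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
```

Queries are routed to such a pool by setting the `spark.scheduler.pool` local property on the submitting thread, which is exactly what the Zeppelin snippet above does.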
Would somebody else from the committers be available to review this? My 2 cents. Thanks everyone.
Hello, with this feature, would it mean that we can run several statements at the same time in one Livy session, provided of course that each statement is independent of the others? Thanks.
Yes, you can think of it that way.
Hope to find some time to test. Is this the way you use this feature?
This is not my main area of expertise; @vanzin and @jerryshao might be better reviewers.
What changes were proposed in this pull request?
Make interpreterExecutor run with a newFixedThreadPool rather than a newSingleThreadExecutor.
https://issues.apache.org/jira/browse/LIVY-544
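The executor change can be sketched as follows. The class, method, and field names here are illustrative rather than Livy's actual code, and the pool size (4) is an assumption; in a real patch it would come from configuration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InterpreterPoolSketch {
    // Before: Executors.newSingleThreadExecutor() forced strict FIFO
    // execution of statements. A fixed pool lets independent statements
    // run concurrently while futures still return results in submit order.
    static List<Integer> runAll(List<Integer> inputs, int poolSize) throws Exception {
        ExecutorService interpreterExecutor = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Integer i : inputs) {
                // Each "statement" is just a square here, standing in for
                // an interpreter task.
                futures.add(interpreterExecutor.submit(() -> i * i));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // blocks until that task finishes
            }
            return results;
        } finally {
            interpreterExecutor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(List.of(1, 2, 3, 4), 4)); // prints [1, 4, 9, 16]
    }
}
```

Note that with a pool size of 1 this degenerates back to the old single-threaded behavior, which is why the change is safe to gate behind a configuration option.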
How was this patch tested?
We ran a JMeter test with 20 threads, each posting a statement every 1/60 s, with "spark.scheduler.mode": "FAIR" set.
We found that setJobGroup is not thread-safe, causing some reflection errors, so we wrapped it in Session.synchronized.
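The synchronization described above can be sketched as follows; `SESSION` and `setJobGroup` are stand-ins for the real Livy session object and the SparkContext call, not the actual implementation:

```java
public class JobGroupSketch {
    // Stand-in for the Livy Session object the patch synchronizes on.
    static final Object SESSION = new Object();

    private static volatile String currentGroup = "";

    // Stand-in for SparkContext.setJobGroup, which the patch found unsafe
    // when called from several interpreter threads at once.
    private static void setJobGroup(String group) {
        currentGroup = group;
    }

    // The fix, sketched: serialize the call by locking on the session,
    // so only one interpreter thread sets the job group at a time.
    static void setJobGroupSafely(String group) {
        synchronized (SESSION) {
            setJobGroup(group);
        }
    }

    static String lastGroup() {
        return currentGroup;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            final int id = i;
            threads[i] = new Thread(() -> setJobGroupSafely("statement-" + id));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(lastGroup().startsWith("statement-")); // prints true
    }
}
```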