-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[PIO-182] Add async methods to LEventStore #482
Conversation
54af5f4
to
77e2f36
Compare
77e2f36
to
54269c6
Compare
@dszeto @shimamoto Could you take a look? Chris who raised this issue wants to do adding the asynchrnous processing capability to other storages and engine server following this pull request. |
@@ -89,6 +103,57 @@ object LEventStore { | |||
timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is preferable to call the findByEntityAsync
method from this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@@ -142,4 +209,56 @@ object LEventStore { | |||
limit = limit), timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed as well.
* export JAVA_OPTS="$JAVA_OPTS \ | ||
* -Dscala.concurrent.context.numThreads=1000 \ | ||
* -Dscala.concurrent.context.maxThreads=1000" | ||
* </pre> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dszeto Is this comment understandable? Since I'm not confident that it explains correctly and clearly, I'd like to request your confirmation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can add a:
Since using scala.concurrent.ExecutionContext.Implicits.global
is not a good implementation, we only import it in the blocking method if the user is calling it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Wei-1 Thanks for your feedback! I added line comment on importing as below:
// Import here to ensure ExecutionContext.Implicits.global is used only this method
import scala.concurrent.ExecutionContext.Implicits.global
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, only in
this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, fixed. thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. We can link to https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context for a detail explanation if you'd like to.
@@ -72,9 +84,61 @@ object LEventStore { | |||
latest: Boolean = true, | |||
timeout: Duration = defaultTimeout): Iterator[Event] = { | |||
|
|||
import scala.concurrent.ExecutionContext.Implicits.global |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry to ask, but I will like to clarify.
The reason why we import this line in the object instead of putting it on top is because of:
importing global will introduce a lazy ExecutionContext class
Since it will only be created when it is called, we import it in each thread separately so we won't be using one single ExecutionContext.
Is my interpretation correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since using ExecutionContext.Implicits.global
is bad habit in Scala basically, I wanted to make sure to use it only in these methods.
And your interpretation is correct. Even if we call it twice, it returns a same instance.
Any chance that this gets merged in develop soon? Would need to base my async work on that |
@longliveenduro How do you feel about Scaladoc of |
7e6f437
to
fae05f4
Compare
@takezoe The scaladoc is clear I think. But after some days of letting this settle and also reading this excellent blog: https://www.beyondthelines.net/computing/scala-future-and-execution-context/ I am not sure if it is really a good idea to use the "standard" Scala Execution Context for the "old blocking" code. Another idea would we (like proposed in the Blog) use a separate Thread pool, make the defaults the same as now (or maybe raise the allowed thread count to bypass/hack the problem from now on a bit for usages of the old code) and make the size of the thread pool configurable like the standard scala thread pool. For example this blog suggests that a FixedThreadPool for blocking purposes might be a good idea: |
@longliveenduro Even if we use thread for asynchronous processing, we should use separated thread pools for CPU bound processing and I/O bound processing. The thread pool for CPU bound processing should have threads close to the number of CPUs. One for I/O bound processing should have threads significantly larger than the number of CPUs. Anyway, I think that allowing to give an execution context from user side is good strategy because it makes possible to control by users. |
@takezoe yes exactly, seperate Threadpools for CPU and I/O bound blocking processing looks like a very good idea for a transition phase. Also looking at the Blogs they suggest a fixed Threadpool instead of a ForkJoin Threadpool (which the "standard" Scala thread pool is) for blocking I/O code. Anyhow, looking at your PR, you are importing the standard Scala execution context in the methods that still use blocking via Await.result. What about using a separate Threadpool there whose size is either configurable or you pass it as a parameter. Passing as a parameter though is breaking the current contract and is a little bit strange since the method does not return an async instance (e.g. Future) which I would expect from a scala method which needs an ExecutionContext param. Very interested what you think! |
@longliveenduro Using the default execution context in this pull request is just for backward compatibility with the current blocking methods. Making it configurable might be better, but we can increase the number of threads by giving a VM option. I think it's a sufficient workaround. |
How about we push for deprecating blocking methods? We can update docs to instruct developers to start using non-blocking methods instead. |
I'm not sure if deprecating current blocking methods in the next version is good. I would like to see following work by @longliveenduro before making a decision. |
Anyway, I'm going to merge. Thanks for all reviews! |
This pull request contains following updates:
LEventStore
andLJavaEventStore
scala.concurrent.ExecutionContext.Implicits.global
which is used in blocking methodsSee also: https://lists.apache.org/thread.html/f14e4f8f29410e4585b3d8e9f646b88293a605f4716d3c4d60771854@%3Cuser.predictionio.apache.org%3E