[KYUUBI #1022] Add basic EngineStatusStore for events #1023
cc @yaooqinn, realized a single memory store called It's ok to rework with |
    postListenerEvent(KyuubiEngineOperationClosedEvent(id, System.currentTimeMillis()))
  }
}
Does EventLoggingService need to store these events? @yaooqinn @ulysses-you
cc @yaooqinn @ulysses-you @cfmcgrady, looking forward to your views.
...ubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventManager.scala
Thanks @cfmcgrady @yaooqinn, I have converted the current patch to a draft. From my point of view, the
Codecov Report
@@             Coverage Diff              @@
##             master    #1023      +/-   ##
============================================
- Coverage     79.24%   79.21%   -0.04%
  Complexity       90       90
============================================
  Files           177      178       +1
  Lines          6620     6648      +28
  Branches        783      785       +2
============================================
+ Hits           5246     5266      +20
- Misses          920      926       +6
- Partials        454      456       +2
How about we reuse the current |
It's not the right way to reuse the event logging service.
In fact, using |
Can we check SparkHistoryEventLogger?
+1
Hi @yaooqinn, I cannot understand why Kyuubi needs And, the current design of the logger service is not clear to me. As we know, a service should listen on some port and support start / stop. I think the following would be clearer.
I looked into the logger service: the loggers are executed sequentially and called synchronously. I also checked the
SparkHistoryEventLogger is used to log all of Kyuubi's events to the same file as Spark's event log. We also support other destinations, like Kyuubi's own JSON log store, JDBC (planned), etc.
It's not. It follows spark.eventLog.enabled when logging. We just don't push the events to the listener bus; we call the event log listener directly.
Hmm... do not go too far with Spark's listeners and the listener bus if you do not understand what you are doing here. IIUC, you want everything to work with Spark's listeners and the listener bus; this is not what we want. We are a Spark application, BUT not only a Spark application that is limited to Spark's functionality. What we want is only a size-limited buffer (using
Yes, this is a spot that we can improve too |
Hi @yaooqinn, there seems to be no good way to solve the synchronization problem. For EventLoggerType.SPARK It seems we should improve it first, because if we listen to these events in this PR, it may affect performance.
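For readers following the synchronization concern, here is a minimal sketch of the difference between calling loggers synchronously and handing events to a queue drained by one background thread. `DemoEvent`, `DemoLogger`, `SyncDispatcher`, and `AsyncDispatcher` are hypothetical names for illustration only; they are not Kyuubi's or Spark's classes, and Spark's own listener bus is more involved than this.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical stand-ins; not Kyuubi's or Spark's real classes.
final case class DemoEvent(name: String)

trait DemoLogger {
  def logEvent(event: DemoEvent): Unit
}

// Synchronous dispatch: the caller's thread runs every logger in turn,
// so a slow logger slows down whoever posted the event.
final class SyncDispatcher(loggers: Seq[DemoLogger]) {
  def post(event: DemoEvent): Unit = loggers.foreach(_.logEvent(event))
}

// Asynchronous dispatch: post() only enqueues; one daemon thread drains the queue.
final class AsyncDispatcher(loggers: Seq[DemoLogger]) {
  private val queue = new LinkedBlockingQueue[DemoEvent]()
  @volatile private var running = true

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      // Keep draining until stop() was called and the queue is empty.
      while (running || !queue.isEmpty) {
        val event = queue.poll(50, TimeUnit.MILLISECONDS)
        if (event != null) loggers.foreach(_.logEvent(event))
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  def post(event: DemoEvent): Unit = queue.put(event)

  // Stop the worker and wait until already-queued events are delivered.
  def stop(): Unit = {
    running = false
    worker.join()
  }
}
```

With the asynchronous variant the posting thread returns as soon as the event is queued, at the cost of events being delivered slightly later and needing an explicit `stop()` to flush.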
Can we start with session events only, in another PR? That would minimize the review burden.
We can combine the history logging and the live data logging with the listener bus but we can still use the event logging service |
...nals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
...-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala
I take my earlier judgment back; the listener bus looks like the right way to go. #1023 (comment)
...kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineAppStatusStore.scala
Sure, will do. |
This looks simple. Please send a separate PR to solve this issue.
We can implement this later too.
### _Why are the changes needed?_

> there seems to be no good way to solve the synchronization problem. For `EventLoggerType.SPARK`

This looks simple, Please send a separate PR to solve this issue

```git
- sc.eventLogger.foreach(_.onOtherEvent(kyuubiEvent))
+ sc.listenerBus.post(kyuubiEvent)
```

_Originally posted by yaooqinn in #1023 (comment)_

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1044 from timothy65535/1043.

Closes #1043

0ea1d4f [timothy65535] [KYUUBI #1043] Let spark history logger handle events asynchronously

Authored-by: timothy65535 <timothy65535@163.com>
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
I tried several times, but failed. For Spark's original event log
Hi @yaooqinn @cfmcgrady, looking forward to your suggestions.
I have read the whole UI design of the Spark Thrift Server and have already tried to implement a memory status store. In the end, in terms of design compatibility and simplicity, I think it's better to reuse the Thrift Server's UI design. We need to consider not only the pages on the engine server, but also the pages on the Spark History Server.
cc @yaooqinn, the patch is updated with a customized store.
...rk-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
...ls/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/EngineStatusListener.scala
cc @yaooqinn, ready to go.
Can you update the PR description and maybe also some comments in the code changes to help us review?
Updated. The store based on |
cc @cfmcgrady when you are free, thanks. |
cc @zhang1002 |
cc @pan3793 @ulysses-you, help to review when free, thanks. |
...park-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
cc @yaooqinn, ready for review; looking forward to your further advice.
/**
 * cleanup the session events if reach the threshold
 */
private def checkSessionCapacity(): Unit = {
how efficient is this?
I can't find a suitable open-source library that supports the following characteristics:
- Concurrency: concurrent read, update, remove
- Update
- Order
- Complexity
If we implement a new Map that extends AbstractMap, it seems it will be complex.
If we keep 200 events in memory, efficiency may not be affected.
private def checkSessionCapacity(): Unit = {
  var countToDelete = sessions.size - retainedSessions

  val reverseSeq = sessions.values().asScala.toSeq.sortBy(_.startTime).reverse
We sort the value set for every single event?...
If we use a TreeMap with startTime or endTime as the key, it will drop events whenever a key repeats.
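One possible way around the repeated-key problem, sketched here under assumptions: order by a composite key of start time plus session id, so two sessions that start in the same millisecond do not overwrite each other. `DemoSession`, `SessionKey`, and `SortedSessions` are hypothetical names, not classes from this PR, and the sketch uses `ConcurrentSkipListMap` from the JDK rather than a plain `TreeMap` so that concurrent access stays safe.

```scala
import java.util.concurrent.ConcurrentSkipListMap

// Hypothetical event type for illustration; not the PR's actual session event.
final case class DemoSession(sessionId: String, startTime: Long)

// Composite key: order by startTime first, then by sessionId to break ties.
final case class SessionKey(startTime: Long, sessionId: String)

object SessionKey {
  // scala.math.Ordering extends java.util.Comparator, so it can be passed to JDK maps.
  val ordering: Ordering[SessionKey] =
    Ordering.by((k: SessionKey) => (k.startTime, k.sessionId))
}

object SortedSessions {
  // Builds a sorted, thread-safe map; entries with equal startTime are all kept.
  def build(events: Seq[DemoSession]): ConcurrentSkipListMap[SessionKey, DemoSession] = {
    val map = new ConcurrentSkipListMap[SessionKey, DemoSession](SessionKey.ordering)
    events.foreach(e => map.put(SessionKey(e.startTime, e.sessionId), e))
    map
  }
}
```

The trade-off is that a lookup or update by session id alone now needs the start time as well (or a second index), which may be part of why this was judged complex in the thread.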
Here you should do this asynchronously; that way you sort the values only once, when countToDelete reaches the set value.
if (retainedSessions/sessions.size >= threshold) { new thread { sort() delete() } }
You can also look at Guava Cache; you can set the expiry strategy yourself, but it will waste some memory...
If we use a new thread, things will be more complex.
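A middle ground between sorting on every event and spawning a thread could look like the sketch below: sort only when the map has grown some slack beyond the retained count, then evict back down in one pass on the calling thread. This is an illustration under assumptions, not the PR's implementation: `DemoSessionEvent`, `DemoSessionStore`, and the `slack` parameter are hypothetical, and evicting the oldest sessions by `startTime` is an assumed policy.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._ // on Scala 2.13+, scala.jdk.CollectionConverters._ is preferred

// Hypothetical event type; the real store's event classes may differ.
final case class DemoSessionEvent(sessionId: String, startTime: Long)

// Keeps about `retained` sessions, but only sorts once `slack` extra entries have piled up.
final class DemoSessionStore(retained: Int, slack: Int) {
  private val sessions = new ConcurrentHashMap[String, DemoSessionEvent]()

  def save(event: DemoSessionEvent): Unit = {
    sessions.put(event.sessionId, event)
    checkSessionCapacity()
  }

  def size: Int = sessions.size()

  def contains(sessionId: String): Boolean = sessions.containsKey(sessionId)

  // Evicts the oldest sessions (assumed policy) in one batch, so the sort
  // runs once per `slack` inserts instead of once per event.
  private def checkSessionCapacity(): Unit = {
    if (sessions.size() > retained + slack) {
      val countToDelete = sessions.size() - retained
      sessions.values().asScala.toSeq
        .sortBy(_.startTime)
        .take(countToDelete)
        .foreach(e => sessions.remove(e.sessionId))
    }
  }
}
```

Under concurrent writers two threads may run the eviction at the same time; removals are idempotent, so the store can briefly dip slightly below `retained` but does not throw.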
cc @ulysses-you @pan3793 @cfmcgrady @yaooqinn, I have already tried several implementations: looking for advice, thanks
BTW, at this stage, it seems we don't need to spend a lot of time on details, especially on how to implement the store; it has already cost two weeks.
cc @ulysses-you @pan3793 @cfmcgrady @yaooqinn, any more thoughts?
cc @ulysses-you @pan3793 @yaooqinn, any more thoughts? Thanks
I am +0 on this, but since no one has an opposing opinion on this implementation, I'll get this merged.
OK, new ideas for the implementation of the store are welcome.
Why are the changes needed?
For more detail, please go to #981
- EngineStatusStore helps to push events to the listener bus
- EngineStatusStore is a memory store that tracks the number of statements and sessions; it provides:
How was this patch tested?
Add some test cases that check the changes thoroughly including negative and positive cases if possible
Add screenshots for manual tests if appropriate
Run test locally before make a pull request