Skip to content

[KYUUBI #1022] Add basic EngineStatusStore for events #1023

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed

[KYUUBI #1022] Add basic EngineStatusStore for events #1023

wants to merge 1 commit into from

Conversation

charlesy6
Copy link
Contributor

@charlesy6 charlesy6 commented Sep 3, 2021

Why are the changes needed?

For more detail, please go to #981

EngineStatusStore helps to push events to listener bus

EngineStatusStore is a memory store that tracking the number of statements and sessions, it provides:

  • stores all elements, and sorted by startTimestamp.
  • cleanup the last elements when reach a certain threshold.

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run test locally before make a pull request

@charlesy6
Copy link
Contributor Author

cc @yaooqinn, realized a single memory store called EngineStatusStore.

It's ok to rework with ElementTrackingStore. #981 (comment)

postListenerEvent(KyuubiEngineOperationClosedEvent(id, System.currentTimeMillis()))
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does EventLoggingService need to store these events? @yaooqinn @ulysses-you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we make EventLoggingService as a sparklistener is better.

image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @yaooqinn @ulysses-you @cfmcgrady, looking forward to your views.

@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 4, 2021

Thanks @cfmcgrady @yaooqinn, make the current patch to draft.

From my point of view, the EngineStatusStore can meet the needs, but from a long-term perspective, prefer to use ElementTrackingStore. So starting rework this patch by using ElementTrackingStore.

@charlesy6 charlesy6 marked this pull request as draft September 4, 2021 03:04
@codecov-commenter
Copy link

codecov-commenter commented Sep 6, 2021

Codecov Report

Merging #1023 (74872d7) into master (cbe5bee) will decrease coverage by 0.03%.
The diff coverage is 86.66%.

❗ Current head 74872d7 differs from pull request most recent head b9f355c. Consider uploading reports for the commit b9f355c to get more accurate results
Impacted file tree graph

@@             Coverage Diff              @@
##             master    #1023      +/-   ##
============================================
- Coverage     79.24%   79.21%   -0.04%     
  Complexity       90       90              
============================================
  Files           177      178       +1     
  Lines          6620     6648      +28     
  Branches        783      785       +2     
============================================
+ Hits           5246     5266      +20     
- Misses          920      926       +6     
- Partials        454      456       +2     
Impacted Files Coverage Δ
...kyuubi/engine/spark/events/EngineEventsStore.scala 81.25% <81.25%> (ø)
...in/scala/org/apache/kyuubi/config/KyuubiConf.scala 95.24% <83.33%> (-0.14%) ⬇️
...rg/apache/kyuubi/engine/spark/SparkSQLEngine.scala 68.00% <100.00%> (ø)
...g/apache/spark/kyuubi/SparkSQLEngineListener.scala 87.93% <100.00%> (+1.39%) ⬆️
.../org/apache/kyuubi/operation/KyuubiOperation.scala 50.00% <0.00%> (-5.36%) ⬇️
...pache/kyuubi/sql/KyuubiQueryStagePreparation.scala 80.39% <0.00%> (-0.99%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update cbe5bee...b9f355c. Read the comment docs.

@charlesy6 charlesy6 marked this pull request as ready for review September 6, 2021 06:38
@yaooqinn
Copy link
Member

yaooqinn commented Sep 6, 2021

How about we reuse the current event logging service and only introduce the ElementTrackingStore or something else as an in-memory implementation?

@charlesy6
Copy link
Contributor Author

How about we reuse the current event logging service

It's not a right way to reuse event logging service.

  • We should push events to SparkListenerBus, if not, the spark history server will not receive these custom events.
  • It's better to let different listeners do their own independent things

and only introduce the ElementTrackingStore or something else as an in-memory implementation?

In fact, using ElementTrackingStore is the least expensive, if we create something as an in-memory implementation will spend more time. I had tried these two ways.

@yaooqinn
Copy link
Member

yaooqinn commented Sep 6, 2021

We should push events to SparkListenerBus, if not, the spark history server will not receive these custom events.

can we check SparkHistoryEventLogger?

@cfmcgrady
Copy link
Contributor

How about we reuse the current event logging service and only introduce the ElementTrackingStore or something else as an in-memory implementation?

+1

@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 6, 2021

hi @yaooqinn, I can not understand why kyuubi need SparkHistoryEventLogger? it seems duplicated to spark.eventLog.enabled=true.

And, the current design of logger service is not clear to me. As we know a service means that it should listen to some port and can start / stop.

I think the following will be more clear.

image

@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 6, 2021

Looked into the logger service, the loggers are executed sequentially, and the loggers are called synchronously.

image


And checked the LiveListenerBus, it is an asynchronous listener bus for Spark events. SPARK-20863


@yaooqinn
Copy link
Member

yaooqinn commented Sep 6, 2021

hi @yaooqinn, I can not understand why kyuubi need SparkHistoryEventLogger?

SparkHistoryEventLogger is used to log all Kyuubi's events to the same file of spark's event log. We also support other destinations, like Kyuubi's own JSON log store, JDBC(planned), etc..

it seems duplicated to spark.eventLog.enabled=true.

It's not. it follows spark.eventLog.enabled when logging. We just don't push the events to the listener bus but call the event log listener directly.

And, the current design of logger service is not clear to me. As we know a service means that it should listen to some port and can start / stop.

Hmm... do not go too far with the spark's listeners and the listener bus if you are not understanding what you are doing here.

IIUC, you want everything to work with spark's listeners and the listener bus, this is not what we want. We are a Spark application BUT not only a spark application that is limited to spark's functionalities.

What we want is only a size-limit buffer (using ElementTrackingStore is just an option) that holds all the current Kyuubi events in memory. Then render one or more Spark UI pages/tables based on this buffer.

@yaooqinn
Copy link
Member

yaooqinn commented Sep 6, 2021

Looked into the logger service, the loggers are executed sequentially, and the loggers are called synchronously.

image

And checked the LiveListenerBus, it is an asynchronous listener bus for Spark events. SPARK-20863

Yes, this is a spot that we can improve too

@charlesy6
Copy link
Contributor Author

Yes, this is a spot that we can improve too

hi @yaooqinn, there seems to be no good way to solve the synchronization problem. For EventLoggerType.SPARK and EventLoggerType.JSON, we need to think out a asynchronous writer for local path or hdfs path.

It seems we should improve it first, because if we listener these events in this pr, it may affects the performance.

@yaooqinn
Copy link
Member

yaooqinn commented Sep 6, 2021

Can we start with session Events only in another PR, which can minimize the review burdern

@yaooqinn
Copy link
Member

yaooqinn commented Sep 6, 2021

We can combine the history logging and the live data logging with the listener bus but we can still use the event logging service

@yaooqinn
Copy link
Member

yaooqinn commented Sep 6, 2021

Hmm... do not go too far with the spark's listeners and the listener bus if you are not understanding what you are doing here.

I take my early judgment back, and the listener bus looks the right way to go. #1023 (comment)

@charlesy6
Copy link
Contributor Author

Can we start with session Events only in another PR, which can minimize the review burdern

Sure, will do.

@yaooqinn
Copy link
Member

yaooqinn commented Sep 6, 2021

there seems to be no good way to solve the synchronization problem.
For EventLoggerType.SPARK

This looks simple, Please send a separate PR to solve this issue

-    sc.eventLogger.foreach(_.onOtherEvent(kyuubiEvent))
+    sc.listenerBus.post(kyuubiEvent)

and EventLoggerType.JSON, we need to think out a asynchronous writer for local path or hdfs

we can implement this latter too

ulysses-you pushed a commit that referenced this pull request Sep 7, 2021
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

> there seems to be no good way to solve the synchronization problem.
For `EventLoggerType.SPARK`

This looks simple, Please send a separate PR to solve this issue

```git
-    sc.eventLogger.foreach(_.onOtherEvent(kyuubiEvent))
+    sc.listenerBus.post(kyuubiEvent)
```

_Originally posted by yaooqinn in #1023 (comment)

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1044 from timothy65535/1043.

Closes #1043

0ea1d4f [timothy65535] [KYUUBI #1043] Let spark history logger handle events asynchronously

Authored-by: timothy65535 <timothy65535@163.com>
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 7, 2021

We can combine the history logging and the live data logging with the listener bus but we can still use the event logging service

Had tried serveal times, but failed.

For EventLoggerType.JSON logger, it requires all fields are set.
But for EventLoggerType.SPARK logger, it don't needs all fields.

image

spark origin event log

{"Event":"SparkListenerJobEnd","Job ID":2,"Completion Time":1630997161352,"Job Result":{"Result":"JobSucceeded"}}
{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":3,"time":1630997161355}
{"Event":"org.apache.spark.sql.hive.thriftserver.ui.SparkListenerThriftServerOperationFinish","id":"8f2c5e68-c9fe-4777-bffe-75d171fa18cb","finishTime":1630997161355}
{"Event":"org.apache.spark.sql.hive.thriftserver.ui.SparkListenerThriftServerOperationClosed","id":"8f2c5e68-c9fe-4777-bffe-75d171fa18cb","closeTime":1630997161374}
{"Event":"org.apache.spark.sql.hive.thriftserver.ui.SparkListenerThriftServerSessionClosed","sessionId":"7e3f25d8-5688-4170-8532-5ddee4112991","finishTime":1630997192176}
{"Event":"SparkListenerApplicationEnd","Timestamp":1630997194335}

@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 7, 2021

hi @yaooqinn @cfmcgrady, looking forward to your suggestions

Had read the whole ui design of sparkthriftserver
https://github.com/apache/spark/tree/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui

And already tried to implement a memory status store
https://github.com/timothy65535/kyuubi/blob/ky-981-t1/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/ui/EngineStatusStore.scala

At last, in terms of design compatibility and simplicity, I think it's better to reuse the thriftserver's ui design. We need to consider not only the pages on engine server, but also the pages on spark history server.

@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 8, 2021

cc @yaooqinn, the patch updated with customized store.

@charlesy6 charlesy6 marked this pull request as draft September 8, 2021 03:38
@charlesy6 charlesy6 marked this pull request as ready for review September 9, 2021 01:39
@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 9, 2021

cc @yaooqinn, already to go.

@yaooqinn
Copy link
Member

yaooqinn commented Sep 9, 2021

can you update the PR description and maybe also some comments in the code changes to help us review?

@charlesy6
Copy link
Contributor Author

can you update the PR description and maybe also some comments in the code changes to help us review?

Updated. The store based on TreeBasedStore, it supports row keys and column keys are ordered by natural ordering by default.

@charlesy6
Copy link
Contributor Author

cc @cfmcgrady when you are free, thanks.

@yaooqinn
Copy link
Member

cc @zhang1002

@charlesy6
Copy link
Contributor Author

cc @pan3793 @ulysses-you, help to review when free, thanks.

@charlesy6 charlesy6 marked this pull request as draft September 15, 2021 02:25
@charlesy6 charlesy6 marked this pull request as ready for review September 15, 2021 06:29
@charlesy6
Copy link
Contributor Author

cc @yaooqinn ready to review, looking for your new advice.

/**
* cleanup the session events if reach the threshold
*/
private def checkSessionCapacity(): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how efficient is this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't find a suitable opensource library to supports following characteristic:

  • Concurrency: concurrent read、update、remove
  • Update
  • Order
  • Complexity

If we implement a new Map which extends AbstractMap, it seems will be complex.

If we keep 200 events in memory, efficiency may not be affected.

private def checkSessionCapacity(): Unit = {
var countToDelete = sessions.size - retainedSessions

val reverseSeq = sessions.values().asScala.toSeq.sortBy(_.startTime).reverse
Copy link
Member

@yaooqinn yaooqinn Sep 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we sort the value set for evevy single event?...

Copy link
Contributor Author

@charlesy6 charlesy6 Sep 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we use treemap, and let startTime or endTime as key, it will remove events if key repeat.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you should use asyn, by this you can sort the value only once when countToDelete reached the set value.
if (retainedSessions/sessions.size >= threshold) { new thread { sort() delete() } }
also you can see Guava Cache, you can set the expire strategy by youself but it will waste some Mem...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we use a new thread, thinghs will be more complex

@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 17, 2021

cc @ulysses-you @pan3793 @cfmcgrady @yaooqinn

had already tried serval implements:
V1: custom store based on treemap
V2: use spark ElementTrackingStore
V3: custom store based on TreebasedTable, ordered by startTime
V4: custom store based on ConcurrentHashMap, ordered by finishTime

looking for advice, thanks

@charlesy6
Copy link
Contributor Author

BTW, at this stage, seems we don't need to spend a lot of time on details, especially on how to implement the store, it already cost two week.

@charlesy6 charlesy6 changed the title [KYUUBI #1022] Add EventManager with basic EngineStatusStore [KYUUBI #1022] Add basic EngineStatusStore for events Sep 18, 2021
@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 24, 2021

cc @ulysses-you @pan3793 @cfmcgrady @yaooqinn, any more thought?

@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 26, 2021

cc @ulysses-you @pan3793 @yaooqinn, any more thought? thanks

@yaooqinn yaooqinn closed this in 22e6432 Sep 26, 2021
@yaooqinn
Copy link
Member

I am +0 on this, but since no one has an opposite option on this implementation, I get this merged.

@charlesy6 charlesy6 deleted the ky-1022 branch September 26, 2021 05:31
@charlesy6
Copy link
Contributor Author

charlesy6 commented Sep 26, 2021

I am +0 on this, but since no one has an opposite option on this implementation

OK, new idea to the implement of the store is welcome.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants