[SPARK-34265][WIP][PYTHON][SQL] Instrument Python UDF execution using SQL Metrics #31367
Conversation
ok to test
cc: @HyukjinKwon @ueshin
cc @BryanCutler too
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala (resolved review thread)
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala (3 resolved review threads)
Is this really calculating the exec time? Seems like it's only calculating the time for fromJava(result)?
Correct, this is just a component, actually a small one that could be omitted. The most important part of the execution time for BatchEvalPythonExec is measured in PythonUDFRunner.
Same here? Seems like it's just calculating the batch loading time?
Good point. From what I can see by experimenting, this actually triggers execution, so the time measured here seems to be a relevant value for the execution time. Ideally we would measure on the Python side, although this seems good enough?
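For illustration, here is a hypothetical PySpark sketch (not code from this PR) of the behavior described above: the Python UDF below only does its work when the JVM starts consuming the result batches, so timing the batch read on the JVM side captures most of the Python execution time.

```python
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

@pandas_udf("long")
def slow_double(v):
    # Simulate Python-side work that dominates the wall-clock time.
    time.sleep(1)
    return v * 2

df = spark.range(1000).select(slow_double("id"))
# Nothing has run yet: the Python workers execute (and the sleep is paid)
# only when the JVM pulls the result batches during collect().
df.collect()
```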
maropu left a comment:
Adding new metrics for the Python-related plan nodes looks useful (I think we need to discuss which metrics to include and which to leave out, though).
Could you add tests on the Scala side (e.g., SQLMetricsSuite), too?
Good point. I am not sure how to do a Scala-side test for a Python UDF, though.
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala (resolved review thread)
If you want to implement many metrics, how about using Map[String, SQLMetric] instead? (see ShuffledRowRDD)
My original thought here was to use a trait so that we can reuse the metrics definition across the many physical plans that need it (BatchEvalPythonExec, ArrowEvalPythonExec, AggregateInPandasExec, etc.). Other implementations can work too. I see that ShuffledRowRDD uses a custom object, SQLShuffleMetricsReporter, to define and handle the metrics. Can you please elaborate a bit more on your proposal?
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala (resolved review thread)
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala (resolved review thread)
Could you list the new metrics (their names and descriptions) that this PR intends to add in the PR description?
I have updated the PR with the proposed list of metrics. The list currently contains 10 metrics. Another point for discussion is how accurate the metrics are in the current implementation. I have run a few tests to check that the values measured make sense and are in the ballpark of what was expected. There is room for more tests with corner cases to understand how the instrumentation copes there. In particular, measuring execution time can be challenging at times, as we attempt to do all measurements from JVM/Scala code. I am aiming for a "good enough to be useful" job for the timing metrics, rather than precise timing. I have put some hints about the nuances I found when testing in the metrics descriptions.

I think "time spent sending data" can be useful when troubleshooting cases where the performance problem is with sending large amounts of data to Python. "Time spent executing" is the key metric to understand the overall performance. "Number of rows returned" and "number of rows processed" are also useful metrics, to understand how much work has been done and how much still needs to be done when monitoring the progress of an active query.
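As a hypothetical illustration of the data-sending case (the names and sizes below are made up): a trivial UDF applied to wide rows, where one would expect "time spent sending data" to dominate "time spent executing".

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, udf

spark = SparkSession.builder.getOrCreate()

@udf("int")
def first_char(s):
    # Trivial per-row work: the cost is in moving the payload, not computing.
    return ord(s[0])

# Roughly 100 kB of payload per row is sent to the Python workers.
wide = spark.range(10000).withColumn("payload", lit("x" * 100000))
wide.select(first_char("payload")).collect()
```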
Could you use text to list the metrics instead of the image? The PR description will appear in the commit logs after this PR is merged, and the image data makes it hard to search the commit logs, I think.
sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala (2 resolved review threads)
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala (resolved review thread)
This has been blocked for a while. Based on what has emerged from the discussion, I propose to take some of the ideas in this PR to a different and less complex PR: #33559
What changes were proposed in this pull request?
This proposes to add SQLMetrics instrumentation for Python UDF execution.
The proposed metrics with a short description are:
- time spent executing: time spent executing the Python UDF and sending the results back to the executors. It can contain a component related to the time spent waiting for data to process, when data sending is slower compared to execution time
- time spent sending data: time spent sending data to the Python workers; this is part of the Python UDF execution time
- time spent sending code: time spent sending Python code to the Python workers
- bytes of code sent: the number of bytes of serialized Python code sent to the Python workers
- bytes of data returned: the number of bytes of serialized data received back from the Python workers
- bytes of data sent: the number of bytes of serialized data sent to the Python workers
- number of batches returned: the number of data batches received back from the Python workers
- number of batches processed: the number of data batches sent to the Python workers
- number of rows returned: the number of rows returned by the Python workers
- number of rows processed: the number of rows sent to the Python workers

Why are the changes needed?
This aims at improving monitoring and performance troubleshooting of Python code called by Spark, via UDF, Pandas UDF or with mapPartitions.
Does this PR introduce any user-facing change?
The introduced SQL metrics are exposed to the end users via the WebUI interface, in the SQL tab, for execution steps related to Python UDF execution, namely BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas.

See also the screenshot with the metrics introduced:
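For example, a hypothetical snippet like the following (not part of this PR) produces BatchEvalPython and ArrowEvalPython nodes, two of the execution steps that carry the new metrics:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, udf

spark = SparkSession.builder.getOrCreate()

@udf("long")
def plus_one(x):
    return x + 1

@pandas_udf("long")
def plus_one_pandas(v):
    return v + 1

df = spark.range(100)
# The physical plans contain BatchEvalPython and ArrowEvalPython nodes,
# respectively; the SQL tab attaches the new metrics to these nodes.
df.select(plus_one("id")).explain()
df.select(plus_one_pandas("id")).explain()
```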
How was this patch tested?
Manually tested; a Python test has also been added.
Example code used for testing:
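A minimal sketch of the kind of code used for this testing (hypothetical, assuming a simple Python UDF over generated data):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()

@udf("long")
def square(x):
    return x * x

# Run a simple Python UDF, then inspect the new metrics on the
# BatchEvalPython node in the SQL tab of the Web UI.
spark.range(100000).select(square(col("id"))).collect()
```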
This is used to test with more data pushed to the Python workers:
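For instance, a hypothetical variant using mapInPandas, where whole rows (including a wide payload column) are shipped to the Python workers:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()

def pass_through(batches):
    # Every row, including the payload column, crosses over to the Python
    # workers, inflating the bytes/batches/rows "processed" metrics.
    for pdf in batches:
        yield pdf

# Roughly 10 kB of payload per row pushed through MapInPandas.
df = spark.range(10000).withColumn("payload", lit("a" * 10000))
df.mapInPandas(pass_through, schema="id long, payload string").count()
```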