
Conversation

@LucaCanali
Contributor

@LucaCanali LucaCanali commented Jan 27, 2021

What changes were proposed in this pull request?

This proposes to add SQLMetrics instrumentation for Python UDF execution.
The proposed metrics with a short description are:

time spent executing: time spent executing the Python UDF and sending the results back to the executors. It can include a component of time spent waiting for data to process, when sending data is slower than the execution itself
time spent sending data: time spent sending data to the Python workers; this is part of the Python UDF execution time
time spent sending code: time spent sending Python code to the Python workers
bytes of code sent: the number of bytes of serialized Python code sent to the Python workers
bytes of data returned: the number of bytes of serialized data received back from the Python workers
bytes of data sent: the number of bytes of serialized data sent to the Python workers
number of batches returned: the number of data batches received back from the Python workers
number of batches processed: the number of data batches sent to the Python workers
number of rows returned: the number of rows returned by the Python workers
number of rows processed: the number of rows sent to the Python workers

Why are the changes needed?

This aims at improving monitoring and performance troubleshooting of Python code called by Spark via UDFs, Pandas UDFs or mapPartitions.

Does this PR introduce any user-facing change?

The introduced SQL metrics are exposed to end users via the Web UI, in the SQL tab, for the execution nodes related to Python UDF execution, namely BatchEvalPython, ArrowEvalPython, AggregateInPandas, FlatMapGroupsInPandas, FlatMapCoGroupsInPandas, MapInPandas, WindowInPandas.
See also the screenshot attached to the PR showing the metrics introduced.
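A hedged illustration (not part of the PR description): besides the Web UI, the metrics of an executed plan can also be inspected programmatically after an action has run, for example in the Scala shell, using the test UDF and the t1 view registered in the testing example below:

val df = spark.sql("select max(test_pandas(col1)) from t1")
df.collect()
// Print every SQL metric exposed by each node of the executed plan
df.queryExecution.executedPlan.foreach { node =>
  node.metrics.foreach { case (name, metric) =>
    println(s"${node.nodeName}: $name = ${metric.value}")
  }
}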

How was this patch tested?

Manually tested; a Python test has been added as well.

Example code used for testing:

from pyspark.sql.functions import col, pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1):
  time.sleep(0.02)
  return col1 * col1

spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1)) from t1").collect()

The following is used to test with more data pushed to the Python workers:

from pyspark.sql.functions import col, pandas_udf
import time

@pandas_udf("long")
def test_pandas(col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17):
  time.sleep(0.02)
  return col1


spark.udf.register("test_pandas", test_pandas)
spark.sql("select rand(42)*rand(51)*rand(12) col1 from range(10000000)").createOrReplaceTempView("t1")
spark.sql("select max(test_pandas(col1,col1+1,col1+2,col1+3,col1+4,col1+5,col1+6,col1+7,col1+8,col1+9,col1+10,col1+11,col1+12,col1+13,col1+14,col1+15,col1+16)) from t1").collect()

@maropu
Member

maropu commented Jan 28, 2021

ok to test

@maropu
Member

maropu commented Jan 28, 2021

cc: @HyukjinKwon @ueshin

@SparkQA

SparkQA commented Jan 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39194/

@SparkQA

SparkQA commented Jan 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39194/

@HyukjinKwon
Member

cc @BryanCutler too

@SparkQA

SparkQA commented Jan 28, 2021

Test build #134606 has finished for PR 31367 at commit 22e9999.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait PythonSQLMetrics extends SparkPlan

@SparkQA

SparkQA commented Jan 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39207/

@SparkQA

SparkQA commented Jan 28, 2021

Test build #134619 has finished for PR 31367 at commit c6a1bdc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39207/

Comment on lines +101 to +121
Member

Is this really calculating the exec time? Seems like it's only calculating the time for fromJava(result)?

Contributor Author

Correct, this is just a component, and actually a small one that could be omitted. The most important part of the execution time for BatchEvalPythonExec is measured in PythonUDFRunner.

Comment on lines +81 to +85
Member

Same here? Seems like it's just calculating the batch loading time?

Contributor Author

Good point. From what I can see by experimenting, this actually triggers execution, so the time measured here seems to be a relevant value for the execution time. Ideally we would measure on the Python side, although this seems good enough?

Member

@maropu maropu left a comment

Adding new metrics for python-related plan nodes looks useful (I think we need to discuss which metrics to include and which to leave out, though).

Member

Could you add tests on the Scala side (e.g., SQLMetricsSuite), too?

Contributor Author

@LucaCanali LucaCanali Jan 29, 2021

Good point. I am not sure how to write a Scala-side test for a Python UDF though.
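As a rough sketch (not from this thread), a Scala-side test might look like the following, assuming the IntegratedUDFTestUtils helpers that Spark's SQL tests use to exercise Python/Pandas UDFs; the suite name, UDF name and assertions are illustrative:

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.IntegratedUDFTestUtils._
import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
import org.apache.spark.sql.test.SharedSparkSession

class PythonUDFMetricsSuite extends QueryTest with SharedSparkSession {
  test("SQL metrics are reported for Pandas UDF execution") {
    // Skip when the Python/pyarrow test environment is not available
    assume(shouldTestScalarPandasUDFs)
    registerTestUDF(TestScalarPandasUDF(name = "pandas_identity"), spark)

    val df = spark.sql("SELECT pandas_identity(id) FROM range(100)")
    df.collect()

    val pythonNodes = df.queryExecution.executedPlan.collect {
      case p: ArrowEvalPythonExec => p
    }
    assert(pythonNodes.nonEmpty)
    // The exact metric names depend on this PR; here we only check that the node
    // exposes metrics and that they were updated after execution
    assert(pythonNodes.head.metrics.nonEmpty)
    assert(pythonNodes.head.metrics.values.exists(_.value > 0))
  }
}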

Member

If you want to implement many metrics, how about using Map[String, SQLMetric] instead? (see ShuffledRowRDD)

Contributor Author

My original thought here was to use a trait so that we can reuse the metrics definition across the many physical plans that need it (BatchEvalPythonExec, ArrowEvalPythonExec, AggregateInPandasExec, etc.). Other implementations can work too. I see that ShuffledRowRDD uses a custom object, SQLShuffleMetricsReporter, to define and handle the metrics. Could you please elaborate a bit more on your proposal?
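For context, a minimal sketch (illustrative names, not necessarily the PR's actual code) of how a trait mixed into the Python-related plan nodes could expose the metrics as a Map[String, SQLMetric]:

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

trait PythonSQLMetrics extends SparkPlan {
  // Shared metric definitions, reused by BatchEvalPythonExec, ArrowEvalPythonExec, etc.
  override lazy val metrics: Map[String, SQLMetric] = Map(
    "pythonExecTime" -> SQLMetrics.createTimingMetric(sparkContext, "time spent executing"),
    "pythonDataSent" -> SQLMetrics.createSizeMetric(sparkContext, "bytes of data sent"),
    "pythonDataReceived" -> SQLMetrics.createSizeMetric(sparkContext, "bytes of data returned"),
    "pythonNumRowsReceived" -> SQLMetrics.createMetric(sparkContext, "number of rows returned")
  )
}

Individual plan nodes would then mix in the trait and update the relevant metrics from their execution code.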

@maropu
Member

maropu commented Jan 29, 2021

Could you list up the new metrics (their names and description) that this PR intends to add in the What changes were proposed in this pull request? section?

@LucaCanali
Contributor Author

LucaCanali commented Jan 29, 2021

I have updated the PR with the proposed list of metrics. The list currently contains 10 metrics.
I can see the argument for having just a few important metrics.
However, the Python UDF implementation is complex, with many moving parts, and metrics can help with troubleshooting corner cases too; in those circumstances more metrics mean more flexibility and a better chance of finding the root cause of a problem.

Another point for discussion is how accurate the metrics are in the current implementation. I have run a few tests to check that the measured values make sense and are in the ballpark of what was expected. There is room for more tests on corner cases to understand how the instrumentation copes there.

In particular, measuring execution time can be challenging at times, as here we attempt to do all measurements from JVM/Scala code. I am aiming for a "good enough to be useful" job for the timing metrics, rather than precise timing. I have put in the metrics descriptions some hints about the nuances I found when testing.

I think "time spent sending data" can be useful when troubleshooting cases where the performance problem is in sending large amounts of data to Python. "Time spent executing" is the key metric to understand the overall performance. "Number of rows returned" and "number of rows processed" are also useful metrics, to understand how much work has been done and how much remains when monitoring the progress of an active query.

@maropu
Member

maropu commented Jan 29, 2021

I have updated the PR with the proposed list of metrics. The list currently contains 10 metrics.

Could you use text to list the metrics instead of the image? The PR description will appear in the commit log after this PR is merged, and image data makes it hard to search the commit logs, I think.

@SparkQA

SparkQA commented Jan 29, 2021

Test build #134659 has started for PR 31367 at commit ee24c5c.

@SparkQA

SparkQA commented Jan 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39247/

@SparkQA

SparkQA commented Jan 29, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39247/

@SparkQA

SparkQA commented Jan 29, 2021

Test build #134660 has finished for PR 31367 at commit c16968f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39253/

@SparkQA

SparkQA commented Jan 29, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39253/

@SparkQA

SparkQA commented Jan 29, 2021

Test build #134666 has finished for PR 31367 at commit 0932597.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LucaCanali LucaCanali changed the title [SPARK-34265][PYTHON][SQL] Instrument Python UDF using SQL Metrics [SPARK-34265][PYTHON][SQL] Instrument Python UDF execution using SQL Metrics Feb 4, 2021
@SparkQA

SparkQA commented Feb 22, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39927/

@SparkQA

SparkQA commented Feb 22, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39927/

@SparkQA

SparkQA commented Feb 22, 2021

Test build #135347 has finished for PR 31367 at commit 8e8fee8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41435/

@SparkQA

SparkQA commented Apr 2, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41435/

@SparkQA

SparkQA commented Apr 2, 2021

Test build #136857 has finished for PR 31367 at commit e9db06c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43320/

@SparkQA

SparkQA commented May 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43320/

@SparkQA

SparkQA commented May 21, 2021

Test build #138798 has finished for PR 31367 at commit 9734f9c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 22, 2021

Test build #141504 has started for PR 31367 at commit 9690426.

@SparkQA

SparkQA commented Jul 22, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46021/

@SparkQA

SparkQA commented Jul 22, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46021/

@LucaCanali
Contributor Author

This has been blocked for a while. I believe the following has emerged from the discussion:

  • the proposed implementation in this PR possibly has too many metrics, which could be confusing
  • the timing metrics are hard to implement, and it is hard to tell whether the proposed implementation is correct/useful

I propose to take some of the ideas from this PR to a different, less complex PR: #33559

@LucaCanali LucaCanali closed this Jul 28, 2021
@LucaCanali LucaCanali changed the title [SPARK-34265][PYTHON][SQL] Instrument Python UDF execution using SQL Metrics [SPARK-34265][WIP][PYTHON][SQL] Instrument Python UDF execution using SQL Metrics Aug 26, 2021