-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-42034] QueryExecutionListener and Observation API do not work with foreach
/ reduce
/ foreachPartition
action.
#39976
Conversation
@@ -960,6 +960,19 @@ class DatasetSuite extends QueryTest | |||
observe(spark.range(1, 10, 1, 11), Map("percentile_approx_val" -> 5)) | |||
} | |||
|
|||
test("observation on datasets when a DataSet trigger foreach action") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test("observation on datasets when a DataSet trigger foreach action") { | |
test("SPARK-42034: observation on datasets when a DataSet trigger foreach action") { |
@@ -96,6 +96,34 @@ class DataFrameCallbackSuite extends QueryTest | |||
spark.listenerManager.unregister(listener) | |||
} | |||
|
|||
test("execute callback functions when a DataSet trigger foreach action finished") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test("execute callback functions when a DataSet trigger foreach action finished") { | |
test("SPARK-42034: execute callback functions when a DataSet trigger foreach action finished") { |
assert(metrics(0)._1 == "foreach") | ||
assert(metrics(1)._1 == "reduce") | ||
|
||
spark.listenerManager.unregister(listener) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add this into finally
so the test failure of this doesn't affect other tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know other tests don't. but let's at least do it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM from my end. cc @hvanhovell and @beliefer FYI
@HyukjinKwon Thank you for ping me. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM too.
Merged to master. |
Thanks @HyukjinKwon @beliefer |
this wouldn't fix the |
Thanks. Made a followup: #43304 |
…eachPartition in JdbcUtils ### What changes were proposed in this pull request? This PR is kind of a followup for #39976 that addresses #39976 (comment) comment. ### Why are the changes needed? In order to probably assign the SQL execution ID so `df.observe` works with this. ### Does this PR introduce _any_ user-facing change? Yes. `df.observe` will work with JDBC connectors. ### How was this patch tested? Manually tested. ### Was this patch authored or co-authored using generative AI tooling? Unit test was added. Closes #43304 from HyukjinKwon/foreachbatch. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…eachPartition in JdbcUtils This PR is kind of a followup for #39976 that addresses #39976 (comment) comment. In order to probably assign the SQL execution ID so `df.observe` works with this. Yes. `df.observe` will work with JDBC connectors. Manually tested. Unit test was added. Closes #43304 from HyukjinKwon/foreachbatch. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 39cc4ab) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…D.foreachPartition in JdbcUtils This PR cherry-picks #43304 to branch-3.5 --- ### What changes were proposed in this pull request? This PR is kind of a followup for #39976 that addresses #39976 (comment) comment. ### Why are the changes needed? In order to probably assign the SQL execution ID so `df.observe` works with this. ### Does this PR introduce _any_ user-facing change? Yes. `df.observe` will work with JDBC connectors. ### How was this patch tested? Manually tested. ### Was this patch authored or co-authored using generative AI tooling? Unit test was added. Closes #43322 from HyukjinKwon/SPARK-45475-3.5. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
Add the name parameter for 'foreach'/'reduce'/'foreachPartition' operators in
DataSet#withNewRDDExecutionId
. Because the QueryExecutionListener and Observation API is triggered only when the operators have the name parameter.spark/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
Line 181 in 84ddd40
Why are the changes needed?
The QueryExecutionListener and Observation API is triggered only when the operators have the name parameter.
Does this PR introduce any user-facing change?
No
How was this patch tested?
add two unit test.