-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-46124][FOLLOWUP][CONNECT][SS] Send missing fields in StreamingQueryProgress to client #46886
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
cc @HeartSaVioR @HyukjinKwon @LuciferYang, can you guys take a look? Thank you! |
| class StreamingTestsMixin: | ||
| def test_streaming_query_functions_basic(self): | ||
| df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load() | ||
| df = self.spark.readStream.format("text").load("python/test_support/sql/streaming") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is just a minor fix it will make the test more stable. We call query.processAllAvailable below, which has a very small chance of being a indefinite blocking call on rate source.
|
Moreover, the test failures in ClientStreamingQuerySuite may also be related to the current PR, seems Line 108 in ab00533
Does this mean that the new changes are not fully compatible with previous behaviors? @WweiL @HyukjinKwon |
|
Ah thanks, I only tested the new test case, glad existing tests catch them, let me fix! @LuciferYang |
|
Now I actually think we should fix python client side reconstruction |
|
That three fields are just constructed using other existing info: spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala Lines 145 to 152 in 0bc2c69
There is no absolute reason we also send them from the server. I can just force the client to reconstruct these fields as in the scala definition. But there is always this API difference, in python, the return type of spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala Lines 94 to 101 in 1a5d22a
I feel that we could let the python client also return that 'StreamingQueryProgress' object, and put the reconstruct logic inside the client side StreamingQueryProgress, what do you guys think? |
|
We should have done that. I wonder if we can have a class that implements |
…l StreamingQueryProgress ### What changes were proposed in this pull request? This PR is created after discussion in this closed one: #46886 I was trying to fix a bug (in connect, query.lastProgress doesn't have `numInputRows`, `inputRowsPerSecond`, and `processedRowsPerSecond`), and we reached the conclusion that what purposed in this PR should be the ultimate fix. In python, for both classic spark and spark connect, the return type of `lastProgress` is `Dict` (and `recentProgress` is `List[Dict]`), but in scala it's the actual `StreamingQueryProgress` object: https://github.com/apache/spark/blob/1a5d22aa2ffe769435be4aa6102ef961c55b9593/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala#L94-L101 This API discrepancy brings some confusion, like in Scala, users can do `query.lastProgress.batchId`, while in Python they have to do `query.lastProgress["batchId"]`. This PR makes `StreamingQuery.lastProgress` to return the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` to return `List[StreamingQueryProgress]`). To prevent breaking change, we extend `StreamingQueryProgress` to be a subclass of `dict`, so existing code accessing using dictionary method (e.g. `query.lastProgress["id"]`) is still functional. ### Why are the changes needed? API parity ### Does this PR introduce _any_ user-facing change? Yes, now `StreamingQuery.lastProgress` returns the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` returns `List[StreamingQueryProgress]`). ### How was this patch tested? Added unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #46921 from WweiL/SPARK-48567-lastProgress. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
… the actual StreamingQueryProgress This reverts commit d067fc6, which reverted 042804a, essentially brings it back. 042804a failed the 3.5 client <> 4.0 server test, but the test was decided to turned off for cross-version test in #47468 ### What changes were proposed in this pull request? This PR is created after discussion in this closed one: #46886 I was trying to fix a bug (in connect, query.lastProgress doesn't have `numInputRows`, `inputRowsPerSecond`, and `processedRowsPerSecond`), and we reached the conclusion that what purposed in this PR should be the ultimate fix. In python, for both classic spark and spark connect, the return type of `lastProgress` is `Dict` (and `recentProgress` is `List[Dict]`), but in scala it's the actual `StreamingQueryProgress` object: https://github.com/apache/spark/blob/1a5d22aa2ffe769435be4aa6102ef961c55b9593/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala#L94-L101 This API discrepancy brings some confusion, like in Scala, users can do `query.lastProgress.batchId`, while in Python they have to do `query.lastProgress["batchId"]`. This PR makes `StreamingQuery.lastProgress` to return the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` to return `List[StreamingQueryProgress]`). To prevent breaking change, we extend `StreamingQueryProgress` to be a subclass of `dict`, so existing code accessing using dictionary method (e.g. `query.lastProgress["id"]`) is still functional. ### Why are the changes needed? API parity ### Does this PR introduce _any_ user-facing change? Yes, now `StreamingQuery.lastProgress` returns the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` returns `List[StreamingQueryProgress]`). ### How was this patch tested? Added unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #47470 from WweiL/bring-back-lastProgress. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
… the actual StreamingQueryProgress This reverts commit d067fc6, which reverted 042804a, essentially brings it back. 042804a failed the 3.5 client <> 4.0 server test, but the test was decided to turned off for cross-version test in apache#47468 ### What changes were proposed in this pull request? This PR is created after discussion in this closed one: apache#46886 I was trying to fix a bug (in connect, query.lastProgress doesn't have `numInputRows`, `inputRowsPerSecond`, and `processedRowsPerSecond`), and we reached the conclusion that what purposed in this PR should be the ultimate fix. In python, for both classic spark and spark connect, the return type of `lastProgress` is `Dict` (and `recentProgress` is `List[Dict]`), but in scala it's the actual `StreamingQueryProgress` object: https://github.com/apache/spark/blob/1a5d22aa2ffe769435be4aa6102ef961c55b9593/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala#L94-L101 This API discrepancy brings some confusion, like in Scala, users can do `query.lastProgress.batchId`, while in Python they have to do `query.lastProgress["batchId"]`. This PR makes `StreamingQuery.lastProgress` to return the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` to return `List[StreamingQueryProgress]`). To prevent breaking change, we extend `StreamingQueryProgress` to be a subclass of `dict`, so existing code accessing using dictionary method (e.g. `query.lastProgress["id"]`) is still functional. ### Why are the changes needed? API parity ### Does this PR introduce _any_ user-facing change? Yes, now `StreamingQuery.lastProgress` returns the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` returns `List[StreamingQueryProgress]`). ### How was this patch tested? Added unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#47470 from WweiL/bring-back-lastProgress. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>


What changes were proposed in this pull request?
Currently in PySpark Client, calling
query.lastProgresswon't return you three fields:spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
Lines 145 to 152 in 0bc2c69
This is because these three fields aren't native fields and are supposed to be reconstructed when rebuilding the client side
StreamingQueryProgress. But this brings a bug only in Python Spark Connect.This is not an issue for scala connect client, because the client side reconstruct the StreamingQueryProgress using the same way it's constructed in the server side.
The problem with python is the return type is a dict, so there is no such reconstruction, and therefore these fields are not there.
spark/python/pyspark/sql/connect/streaming/query.py
Line 122 in d6fcba0
This is not an issue in classic pyspark also, because classic pyspark calls the
jsonValueofStreamingQueryProgress, which has all fields defined:spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
Lines 162 to 178 in 0bc2c69
In this PR we send all such fields to the client to fix this issue. This won't cause backward compatibility issue because that field is a map anyways.
Why are the changes needed?
Bug fix
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit test, passed locally. Before the fix, the test would fail with:
Was this patch authored or co-authored using generative AI tooling?
No