Skip to content

Commit 22eb6c4

Browse files
WweiLHyukjinKwon
authored andcommitted
[SPARK-48567][SS][FOLLOWUP] StreamingQuery.lastProgress should return the actual StreamingQueryProgress
This reverts commit d067fc6, which reverted 042804a, essentially brings it back. 042804a failed the 3.5 client <> 4.0 server test, but the test was decided to turned off for cross-version test in #47468 ### What changes were proposed in this pull request? This PR is created after discussion in this closed one: #46886 I was trying to fix a bug (in connect, query.lastProgress doesn't have `numInputRows`, `inputRowsPerSecond`, and `processedRowsPerSecond`), and we reached the conclusion that what purposed in this PR should be the ultimate fix. In python, for both classic spark and spark connect, the return type of `lastProgress` is `Dict` (and `recentProgress` is `List[Dict]`), but in scala it's the actual `StreamingQueryProgress` object: https://github.com/apache/spark/blob/1a5d22aa2ffe769435be4aa6102ef961c55b9593/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala#L94-L101 This API discrepancy brings some confusion, like in Scala, users can do `query.lastProgress.batchId`, while in Python they have to do `query.lastProgress["batchId"]`. This PR makes `StreamingQuery.lastProgress` to return the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` to return `List[StreamingQueryProgress]`). To prevent breaking change, we extend `StreamingQueryProgress` to be a subclass of `dict`, so existing code accessing using dictionary method (e.g. `query.lastProgress["id"]`) is still functional. ### Why are the changes needed? API parity ### Does this PR introduce _any_ user-facing change? Yes, now `StreamingQuery.lastProgress` returns the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` returns `List[StreamingQueryProgress]`). ### How was this patch tested? Added unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #47470 from WweiL/bring-back-lastProgress. Authored-by: Wei Liu <wei.liu@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent 239d77b commit 22eb6c4

File tree

5 files changed

+227
-99
lines changed

5 files changed

+227
-99
lines changed

python/pyspark/sql/connect/streaming/query.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
QueryProgressEvent,
3434
QueryIdleEvent,
3535
QueryTerminatedEvent,
36+
StreamingQueryProgress,
3637
)
3738
from pyspark.sql.streaming.query import (
3839
StreamingQuery as PySparkStreamingQuery,
@@ -110,21 +111,21 @@ def status(self) -> Dict[str, Any]:
110111
status.__doc__ = PySparkStreamingQuery.status.__doc__
111112

112113
@property
113-
def recentProgress(self) -> List[Dict[str, Any]]:
114+
def recentProgress(self) -> List[StreamingQueryProgress]:
114115
cmd = pb2.StreamingQueryCommand()
115116
cmd.recent_progress = True
116117
progress = self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
117-
return [json.loads(p) for p in progress]
118+
return [StreamingQueryProgress.fromJson(json.loads(p)) for p in progress]
118119

119120
recentProgress.__doc__ = PySparkStreamingQuery.recentProgress.__doc__
120121

121122
@property
122-
def lastProgress(self) -> Optional[Dict[str, Any]]:
123+
def lastProgress(self) -> Optional[StreamingQueryProgress]:
123124
cmd = pb2.StreamingQueryCommand()
124125
cmd.last_progress = True
125126
progress = self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
126127
if len(progress) > 0:
127-
return json.loads(progress[-1])
128+
return StreamingQueryProgress.fromJson(json.loads(progress[-1]))
128129
else:
129130
return None
130131

0 commit comments

Comments
 (0)