
Conversation

WweiL (Contributor) commented Jun 5, 2024

What changes were proposed in this pull request?

Currently in the PySpark Spark Connect client, calling query.lastProgress does not return these three fields:

```scala
/** The aggregate (across all sources) number of records processed in a trigger. */
def numInputRows: Long = sources.map(_.numInputRows).sum
/** The aggregate (across all sources) rate of data arriving. */
def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum
/** The aggregate (across all sources) rate at which Spark is processing data. */
def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
```

This is because these three fields aren't stored directly; they are derived fields that are supposed to be recomputed when the client-side StreamingQueryProgress is rebuilt. This causes a bug only in the Python Spark Connect client.

It is not an issue for the Scala Connect client, because that client reconstructs StreamingQueryProgress the same way it is constructed on the server side.

The problem in Python is that the return type is a dict, so no such reconstruction happens, and therefore these fields are missing:

```python
def lastProgress(self) -> Optional[Dict[str, Any]]:
```
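In principle, the missing aggregates can be derived from the per-source entries that are already present in the dict, the same way the Scala definitions above compute them. A minimal sketch (the sample values below are made up; field names follow the progress JSON shown in this PR):

```python
# Hypothetical progress dict as returned by query.lastProgress in the
# Python Spark Connect client, where the aggregate fields are absent.
progress = {
    "batchId": 5,
    "sources": [
        {"numInputRows": 30, "inputRowsPerSecond": 3.0, "processedRowsPerSecond": 6.0},
        {"numInputRows": 70, "inputRowsPerSecond": 7.0, "processedRowsPerSecond": 14.0},
    ],
}

# Reconstruct the aggregates as in the Scala definitions above:
# each one is a sum over all sources.
num_input_rows = sum(s["numInputRows"] for s in progress["sources"])
input_rows_per_second = sum(s["inputRowsPerSecond"] for s in progress["sources"])
processed_rows_per_second = sum(s["processedRowsPerSecond"] for s in progress["sources"])

print(num_input_rows)  # 100
```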

This is not an issue in classic PySpark either, because classic PySpark calls jsonValue on StreamingQueryProgress, which includes all fields:

```scala
private[sql] def jsonValue: JValue = {
  ("id" -> JString(id.toString)) ~
  ("runId" -> JString(runId.toString)) ~
  ("name" -> JString(name)) ~
  ("timestamp" -> JString(timestamp)) ~
  ("batchId" -> JInt(batchId)) ~
  ("batchDuration" -> JInt(batchDuration)) ~
  ("numInputRows" -> JInt(numInputRows)) ~
  ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
  ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
  ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
  ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
  ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
  ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
  ("sink" -> sink.jsonValue) ~
  ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, row => row.jsonValue))
}
```

In this PR we send all such fields to the client to fix this issue. This does not break backward compatibility because the progress is serialized as a map anyway.

Why are the changes needed?

Bug fix

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a unit test, which passes locally. Before the fix, the test failed with:

```
======================================================================
ERROR: test_streaming_last_progress (pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_streaming_last_progress)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/wei.liu/oss-spark/python/pyspark/sql/tests/streaming/test_streaming.py", line 68, in test_streaming_last_progress
    progress = StreamingQueryProgress.fromJson(query.lastProgress)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wei.liu/oss-spark/python/pyspark/sql/streaming/listener.py", line 489, in fromJson
    numInputRows=j["numInputRows"],
                 ~^^^^^^^^^^^^^^^^
KeyError: 'numInputRows'
```

Was this patch authored or co-authored using generative AI tooling?

No

WweiL (Contributor, Author) commented Jun 5, 2024

cc @HeartSaVioR @HyukjinKwon @LuciferYang, can you guys take a look? Thank you!

@WweiL WweiL changed the title [SPARK-46124][FOLLOWUP][CONNECT][SS] Send all fields in StreamingQueryProgress [SPARK-46124][FOLLOWUP][CONNECT][SS] Send missing fields in StreamingQueryProgress to client Jun 5, 2024
```diff
 class StreamingTestsMixin:
     def test_streaming_query_functions_basic(self):
-        df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load()
+        df = self.spark.readStream.format("text").load("python/test_support/sql/streaming")
```
WweiL (Contributor, Author) commented:

This is just a minor fix to make the test more stable. We call query.processAllAvailable below, which has a very small chance of blocking indefinitely on a rate source.

LuciferYang (Contributor) commented:

In the original design, the methods jsonString and fromJson are paired, so the test failure in StreamingQueryStatusAndProgressSuite should be related to the current PR:

[screenshot: StreamingQueryStatusAndProgressSuite failure]

LuciferYang (Contributor) commented Jun 6, 2024

Moreover, the test failures in ClientStreamingQuerySuite may also be related to the current PR; it seems StreamingQueryProgress.observedMetrics is null with this PR (it's an empty Map without it):

[screenshot: ClientStreamingQuerySuite failure]

Does this mean that the new changes are not fully compatible with previous behaviors? @WweiL @HyukjinKwon

WweiL (Contributor, Author) commented Jun 6, 2024

Ah thanks, I only ran the new test case; glad the existing tests caught these. Let me fix! @LuciferYang

WweiL (Contributor, Author) commented Jun 6, 2024

Now I actually think we should fix the Python client-side reconstruction instead.

WweiL (Contributor, Author) commented Jun 6, 2024

Those three fields are just derived from other existing info:

```scala
/** The aggregate (across all sources) number of records processed in a trigger. */
def numInputRows: Long = sources.map(_.numInputRows).sum
/** The aggregate (across all sources) rate of data arriving. */
def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum
/** The aggregate (across all sources) rate at which Spark is processing data. */
def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
```

There is no strong reason to also send them from the server.

I could just have the client reconstruct these fields the same way the Scala definition does.

But there is still this API difference: in Python, the return type of lastProgress is Dict, while in Scala it's the actual StreamingQueryProgress object:

```scala
def recentProgress: Array[StreamingQueryProgress]

/**
 * Returns the most recent [[StreamingQueryProgress]] update of this streaming query.
 *
 * @since 2.1.0
 */
def lastProgress: StreamingQueryProgress
```

I feel that we could let the Python client also return a StreamingQueryProgress object, and put the reconstruction logic inside the client-side StreamingQueryProgress. What do you guys think?
@LuciferYang @HyukjinKwon @HeartSaVioR

HyukjinKwon (Member) commented:

We should have done that. I wonder if we can have a class that implements dict, e.g., StreamingQueryProgress(dict), to avoid a breaking change.
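The suggestion above can be sketched roughly as follows. This is illustrative only, not the actual PySpark implementation: subclass dict so that existing key-based access keeps working, while the derived aggregates become available as attributes.

```python
class StreamingQueryProgress(dict):
    """Sketch of a dict subclass: keys stay accessible via ["..."],
    while the derived aggregate fields are exposed as properties."""

    @property
    def numInputRows(self) -> int:
        # Sum over all sources, mirroring the Scala definition.
        return sum(s["numInputRows"] for s in self.get("sources", []))

    @property
    def inputRowsPerSecond(self) -> float:
        return sum(s["inputRowsPerSecond"] for s in self.get("sources", []))

    @property
    def processedRowsPerSecond(self) -> float:
        return sum(s["processedRowsPerSecond"] for s in self.get("sources", []))


p = StreamingQueryProgress(
    batchId=5,
    sources=[
        {"numInputRows": 30, "inputRowsPerSecond": 3.0, "processedRowsPerSecond": 6.0},
        {"numInputRows": 70, "inputRowsPerSecond": 7.0, "processedRowsPerSecond": 14.0},
    ],
)
assert p["batchId"] == 5      # old dict-style access still works
assert p.numInputRows == 100  # new attribute access with derived fields
```

Because the class is a real dict subclass, code that iterates keys, calls `json.dumps`, or indexes by string is unaffected.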

WweiL (Contributor, Author) commented Jun 8, 2024

Created another PR: #46921. It is not ready for review yet because I found another bug while working on it. After #46920 is merged, it will be ready for review.

@WweiL WweiL closed this Jun 8, 2024
HyukjinKwon pushed a commit that referenced this pull request Jun 17, 2024
…l StreamingQueryProgress

### What changes were proposed in this pull request?

This PR is created after discussion in this closed one: #46886
I was trying to fix a bug (in connect, query.lastProgress doesn't have `numInputRows`, `inputRowsPerSecond`, and `processedRowsPerSecond`), and we reached the conclusion that what is proposed in this PR should be the ultimate fix.

In python, for both classic spark and spark connect, the return type of `lastProgress` is `Dict` (and `recentProgress` is `List[Dict]`), but in scala it's the actual `StreamingQueryProgress` object:
https://github.com/apache/spark/blob/1a5d22aa2ffe769435be4aa6102ef961c55b9593/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala#L94-L101

This API discrepancy causes some confusion: in Scala, users can do `query.lastProgress.batchId`, while in Python they have to do `query.lastProgress["batchId"]`.

This PR makes `StreamingQuery.lastProgress` return the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` return `List[StreamingQueryProgress]`).

To prevent a breaking change, we make `StreamingQueryProgress` a subclass of `dict`, so existing code that accesses fields with dictionary syntax (e.g. `query.lastProgress["id"]`) still works.

### Why are the changes needed?

API parity

### Does this PR introduce _any_ user-facing change?

Yes, now `StreamingQuery.lastProgress` returns the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` returns `List[StreamingQueryProgress]`).

### How was this patch tested?

Added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46921 from WweiL/SPARK-48567-lastProgress.

Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Jul 24, 2024
… the actual StreamingQueryProgress

This reverts commit d067fc6, which reverted 042804a, essentially bringing it back. 042804a failed the 3.5 client <> 4.0 server test, but that test was turned off for cross-version testing in #47468

### What changes were proposed in this pull request?

This PR is created after discussion in this closed one: #46886
I was trying to fix a bug (in connect, query.lastProgress doesn't have `numInputRows`, `inputRowsPerSecond`, and `processedRowsPerSecond`), and we reached the conclusion that what is proposed in this PR should be the ultimate fix.

In python, for both classic spark and spark connect, the return type of `lastProgress` is `Dict` (and `recentProgress` is `List[Dict]`), but in scala it's the actual `StreamingQueryProgress` object:
https://github.com/apache/spark/blob/1a5d22aa2ffe769435be4aa6102ef961c55b9593/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala#L94-L101

This API discrepancy causes some confusion: in Scala, users can do `query.lastProgress.batchId`, while in Python they have to do `query.lastProgress["batchId"]`.

This PR makes `StreamingQuery.lastProgress` return the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` return `List[StreamingQueryProgress]`).

To prevent a breaking change, we make `StreamingQueryProgress` a subclass of `dict`, so existing code that accesses fields with dictionary syntax (e.g. `query.lastProgress["id"]`) still works.

### Why are the changes needed?

API parity

### Does this PR introduce _any_ user-facing change?

Yes, now `StreamingQuery.lastProgress` returns the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` returns `List[StreamingQueryProgress]`).

### How was this patch tested?

Added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47470 from WweiL/bring-back-lastProgress.

Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
ilicmarkodb pushed a commit to ilicmarkodb/spark that referenced this pull request Jul 29, 2024
… the actual StreamingQueryProgress

(commit message identical to #47470 above)