
Commit 0bc2c69

done
1 parent 9201ec5 commit 0bc2c69

File tree

5 files changed: +14 −17 lines changed


connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala

Lines changed: 1 addition & 6 deletions
@@ -26,7 +26,6 @@ import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
-import com.fasterxml.jackson.databind.node.ObjectNode
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
 import org.json4s._
 import org.json4s.JsonAST.JValue
@@ -204,11 +203,7 @@ private[spark] object StreamingQueryProgress {
   }
 
   private[spark] def jsonString(progress: StreamingQueryProgress): String = {
-    val jsonNode = mapper.valueToTree[ObjectNode](StreamingQueryProgress)
-    jsonNode.put("numInputRows", progress.numInputRows)
-    jsonNode.put("inputRowsPerSecond", progress.inputRowsPerSecond)
-    jsonNode.put("processedRowsPerSecond", progress.processedRowsPerSecond)
-    mapper.writeValueAsString(progress)
+    progress.json
   }
 
   private[spark] def fromJson(json: String): StreamingQueryProgress =

connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala

Lines changed: 0 additions & 4 deletions
@@ -3259,10 +3259,6 @@ class SparkConnectPlanner(
       case StreamingQueryCommand.CommandCase.LAST_PROGRESS |
           StreamingQueryCommand.CommandCase.RECENT_PROGRESS =>
         val progressReports = if (command.getLastProgress) {
-          // scalastyle:off
-          println("wei== sparkconnnectplanner lasprogress" + query.lastProgress)
-          println("wei== sparkconnnectplanner jsonString" + StreamingQueryProgress.jsonString(query.lastProgress))
-          // scalastyle:on
           Option(query.lastProgress).toSeq
         } else {
           query.recentProgress.toImmutableArraySeq

python/pyspark/sql/tests/streaming/test_streaming.py

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@
 
 class StreamingTestsMixin:
     def test_streaming_query_functions_basic(self):
-        df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load()
+        df = self.spark.readStream.format("text").load("python/test_support/sql/streaming")
         query = (
             df.writeStream.format("memory")
             .queryName("test_streaming_query_functions_basic")

python/pyspark/sql/tests/streaming/test_streaming_listener.py

Lines changed: 11 additions & 0 deletions
@@ -194,6 +194,17 @@ def check_sink_progress(self, progress):
         self.assertTrue(isinstance(progress.numOutputRows, int))
         self.assertTrue(isinstance(progress.metrics, dict))
 
+    # def test_streaming_last_progress(self):
+    #     try:
+    #         df = self.spark.readStream.format("text").load("python/test_support/sql/streaming")
+    #         query = df.writeStream.format("noop").queryName("test_streaming_progress").start()
+    #         query.processAllAvailable()
+    #
+    #         progress = StreamingQueryProgress.fromJson(query.lastProgress)
+    #         self.check_streaming_query_progress(progress, False)
+    #     finally:
+    #         query.stop()
+
     # This is a generic test work for both classic Spark and Spark Connect
     def test_listener_observed_metrics(self):
         class MyErrorListener(StreamingQueryListener):

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

Lines changed: 1 addition & 6 deletions
@@ -26,7 +26,6 @@ import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
-import com.fasterxml.jackson.databind.node.ObjectNode
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
 import org.json4s._
 import org.json4s.JsonAST.JValue
@@ -188,11 +187,7 @@ private[spark] object StreamingQueryProgress {
   }
 
   private[spark] def jsonString(progress: StreamingQueryProgress): String = {
-    val jsonNode = mapper.valueToTree[ObjectNode](StreamingQueryProgress)
-    jsonNode.put("numInputRows", progress.numInputRows)
-    jsonNode.put("inputRowsPerSecond", progress.inputRowsPerSecond)
-    jsonNode.put("processedRowsPerSecond", progress.processedRowsPerSecond)
-    mapper.writeValueAsString(progress)
+    progress.json
   }
 
   private[spark] def fromJson(json: String): StreamingQueryProgress =
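
As a rough end-to-end sketch of the progress path this commit touches, the Python snippet below mirrors the commented-out test above. It is only illustrative: it assumes an active SparkSession named spark, that StreamingQueryProgress is importable from pyspark.sql.streaming.listener, and that query.lastProgress yields JSON that StreamingQueryProgress.fromJson accepts in this code path; the diff confirms only the fromJson call pattern itself.

from pyspark.sql.streaming.listener import StreamingQueryProgress  # assumed import location

# Read a small text stream and write it to the no-op sink, as in the commented-out test.
df = spark.readStream.format("text").load("python/test_support/sql/streaming")
query = df.writeStream.format("noop").queryName("last_progress_sketch").start()
try:
    query.processAllAvailable()
    # Assumption: lastProgress here is JSON that fromJson can parse into a
    # StreamingQueryProgress exposing fields such as numInputRows and processedRowsPerSecond.
    progress = StreamingQueryProgress.fromJson(query.lastProgress)
    print(progress.numInputRows, progress.processedRowsPerSecond)
finally:
    query.stop()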
