Commit 230607d
[SPARK-19140][SS] Allow update mode for non-aggregation streaming queries
## What changes were proposed in this pull request?

This PR allows update mode for non-aggregation streaming queries. Update mode behaves the same as append mode when a query has no aggregations.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16520 from zsxwing/update-without-agg.

(cherry picked from commit bc6c56e)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
1 parent: 81c9430

8 files changed, +72 -52 lines

docs/structured-streaming-programming-guide.md

Lines changed: 2 additions & 2 deletions
@@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou
 
   - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
 
-  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger.
+  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode.
 
 Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes).

@@ -977,7 +977,7 @@ Here is the compatibility matrix.
   </tr>
   <tr>
     <td colspan="2" style="vertical-align: middle;">Queries without aggregation</td>
-    <td style="vertical-align: middle;">Append</td>
+    <td style="vertical-align: middle;">Append, Update</td>
     <td style="vertical-align: middle;">
       Complete mode not supported as it is infeasible to keep all data in the Result Table.
     </td>
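With this change, a query without aggregations can be started in Update mode and behaves exactly like Append mode. A minimal sketch of the newly allowed usage (the socket source, console sink, and host/port are illustrative choices, not part of this commit):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("UpdateWithoutAgg").getOrCreate()
import spark.implicits._

// No aggregation: each input row produces at most one output row, so
// "update" and "append" emit identical results for this query.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val errors = lines.filter($"value".contains("ERROR"))

// Before this commit, outputMode("update") failed analysis here because the
// query has no streaming aggregations; it now starts and runs as append.
val query = errors.writeStream
  .format("console")
  .outputMode("update")
  .start()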

python/pyspark/sql/streaming.py

Lines changed: 19 additions & 8 deletions
@@ -665,6 +665,9 @@ def outputMode(self, outputMode):
            the sink
         * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
            every time these is some updates
+        * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be
+           written to the sink every time there are some updates. If the query doesn't contain
+           aggregations, it will be equivalent to `append` mode.
 
         .. note:: Experimental.

@@ -768,7 +771,8 @@ def trigger(self, processingTime=None):
 
     @ignore_unicode_prefix
     @since(2.0)
-    def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
+              **options):
         """Streams the contents of the :class:`DataFrame` to a data source.
 
         The data source is specified by the ``format`` and a set of ``options``.

@@ -779,15 +783,20 @@ def start(self, path=None, format=None, partitionBy=None, queryName=None, **opti
 
         :param path: the path in a Hadoop supported file system
         :param format: the format used to save
-
-        * ``append``: Append contents of this :class:`DataFrame` to existing data.
-        * ``overwrite``: Overwrite existing data.
-        * ``ignore``: Silently ignore this operation if data already exists.
-        * ``error`` (default case): Throw an exception if data already exists.
+        :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
+            streaming sink.
+
+            * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to the
+               sink
+            * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
+               every time these is some updates
+            * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be
+               written to the sink every time there are some updates. If the query doesn't contain
+               aggregations, it will be equivalent to `append` mode.
         :param partitionBy: names of partitioning columns
         :param queryName: unique name for the query
         :param options: All other string options. You may want to provide a `checkpointLocation`
-           for most streams, however it is not required for a `memory` stream.
+            for most streams, however it is not required for a `memory` stream.

@@ -798,14 +807,16 @@ def start(self, path=None, format=None, partitionBy=None, queryName=None, **opti
         >>> sq.isActive
         False
         >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
-        ...     queryName='that_query', format='memory')
+        ...     queryName='that_query', outputMode="append", format='memory')
         >>> sq.name
         u'that_query'
         >>> sq.isActive
         True
         >>> sq.stop()
         """
         self.options(**options)
+        if outputMode is not None:
+            self.outputMode(outputMode)
         if partitionBy is not None:
             self.partitionBy(partitionBy)
         if format is not None:
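The new outputMode keyword is convenience sugar: as the diff shows, start() simply forwards it to outputMode() before starting the query. The Scala API keeps the call explicit; the doctest above corresponds roughly to this chain (a sketch, with sdf assumed to be a streaming Dataset):

import org.apache.spark.sql.streaming.ProcessingTime

// Scala equivalent of start(queryName='that_query', outputMode="append",
// format='memory'): outputMode is a builder call, not a start() argument.
val sq = sdf.writeStream
  .trigger(ProcessingTime("5 seconds"))
  .outputMode("append")
  .format("memory")
  .queryName("that_query")
  .start()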

sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java

Lines changed: 2 additions & 1 deletion
@@ -57,7 +57,8 @@ public static OutputMode Complete() {
 
   /**
    * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will
-   * be written to the sink every time there are some updates.
+   * be written to the sink every time there are some updates. If the query doesn't contain
+   * aggregations, it will be equivalent to `Append` mode.
    *
    * @since 2.1.1
    */
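OutputMode.Update() is the typed counterpart of the "update" string accepted by outputMode(String); both resolve to the same mode. A sketch of the typed variant, reusing the non-aggregating errors Dataset from the first example:

import org.apache.spark.sql.streaming.OutputMode

// Typed equivalent of .outputMode("update"); valid for queries without
// aggregations as of this commit.
val typedQuery = errors.writeStream
  .format("console")
  .outputMode(OutputMode.Update())
  .start()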

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ object UnsupportedOperationChecker {
           s"streaming DataFrames/DataSets")(plan)
       }
 
-      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
+      case InternalOutputModes.Complete if aggregates.isEmpty =>
         throwError(
           s"$outputMode output mode not supported when there are no streaming aggregations on " +
             s"streaming DataFrames/Datasets")(plan)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala

Lines changed: 2 additions & 2 deletions
@@ -40,8 +40,8 @@ private[sql] object InternalOutputModes {
 
   /**
    * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
-   * written to the sink every time these is some updates. This output mode can only be used in
-   * queries that contain aggregations.
+   * written to the sink every time these is some updates. If the query doesn't contain
+   * aggregations, it will be equivalent to `Append` mode.
   */
  case object Update extends OutputMode
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala

Lines changed: 17 additions & 14 deletions
@@ -219,9 +219,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
 
   // Output modes with aggregation and non-aggregation plans
-  testOutputMode(Append, shouldSupportAggregation = false)
-  testOutputMode(Update, shouldSupportAggregation = true)
-  testOutputMode(Complete, shouldSupportAggregation = true)
+  testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
+  testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
+  testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)
 
   /*
   =======================================================================================

@@ -323,30 +323,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   /** Test output mode with and without aggregation in the streaming plan */
   def testOutputMode(
       outputMode: OutputMode,
-      shouldSupportAggregation: Boolean): Unit = {
+      shouldSupportAggregation: Boolean,
+      shouldSupportNonAggregation: Boolean): Unit = {
 
     // aggregation
     if (shouldSupportAggregation) {
-      assertNotSupportedInStreamingPlan(
-        s"$outputMode output mode - no aggregation",
-        streamRelation.where($"a" > 1),
-        outputMode = outputMode,
-        Seq("aggregation", s"$outputMode output mode"))
-
       assertSupportedInStreamingPlan(
         s"$outputMode output mode - aggregation",
         streamRelation.groupBy("a")("count(*)"),
         outputMode = outputMode)
-
     } else {
+      assertNotSupportedInStreamingPlan(
+        s"$outputMode output mode - aggregation",
+        streamRelation.groupBy("a")("count(*)"),
+        outputMode = outputMode,
+        Seq("aggregation", s"$outputMode output mode"))
+    }
+
+    // non aggregation
+    if (shouldSupportNonAggregation) {
       assertSupportedInStreamingPlan(
         s"$outputMode output mode - no aggregation",
         streamRelation.where($"a" > 1),
         outputMode = outputMode)
-
+    } else {
       assertNotSupportedInStreamingPlan(
-        s"$outputMode output mode - aggregation",
-        streamRelation.groupBy("a")("count(*)"),
+        s"$outputMode output mode - no aggregation",
+        streamRelation.where($"a" > 1),
         outputMode = outputMode,
         Seq("aggregation", s"$outputMode output mode"))
     }

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala

Lines changed: 7 additions & 11 deletions
@@ -44,6 +44,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *                            written to the sink
    *   - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
    *                              to the sink every time these is some updates
+   *   - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset
+   *                            will be written to the sink every time there are some updates. If
+   *                            the query doesn't contain aggregations, it will be equivalent to
+   *                            `OutputMode.Append()` mode.
    *
    * @since 2.0.0
    */

@@ -58,7 +62,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    *                 the sink
    *   - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
    *                 every time these is some updates
-   *
+   *   - `update`: only the rows that were updated in the streaming DataFrame/Dataset will
+   *               be written to the sink every time there are some updates. If the query doesn't
+   *               contain aggregations, it will be equivalent to `append` mode.
    * @since 2.0.0
    */
   def outputMode(outputMode: String): DataStreamWriter[T] = {

@@ -220,16 +226,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       if (extraOptions.get("queryName").isEmpty) {
         throw new AnalysisException("queryName must be specified for memory sink")
       }
-      val supportedModes = "Output modes supported by the memory sink are 'append' and 'complete'."
-      outputMode match {
-        case Append | Complete => // allowed
-        case Update =>
-          throw new AnalysisException(
-            s"Update output mode is not supported for memory sink. $supportedModes")
-        case _ =>
-          throw new AnalysisException(
-            s"$outputMode is not supported for memory sink. $supportedModes")
-      }
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
       val chkpointLoc = extraOptions.get("checkpointLocation")
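Dropping the whitelist means the memory sink now accepts Update alongside Append and Complete; the updated suite below exercises exactly this path. A compact sketch of the user-facing effect (table name illustrative; note that queryName remains mandatory for the memory sink, as the surviving check above enforces):

// The memory sink no longer rejects update mode; results accumulate in an
// in-memory table that can be read back with spark.table.
val memQuery = errors.writeStream
  .format("memory")
  .outputMode("update")
  .queryName("error_rows") // required for the memory sink
  .start()

spark.table("error_rows").show()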

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala

Lines changed: 22 additions & 13 deletions
@@ -137,7 +137,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
   }
 
 
-  test("registering as a table in Append output mode - supported") {
+  test("registering as a table in Append output mode") {
     val input = MemoryStream[Int]
     val query = input.toDF().writeStream
       .format("memory")

@@ -160,7 +160,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
-  test("registering as a table in Complete output mode - supported") {
+  test("registering as a table in Complete output mode") {
     val input = MemoryStream[Int]
     val query = input.toDF()
       .groupBy("value")

@@ -186,18 +186,27 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
-  test("registering as a table in Update output mode - not supported") {
+  test("registering as a table in Update output mode") {
     val input = MemoryStream[Int]
-    val df = input.toDF()
-      .groupBy("value")
-      .count()
-    intercept[AnalysisException] {
-      df.writeStream
-        .format("memory")
-        .outputMode("update")
-        .queryName("memStream")
-        .start()
-    }
+    val query = input.toDF().writeStream
+      .format("memory")
+      .outputMode("update")
+      .queryName("memStream")
+      .start()
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+
+    checkDataset(
+      spark.table("memStream").as[Int],
+      1, 2, 3)
+
+    input.addData(4, 5, 6)
+    query.processAllAvailable()
+    checkDataset(
+      spark.table("memStream").as[Int],
+      1, 2, 3, 4, 5, 6)
+
+    query.stop()
   }
 
   test("MemoryPlan statistics") {
