[SPARK-19140][SS]Allow update mode for non-aggregation streaming queries #16520

Closed
wants to merge 4 commits into from
4 changes: 2 additions & 2 deletions docs/structured-streaming-programming-guide.md
@@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou

- *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

- *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger.
- *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode.

Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes).
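As an illustrative sketch (not part of this diff; the socket source on localhost:9999 and the console sink are assumed), a PySpark query without aggregation can now be started in update mode, where it behaves exactly like append mode:

```python
# Minimal sketch, assuming Spark 2.1.1+ and a text stream on localhost:9999.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("update-mode-sketch").getOrCreate()

lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

# No aggregation here, so update mode is equivalent to append mode after this change.
filtered = lines.where("length(value) > 3")

query = (filtered.writeStream
         .outputMode("update")   # previously rejected for non-aggregation queries
         .format("console")
         .start())

query.awaitTermination()
```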

@@ -977,7 +977,7 @@ Here is the compatibility matrix.
</tr>
<tr>
<td colspan="2" style="vertical-align: middle;">Queries without aggregation</td>
<td style="vertical-align: middle;">Append</td>
<td style="vertical-align: middle;">Append, Update</td>
<td style="vertical-align: middle;">
Complete mode not supported as it is infeasible to keep all data in the Result Table.
</td>
27 changes: 19 additions & 8 deletions python/pyspark/sql/streaming.py
@@ -665,6 +665,9 @@ def outputMode(self, outputMode):
the sink
* `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
every time there are some updates
* `update`:only the rows that were updated in the streaming DataFrame/Dataset will be
written to the sink every time there are some updates. If the query doesn't contain
aggregations, it will be equivalent to `append` mode.

.. note:: Experimental.
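A hedged sketch of how these modes differ for an aggregating query (the word-count pipeline, socket source, and console sink are illustrative, not taken from this PR):

```python
# Sketch: the same streaming aggregation under the modes documented above.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.getOrCreate()

lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

words = lines.select(explode(split(lines.value, " ")).alias("word"))
counts = words.groupBy("word").count()

# complete: every trigger writes all counts seen so far.
# update:   every trigger writes only the counts that changed since the last trigger.
# append:   not allowed for this query without a watermark, since counts keep changing.
query = (counts.writeStream
         .outputMode("update")
         .format("console")
         .start())
```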

@@ -768,7 +771,8 @@ def trigger(self, processingTime=None):

@ignore_unicode_prefix
@since(2.0)
def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
**options):
"""Streams the contents of the :class:`DataFrame` to a data source.

The data source is specified by the ``format`` and a set of ``options``.
@@ -779,15 +783,20 @@ def start(self, path=None, format=None, partitionBy=None, queryName=None, **opti

:param path: the path in a Hadoop supported file system
:param format: the format used to save

* ``append``: Append contents of this :class:`DataFrame` to existing data.
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
streaming sink.

* `append`:Only the new rows in the streaming DataFrame/Dataset will be written to the
sink
* `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
every time there are some updates
* `update`:only the rows that were updated in the streaming DataFrame/Dataset will be
written to the sink every time there are some updates. If the query doesn't contain
aggregations, it will be equivalent to `append` mode.
:param partitionBy: names of partitioning columns
:param queryName: unique name for the query
:param options: All other string options. You may want to provide a `checkpointLocation`
for most streams, however it is not required for a `memory` stream.
for most streams, however it is not required for a `memory` stream.

>>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
>>> sq.isActive
@@ -798,14 +807,16 @@ def start(self, path=None, format=None, partitionBy=None, queryName=None, **opti
>>> sq.isActive
False
>>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
... queryName='that_query', format='memory')
... queryName='that_query', outputMode="append", format='memory')
>>> sq.name
u'that_query'
>>> sq.isActive
True
>>> sq.stop()
"""
self.options(**options)
if outputMode is not None:
self.outputMode(outputMode)
if partitionBy is not None:
self.partitionBy(partitionBy)
if format is not None:
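A brief usage sketch for the new `outputMode` argument of `start()` (the query name is hypothetical; `sdf` and `spark` are assumed to exist as in the doctest above):

```python
# Sketch: outputMode can now be passed directly to start(), here with the memory sink.
query = sdf.writeStream.trigger(processingTime='5 seconds').start(
    format='memory',
    queryName='that_update_query',
    outputMode='update')

query.processAllAvailable()
spark.table('that_update_query').show()   # inspect what the memory sink has received
query.stop()
```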
@@ -57,7 +57,8 @@ public static OutputMode Complete() {

/**
* OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will
* be written to the sink every time there are some updates.
* be written to the sink every time there are some updates. If the query doesn't contain
* aggregations, it will be equivalent to `Append` mode.
*
* @since 2.1.1
*/
@@ -73,7 +73,7 @@ object UnsupportedOperationChecker {
s"streaming DataFrames/DataSets")(plan)
}

case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
case InternalOutputModes.Complete if aggregates.isEmpty =>
throwError(
s"$outputMode output mode not supported when there are no streaming aggregations on " +
s"streaming DataFrames/Datasets")(plan)
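For contrast, a sketch of what this checker still rejects after the change: complete mode on a query without aggregation (`lines` is the illustrative streaming DataFrame from the earlier sketch):

```python
# Sketch: complete mode remains unsupported without a streaming aggregation,
# so start() is still expected to raise an AnalysisException here.
from pyspark.sql.utils import AnalysisException

no_agg = lines.where("length(value) > 3")

try:
    no_agg.writeStream.outputMode("complete").format("console").start()
except AnalysisException as e:
    # e.g. "Complete output mode not supported when there are no streaming
    # aggregations on streaming DataFrames/Datasets"
    print(e)
```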
@@ -40,8 +40,8 @@ private[sql] object InternalOutputModes {

/**
* OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
* written to the sink every time there are some updates. This output mode can only be used in
* queries that contain aggregations.
* written to the sink every time there are some updates. If the query doesn't contain
* aggregations, it will be equivalent to `Append` mode.
*/
case object Update extends OutputMode
}
@@ -219,9 +219,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
"window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")

// Output modes with aggregation and non-aggregation plans
testOutputMode(Append, shouldSupportAggregation = false)
testOutputMode(Update, shouldSupportAggregation = true)
testOutputMode(Complete, shouldSupportAggregation = true)
testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)

/*
=======================================================================================
@@ -323,30 +323,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
/** Test output mode with and without aggregation in the streaming plan */
def testOutputMode(
outputMode: OutputMode,
shouldSupportAggregation: Boolean): Unit = {
shouldSupportAggregation: Boolean,
shouldSupportNonAggregation: Boolean): Unit = {

// aggregation
if (shouldSupportAggregation) {
assertNotSupportedInStreamingPlan(
s"$outputMode output mode - no aggregation",
streamRelation.where($"a" > 1),
outputMode = outputMode,
Seq("aggregation", s"$outputMode output mode"))

assertSupportedInStreamingPlan(
s"$outputMode output mode - aggregation",
streamRelation.groupBy("a")("count(*)"),
outputMode = outputMode)

} else {
assertNotSupportedInStreamingPlan(
s"$outputMode output mode - aggregation",
streamRelation.groupBy("a")("count(*)"),
outputMode = outputMode,
Seq("aggregation", s"$outputMode output mode"))
}

// non aggregation
if (shouldSupportNonAggregation) {
assertSupportedInStreamingPlan(
s"$outputMode output mode - no aggregation",
streamRelation.where($"a" > 1),
outputMode = outputMode)

} else {
assertNotSupportedInStreamingPlan(
s"$outputMode output mode - aggregation",
streamRelation.groupBy("a")("count(*)"),
s"$outputMode output mode - no aggregation",
streamRelation.where($"a" > 1),
outputMode = outputMode,
Seq("aggregation", s"$outputMode output mode"))
}
@@ -44,6 +44,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
* written to the sink
* - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
* to the sink every time there are some updates
* - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset
* will be written to the sink every time there are some updates. If
* the query doesn't contain aggregations, it will be equivalent to
* `OutputMode.Append()` mode.
*
* @since 2.0.0
*/
@@ -58,7 +62,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
* the sink
* - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
* every time there are some updates
*
* - `update`: only the rows that were updated in the streaming DataFrame/Dataset will
Contributor comment: Could you also please update pyspark docs?
* be written to the sink every time there are some updates. If the query doesn't
* contain aggregations, it will be equivalent to `append` mode.
* @since 2.0.0
*/
def outputMode(outputMode: String): DataStreamWriter[T] = {
@@ -220,16 +226,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
if (extraOptions.get("queryName").isEmpty) {
throw new AnalysisException("queryName must be specified for memory sink")
}
val supportedModes = "Output modes supported by the memory sink are 'append' and 'complete'."
outputMode match {
case Append | Complete => // allowed
case Update =>
throw new AnalysisException(
s"Update output mode is not supported for memory sink. $supportedModes")
case _ =>
throw new AnalysisException(
s"$outputMode is not supported for memory sink. $supportedModes")
}
val sink = new MemorySink(df.schema, outputMode)
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
val chkpointLoc = extraOptions.get("checkpointLocation")
@@ -137,7 +137,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
}


test("registering as a table in Append output mode - supported") {
test("registering as a table in Append output mode") {
val input = MemoryStream[Int]
val query = input.toDF().writeStream
.format("memory")
@@ -160,7 +160,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
query.stop()
}

test("registering as a table in Complete output mode - supported") {
test("registering as a table in Complete output mode") {
val input = MemoryStream[Int]
val query = input.toDF()
.groupBy("value")
@@ -186,18 +186,27 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
query.stop()
}

test("registering as a table in Update output mode - not supported") {
test("registering as a table in Update output mode") {
val input = MemoryStream[Int]
val df = input.toDF()
.groupBy("value")
.count()
intercept[AnalysisException] {
df.writeStream
.format("memory")
.outputMode("update")
.queryName("memStream")
.start()
}
val query = input.toDF().writeStream
.format("memory")
.outputMode("update")
.queryName("memStream")
.start()
input.addData(1, 2, 3)
query.processAllAvailable()

checkDataset(
spark.table("memStream").as[Int],
1, 2, 3)

input.addData(4, 5, 6)
query.processAllAvailable()
checkDataset(
spark.table("memStream").as[Int],
1, 2, 3, 4, 5, 6)

query.stop()
}

test("MemoryPlan statistics") {