
Commit c071878

marmbrus authored and tdas committed
[SPARK-18124] Observed delay based Event Time Watermarks
This PR adds a new method `withWatermark` to the `Dataset` API, which can be used to specify an _event time watermark_. An event time watermark allows the streaming engine to reason about the point in time after which we no longer expect to see late data. This PR also augments `StreamExecution` to use this watermark for several purposes:
- To know when a given time window aggregation is finalized and thus results can be emitted when using output modes that do not allow updates (e.g. `Append` mode).
- To minimize the amount of state that we need to keep for on-going aggregations, by evicting state for groups that are no longer expected to change. We do, however, still maintain all state if the query requires it (i.e. if the event time is not present in the `groupBy` or when running in `Complete` mode).

An example that emits windowed counts of records, waiting up to 5 minutes for late data to arrive:
```scala
df.withWatermark("eventTime", "5 minutes")
  .groupBy(window($"eventTime", "1 minute") as 'window)
  .count()
  .writeStream
  .format("console")
  .outputMode("append") // In append mode, we only output finalized aggregations.
  .start()
```

### Calculating the watermark
The current watermark is computed by taking the `MAX(eventTime)` seen this epoch across all of the partitions in the query, minus a user defined _delayThreshold_. An additional constraint is that the watermark must increase monotonically. Note that since we must occasionally coordinate this value across partitions, the actual watermark used is only guaranteed to be at least `delayThreshold` behind the actual event time; in some cases we may still process records that arrive more than `delayThreshold` late.

This mechanism was chosen over processing time for the initial implementation for two reasons:
- It is robust to downtime that could affect processing delay.
- It does not require syncing of time or timezones between the producer and the processing engine.

### Other notable implementation details
- A new trigger metric `eventTimeWatermark` outputs the current value of the watermark.
- We mark the event time column in the `Attribute` metadata using the key `spark.watermarkDelayMs`. This allows downstream operations to know which column holds the event time. Operations like `window` propagate this metadata.
- `explain()` marks the watermark with a suffix of `-T${delayMs}` to ease debugging of how this information is propagated.
- Currently, we don't filter out late records, but instead rely on the state store to avoid emitting records that are both added and filtered in the same epoch.

### Remaining in this PR
- [ ] The test for recovery is currently failing as we don't record the watermark used in the offset log. We will need to do so to ensure determinism, but this is deferred until apache#15626 is merged.

### Other follow-ups
There are some natural additional features that we should consider for future work:
- Ability to write records that arrive too late to some external store in case any out-of-band remediation is required.
- `Update` mode so you can get partial results before a group is evicted.
- Other mechanisms for calculating the watermark. In particular, a watermark based on quantiles would be more robust to outliers.

Author: Michael Armbrust <michael@databricks.com>

Closes apache#15702 from marmbrus/watermarks.
1 parent bd85603 commit c071878
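
The watermark calculation described in the commit message can be sketched in a few lines. This is not the `StreamExecution` code from this PR, just a minimal model under made-up names: take the max event time observed across partitions in an epoch, subtract the delay, and never let the watermark move backwards.

```scala
// Minimal sketch of the watermark rule; `WatermarkTracker` is a hypothetical name
// used only for illustration, not a class introduced by this commit.
class WatermarkTracker(delayMs: Long) {
  private var watermarkMs: Long = 0L

  /** Update from the max event time (in ms) observed in each partition this epoch. */
  def update(maxEventTimePerPartition: Seq[Long]): Long = {
    if (maxEventTimePerPartition.nonEmpty) {
      // Candidate watermark is the global max event time minus the delay; it may only move forward.
      watermarkMs = math.max(watermarkMs, maxEventTimePerPartition.max - delayMs)
    }
    watermarkMs
  }
}

val tracker = new WatermarkTracker(5 * 60 * 1000)  // a "5 minutes" delay
tracker.update(Seq(600000L, 540000L))               // max event time 600s -> watermark 300s
tracker.update(Seq(580000L))                        // watermark stays at 300s (monotonic)
```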

22 files changed: +597 −111 lines

common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java

Lines changed: 4 additions & 0 deletions
@@ -252,6 +252,10 @@ public static long parseSecondNano(String secondNano) throws IllegalArgumentExce
   public final int months;
   public final long microseconds;
 
+  public final long milliseconds() {
+    return this.microseconds / MICROS_PER_MILLI;
+  }
+
   public CalendarInterval(int months, long microseconds) {
     this.months = months;
     this.microseconds = microseconds;
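
For context, a small Scala sketch of how the new helper composes with `CalendarInterval.fromString`, the same parsing path `Dataset.withWatermark` uses later in this commit; the interval string is just an example value.

```scala
import org.apache.spark.unsafe.types.CalendarInterval

// "interval 5 minutes" parses to a CalendarInterval of 300 seconds;
// the new milliseconds() helper converts the stored microseconds to 300000 ms.
val interval = CalendarInterval.fromString("interval 5 minutes")
val delayMs: Long = interval.milliseconds()
```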

sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala

Lines changed: 2 additions & 1 deletion
@@ -31,7 +31,8 @@ class AnalysisException protected[sql] (
     val message: String,
     val line: Option[Int] = None,
     val startPosition: Option[Int] = None,
-    val plan: Option[LogicalPlan] = None,
+    // Some plans fail to serialize due to bugs in scala collections.
+    @transient val plan: Option[LogicalPlan] = None,
     val cause: Option[Throwable] = None)
   extends Exception(message, cause.orNull) with Serializable {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 7 additions & 1 deletion
@@ -2272,7 +2272,13 @@ object TimeWindowing extends Rule[LogicalPlan] {
         windowExpressions.head.timeColumn.resolved &&
         windowExpressions.head.checkInputDataTypes().isSuccess) {
       val window = windowExpressions.head
-      val windowAttr = AttributeReference("window", window.dataType)()
+
+      val metadata = window.timeColumn match {
+        case a: Attribute => a.metadata
+        case _ => Metadata.empty
+      }
+      val windowAttr =
+        AttributeReference("window", window.dataType, metadata = metadata)()
 
       val maxNumOverlapping = math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
       val windows = Seq.tabulate(maxNumOverlapping + 1) { i =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 10 additions & 0 deletions
@@ -148,6 +148,16 @@ trait CheckAnalysis extends PredicateHelper {
         }
 
         operator match {
+          case etw: EventTimeWatermark =>
+            etw.eventTime.dataType match {
+              case s: StructType
+                if s.find(_.name == "end").map(_.dataType) == Some(TimestampType) =>
+              case _: TimestampType =>
+              case _ =>
+                failAnalysis(
+                  s"Event time must be defined on a window or a timestamp, but " +
+                  s"${etw.eventTime.name} is of type ${etw.eventTime.dataType.simpleString}")
+            }
          case f: Filter if f.condition.dataType != BooleanType =>
            failAnalysis(
              s"filter expression '${f.condition.sql}' " +

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 15 additions & 3 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.streaming.OutputMode
@@ -55,9 +56,20 @@ object UnsupportedOperationChecker {
       // Disallow some output mode
       outputMode match {
         case InternalOutputModes.Append if aggregates.nonEmpty =>
-          throwError(
-            s"$outputMode output mode not supported when there are streaming aggregations on " +
-              s"streaming DataFrames/DataSets")(plan)
+          val aggregate = aggregates.head
+
+          // Find any attributes that are associated with an eventTime watermark.
+          val watermarkAttributes = aggregate.groupingExpressions.collect {
+            case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
+          }
+
+          // We can append rows to the sink once the group is under the watermark. Without this
+          // watermark a group is never "finished" so we would never output anything.
+          if (watermarkAttributes.isEmpty) {
+            throwError(
+              s"$outputMode output mode not supported when there are streaming aggregations on " +
+                s"streaming DataFrames/DataSets")(plan)
+          }
 
         case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
           throwError(
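
A hedged example of what this relaxation allows: a streaming aggregation whose grouping key carries the watermark metadata can now run in `Append` mode. This mirrors the query in the commit message and assumes `spark.implicits._` and `org.apache.spark.sql.functions.window` are in scope.

```scala
// Allowed: the grouping key contains the watermarked event-time column (via window()).
df.withWatermark("eventTime", "5 minutes")
  .groupBy(window($"eventTime", "1 minute"))
  .count()
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

// Without withWatermark (or without the event time in the groupBy), the same query
// still fails with "Append output mode not supported when there are streaming
// aggregations on streaming DataFrames/DataSets".
```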

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 2 additions & 1 deletion
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, Codege
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{DataType, Metadata, StructType}
 
 /**
  * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully
@@ -98,6 +98,7 @@ case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Un
   override def withNullability(newNullability: Boolean): UnresolvedAttribute = this
   override def withQualifier(newQualifier: Option[String]): UnresolvedAttribute = this
   override def withName(newName: String): UnresolvedAttribute = UnresolvedAttribute.quoted(newName)
+  override def withMetadata(newMetadata: Metadata): Attribute = this
 
   override def toString: String = s"'$name"

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 16 additions & 1 deletion
@@ -22,6 +22,7 @@ import java.util.{Objects, UUID}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types._
 
@@ -104,6 +105,7 @@ abstract class Attribute extends LeafExpression with NamedExpression with NullIn
   def withNullability(newNullability: Boolean): Attribute
   def withQualifier(newQualifier: Option[String]): Attribute
   def withName(newName: String): Attribute
+  def withMetadata(newMetadata: Metadata): Attribute
 
   override def toAttribute: Attribute = this
   def newInstance(): Attribute
@@ -292,11 +294,22 @@ case class AttributeReference(
     }
   }
 
+  override def withMetadata(newMetadata: Metadata): Attribute = {
+    AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated)
+  }
+
   override protected final def otherCopyArgs: Seq[AnyRef] = {
     exprId :: qualifier :: isGenerated :: Nil
   }
 
-  override def toString: String = s"$name#${exprId.id}$typeSuffix"
+  /** Used to signal the column used to calculate an eventTime watermark (e.g. a#1-T{delayMs}) */
+  private def delaySuffix = if (metadata.contains(EventTimeWatermark.delayKey)) {
+    s"-T${metadata.getLong(EventTimeWatermark.delayKey)}ms"
+  } else {
+    ""
+  }
+
+  override def toString: String = s"$name#${exprId.id}$typeSuffix$delaySuffix"
 
   // Since the expression id is not in the first constructor it is missing from the default
   // tree string.
@@ -332,6 +345,8 @@ case class PrettyAttribute(
   override def withQualifier(newQualifier: Option[String]): Attribute =
     throw new UnsupportedOperationException
   override def withName(newName: String): Attribute = throw new UnsupportedOperationException
+  override def withMetadata(newMetadata: Metadata): Attribute =
+    throw new UnsupportedOperationException
   override def qualifier: Option[String] = throw new UnsupportedOperationException
   override def exprId: ExprId = throw new UnsupportedOperationException
   override def nullable: Boolean = true
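
Illustrative only: with a 5 minute watermark the event time attribute picks up the delay suffix in plan output; the exact expression id depends on the plan.

```scala
df.withWatermark("eventTime", "5 minutes").explain()
// the event time attribute renders with a suffix, along the lines of: eventTime#5-T300000ms
```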

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.unsafe.types.CalendarInterval
+
+object EventTimeWatermark {
+  /** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */
+  val delayKey = "spark.watermarkDelayMs"
+}
+
+/**
+ * Used to mark a user specified column as holding the event time for a row.
+ */
+case class EventTimeWatermark(
+    eventTime: Attribute,
+    delay: CalendarInterval,
+    child: LogicalPlan) extends LogicalPlan {
+
+  // Update the metadata on the eventTime column to include the desired delay.
+  override val output: Seq[Attribute] = child.output.map { a =>
+    if (a semanticEquals eventTime) {
+      val updatedMetadata = new MetadataBuilder()
+        .withMetadata(a.metadata)
+        .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+        .build()
+      a.withMetadata(updatedMetadata)
+    } else {
+      a
+    }
+  }
+
+  override val children: Seq[LogicalPlan] = child :: Nil
+}
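
A hedged sketch of how downstream code can find the marked column after analysis: the delay ends up in the attribute metadata under `spark.watermarkDelayMs`. The use of `queryExecution.analyzed` here is only for illustration.

```scala
val marked = df.withWatermark("eventTime", "5 minutes")

// Look for the attribute carrying the watermark delay metadata.
val eventTimeAttr = marked.queryExecution.analyzed.output
  .find(_.metadata.contains("spark.watermarkDelayMs"))

eventTimeAttr.foreach { a =>
  val delayMs = a.metadata.getLong("spark.watermarkDelayMs") // 300000 for "5 minutes"
  println(s"event time column: ${a.name}, delay: ${delayMs}ms")
}
```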

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 37 additions & 3 deletions
@@ -50,6 +50,7 @@ import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.Utils
 
 private[sql] object Dataset {
@@ -476,7 +477,7 @@ class Dataset[T] private[sql](
    * `collect()`, will throw an [[AnalysisException]] when there is a streaming
    * source present.
    *
-   * @group basic
+   * @group streaming
    * @since 2.0.0
    */
   @Experimental
@@ -496,8 +497,6 @@ class Dataset[T] private[sql](
   /**
    * Returns a checkpointed version of this Dataset.
    *
-   * @param eager When true, materializes the underlying checkpointed RDD eagerly.
-   *
    * @group basic
    * @since 2.1.0
    */
@@ -535,6 +534,41 @@ class Dataset[T] private[sql](
     )(sparkSession)).as[T]
   }
 
+  /**
+   * :: Experimental ::
+   * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes:
+   *  - To know when a given time window aggregation can be finalized and thus can be emitted when
+   *    using output modes that do not allow updates.
+   *  - To minimize the amount of state that we need to keep for on-going aggregations.
+   *
+   * The current watermark is computed by looking at the `MAX(eventTime)` seen across
+   * all of the partitions in the query minus a user specified `delayThreshold`. Due to the cost
+   * of coordinating this value across partitions, the actual watermark used is only guaranteed
+   * to be at least `delayThreshold` behind the actual event time. In some cases we may still
+   * process records that arrive more than `delayThreshold` late.
+   *
+   * @param eventTime the name of the column that contains the event time of the row.
+   * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
+   *                       record that has been processed in the form of an interval
+   *                       (e.g. "1 minute" or "5 hours").
+   *
+   * @group streaming
+   * @since 2.1.0
+   */
+  @Experimental
+  @InterfaceStability.Evolving
+  // We only accept an existing column name, not a derived column here as a watermark that is
+  // defined on a derived column cannot referenced elsewhere in the plan.
+  def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
+    val parsedDelay =
+      Option(CalendarInterval.fromString("interval " + delayThreshold))
+        .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
+  }
+
   /**
    * Displays the Dataset in a tabular form. Strings more than 20 characters will be truncated,
    * and all cells will be aligned right. For example:
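
A hedged usage sketch of the new API: the delay string goes through `CalendarInterval.fromString`, so anything that does not parse as an interval fails with an `AnalysisException`. Column name and delay values here are assumptions.

```scala
// Accepted: a parsable interval string.
val watermarked = df.withWatermark("eventTime", "10 seconds")

// Rejected: "Unable to parse time delay 'ten seconds'"
// val bad = df.withWatermark("eventTime", "ten seconds")
```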

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 9 additions & 3 deletions
@@ -18,20 +18,23 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{execution, SaveMode, Strategy}
+import org.apache.spark.sql.{SaveMode, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
-import org.apache.spark.sql.execution.streaming.{MemoryPlan, StreamingExecutionRelation, StreamingRelation, StreamingRelationExec}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.StreamingQuery
 
 /**
  * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
@@ -224,6 +227,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
  object StatefulAggregationStrategy extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case EventTimeWatermark(columnName, delay, child) =>
+        EventTimeWatermarkExec(columnName, delay, planLater(child)) :: Nil
+
      case PhysicalAggregation(
        namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) =>
