
Commit 404f7e2

[SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3
This is a backport of apache#20598.

## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self-joins.

### Incorrect analysis due to missing MultiInstanceRelation trait

Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary for the catalyst analyzer to convert the self-join logical plan DAG into a tree (by creating new instances of the leaf relations). This was causing the error `Failure when resolving conflicting references in Join:` (see JIRA for details).

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution

When splicing the source's batch plan into the streaming plan (by replacing the StreamingExecutionRelation), we were rewriting the attribute references in the streaming plan with the new attribute references from the batch plan. This mishandled the scenario in which multiple StreamingExecutionRelations point to the same source, and therefore eventually point to the same batch plan returned by the source. Here is an example query, and its corresponding plan transformations.

```
val df = input.toDF
val join = df.select('value % 5 as "key", 'value).join(
  df.select('value % 5 as "key", 'value), "key")
```

Streaming logical plan before splicing the batch plan:

```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- StreamingExecutionRelation Memory[#1], value#12    // two different leaves pointing to same source
```

Batch logical plan after splicing the batch plan and before rewriting:

```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]                 // replaces StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- LocalRelation [value#66]                 // replaces StreamingExecutionRelation Memory[#1], value#12
```

Batch logical plan after rewriting the attributes. Specifically, for each spliced plan, the new output attribute (value#66) replaces the earlier output attributes (value#1 and value#12, one for each StreamingExecutionRelation).

```
Project [key#6, value#66, value#66]               // both value#1 and value#12 replaced by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
      +- LocalRelation [value#66]
```

This causes the optimizer to eliminate value#66 from one side of the join.

```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]           // this does not generate value, incorrect join results
      +- LocalRelation [value#66]
```

**Solution**: Instead of rewriting attributes, use a Project to introduce aliases between the output attribute references and the new references generated by the spliced plans. The analyzer and optimizer will take care of the rest.

```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]            // solution: project with aliases
   :     +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- Project [value#66 AS value#12]           // solution: project with aliases
         +- LocalRelation [value#66]
```

## How was this patch tested?

New unit test.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#20765 from tdas/SPARK-23406-2.3.
1 parent 1dd37ff commit 404f7e2
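
The heart of the MicroBatchExecution fix reads well as a small standalone helper. The sketch below is illustrative only (the helper name `spliceWithAliases` is made up; the catalyst classes are the ones imported in the diff below): it wraps the spliced batch plan in a `Project` of `Alias`es that reuse the streaming attributes' `exprId`s, so references elsewhere in the plan keep resolving even when several `StreamingExecutionRelation`s map to the same source.

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

// Hypothetical standalone helper mirroring the patch's approach: instead of
// rewriting every reference to the streaming attributes, wrap the spliced
// batch plan in a Project whose aliases re-expose the batch attributes under
// the streaming attributes' exprIds.
def spliceWithAliases(streamingOutput: Seq[Attribute], dataPlan: LogicalPlan): LogicalPlan = {
  val aliases = streamingOutput.zip(dataPlan.output).map { case (to, from) =>
    // Reusing to.exprId keeps downstream references valid, even when multiple
    // StreamingExecutionRelations were replaced by the same batch plan.
    Alias(from, to.name)(exprId = to.exprId, explicitMetadata = Some(from.metadata))
  }
  Project(aliases, dataPlan)
}
```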

3 files changed: +45 −16 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

+7 −9

```diff
@@ -24,8 +24,8 @@ import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
 
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
@@ -410,27 +410,25 @@ class MicroBatchExecution(
       }
     }
 
-    // A list of attributes that will need to be updated.
-    val replacements = new ArrayBuffer[(Attribute, Attribute)]
     // Replace sources in the logical plan with data that has arrived since the last batch.
    val newBatchesPlan = logicalPlan transform {
       case StreamingExecutionRelation(source, output) =>
         newData.get(source).map { dataPlan =>
           assert(output.size == dataPlan.output.size,
             s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
               s"${Utils.truncatedString(dataPlan.output, ",")}")
-          replacements ++= output.zip(dataPlan.output)
-          dataPlan
+
+          val aliases = output.zip(dataPlan.output).map { case (to, from) =>
+            Alias(from, to.name)(exprId = to.exprId, explicitMetadata = Some(from.metadata))
+          }
+          Project(aliases, dataPlan)
         }.getOrElse {
           LocalRelation(output, isStreaming = true)
         }
     }
 
     // Rewire the plan to use the new attributes that were returned by the source.
-    val replacementMap = AttributeMap(replacements)
     val newAttributePlan = newBatchesPlan transformAllExpressions {
-      case a: Attribute if replacementMap.contains(a) =>
-        replacementMap(a).withMetadata(a.metadata)
       case ct: CurrentTimestamp =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           ct.dataType)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala

+14 −6

```diff
@@ -20,9 +20,9 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LeafNode
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2}
@@ -42,7 +42,7 @@ object StreamingRelation {
  * passing to [[StreamExecution]] to run a query.
  */
 case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
-  extends LeafNode {
+  extends LeafNode with MultiInstanceRelation {
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
 
@@ -53,6 +53,8 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(dataSource.sparkSession.sessionState.conf.defaultSizeInBytes)
   )
+
+  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))
 }
 
 /**
@@ -62,7 +64,7 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
 case class StreamingExecutionRelation(
     source: BaseStreamingSource,
     output: Seq[Attribute])(session: SparkSession)
-  extends LeafNode {
+  extends LeafNode with MultiInstanceRelation {
 
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
@@ -74,6 +76,8 @@ case class StreamingExecutionRelation(
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
   )
+
+  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
 }
 
 // We have to pack in the V1 data source as a shim, for the case when a source implements
@@ -92,13 +96,15 @@ case class StreamingRelationV2(
     extraOptions: Map[String, String],
     output: Seq[Attribute],
     v1Relation: Option[StreamingRelation])(session: SparkSession)
-  extends LeafNode {
+  extends LeafNode with MultiInstanceRelation {
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
 
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
   )
+
+  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
 }
 
 /**
@@ -108,7 +114,7 @@ case class ContinuousExecutionRelation(
     source: ContinuousReadSupport,
     extraOptions: Map[String, String],
     output: Seq[Attribute])(session: SparkSession)
-  extends LeafNode {
+  extends LeafNode with MultiInstanceRelation {
 
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
@@ -120,6 +126,8 @@ case class ContinuousExecutionRelation(
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
   )
+
+  override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
 }
 
 /**
```

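Why the `MultiInstanceRelation` mixin above matters: when both legs of a self-join resolve to the same leaf instance, the analyzer deduplicates the conflicting attributes by calling `newInstance()` on one leg, which requires the leaf to re-create its output with fresh expression IDs. A minimal illustrative sketch of that behaviour (not part of the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

// An attribute exposed by the original leaf relation.
val value = AttributeReference("value", IntegerType)()

// newInstance() on the relation maps each output attribute through
// Attribute.newInstance(), which keeps the name and type but mints a new
// exprId, so the two join legs no longer share attribute IDs.
val fresh = value.newInstance()
assert(fresh.name == value.name && fresh.exprId != value.exprId)
```
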
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala

+24 −1

```diff
@@ -28,7 +28,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter}
-import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.execution.{FileSourceScanExec, LogicalRDD}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinHelper}
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreProviderId}
 import org.apache.spark.sql.functions._
@@ -323,6 +325,27 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
     assert(e.toString.contains("Stream stream joins without equality predicate is not supported"))
   }
 
+  test("stream stream self join") {
+    val input = MemoryStream[Int]
+    val df = input.toDF
+    val join =
+      df.select('value % 5 as "key", 'value).join(
+        df.select('value % 5 as "key", 'value), "key")
+
+    testStream(join)(
+      AddData(input, 1, 2),
+      CheckAnswer((1, 1, 1), (2, 2, 2)),
+      StopStream,
+      StartStream(),
+      AddData(input, 3, 6),
+      /*
+      (1, 1)     (1, 1)
+      (2, 2)  x  (2, 2)  =  (1, 1, 1), (1, 1, 6), (2, 2, 2), (1, 6, 1), (1, 6, 6)
+      (1, 6)     (1, 6)
+      */
+      CheckAnswer((3, 3, 3), (1, 1, 1), (1, 1, 6), (2, 2, 2), (1, 6, 1), (1, 6, 6)))
+  }
+
   test("locality preferences of StateStoreAwareZippedRDD") {
     import StreamingSymmetricHashJoinHelper._
 
```

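For completeness, here is what the now-working pattern looks like from the user side. This is a sketch only; the rate source, memory sink, and query name are arbitrary choices for illustration and do not come from the patch:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative end-to-end stream-stream self-join, mirroring the query in the
// test above but wired to a real streaming source and sink.
val spark = SparkSession.builder().master("local[2]").appName("self-join-sketch").getOrCreate()
import spark.implicits._

val stream = spark.readStream.format("rate").load()           // columns: timestamp, value
val keyed  = stream.select(($"value" % 5).as("key"), $"value") // derive a join key
val joined = keyed.join(keyed, "key")                          // self-join on the derived key

val query = joined.writeStream
  .format("memory")
  .queryName("selfJoin")
  .outputMode("append")                                        // stream-stream joins run in append mode
  .start()
```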