Skip to content

Commit f77b12b

Browse files
committed
add tests
1 parent 468f134 commit f77b12b

File tree

1 file changed

+26
-2
lines changed

1 file changed

+26
-2
lines changed

sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,39 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
7070
test("multiple partitions with coalesce - multiple transformations") {
7171
val input = ContinuousMemoryStream[Int]
7272

73+
// We use a barrier to make sure predicates both before and after coalesce work
7374
val df = input.toDF()
74-
.coalesce(1)
7575
.select('value as 'copy, 'value)
76+
.where('copy =!= 1)
77+
.planWithBarrier
78+
.coalesce(1)
7679
.where('copy =!= 2)
7780
.agg(max('value))
7881

7982
testStream(df, OutputMode.Complete)(
8083
AddData(input, 0, 1, 2),
81-
CheckAnswer(1),
84+
CheckAnswer(0),
85+
StopStream,
86+
AddData(input, 3, 4, 5),
87+
StartStream(),
88+
CheckAnswer(5),
89+
AddData(input, -1, -2, -3),
90+
CheckAnswer(5))
91+
}
92+
93+
test("multiple partitions with multiple coalesce") {
94+
val input = ContinuousMemoryStream[Int]
95+
96+
val df = input.toDF()
97+
.coalesce(1)
98+
.planWithBarrier
99+
.coalesce(1)
100+
.select('value as 'copy, 'value)
101+
.agg(max('value))
102+
103+
testStream(df, OutputMode.Complete)(
104+
AddData(input, 0, 1, 2),
105+
CheckAnswer(2),
82106
StopStream,
83107
AddData(input, 3, 4, 5),
84108
StartStream(),

0 commit comments

Comments
 (0)