Skip to content

Commit cffb2fc

Browse files
committed
[FLINK-20487][table-planner-blink] Remove restriction on StreamPhysicalGroupWindowAggregate which only supports insert-only input node.
1 parent 2ef03c4 commit cffb2fc

File tree

8 files changed

+200
-55
lines changed

8 files changed

+200
-55
lines changed

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
200200
createNewNode(agg, children, providedTrait, requiredTrait, requester)
201201

202202
case window: StreamPhysicalGroupWindowAggregateBase =>
203-
// WindowAggregate and WindowTableAggregate support insert-only in input
204-
val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)
203+
// WindowAggregate and WindowTableAggregate support all changes in input
204+
val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES)
205205
val builder = ModifyKindSet.newBuilder()
206206
.addContainedKind(ModifyKind.INSERT)
207207
if (window.emitStrategy.produceUpdates) {
@@ -470,7 +470,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
470470

471471
case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
472472
_: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
473-
_: StreamPhysicalPythonGroupTableAggregate =>
473+
_: StreamPhysicalPythonGroupTableAggregate |
474+
_: StreamPhysicalGroupWindowAggregateBase =>
474475
// Aggregate, TableAggregate and Limit requires update_before if there are updates
475476
val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
476477
val children = visitChildren(rel, requiredChildTrait)

flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,39 @@ Calc(select=[a, 3:BIGINT AS $1])
6969
+- Exchange(distribution=[hash[b]])
7070
+- Calc(select=[a, b, rowtime])
7171
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
72+
]]>
73+
</Resource>
74+
</TestCase>
75+
<TestCase name="testLastRowWithWindowOnRowtime">
76+
<Resource name="explain">
77+
<![CDATA[== Abstract Syntax Tree ==
78+
LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)])
79+
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[SUM($2)])
80+
+- LogicalProject(b=[$1], $f1=[$TUMBLE($2, 4:INTERVAL SECOND)], a=[$0])
81+
+- LogicalFilter(condition=[=($3, 1)])
82+
+- LogicalProject(a=[$0], b=[$1], ts=[$2], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST)])
83+
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[$2])
84+
+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]])
85+
86+
== Optimized Physical Plan ==
87+
Calc(select=[b, EXPR$1, w$start AS EXPR$2])
88+
+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, ts, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, SUM(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
89+
+- Exchange(distribution=[hash[b]])
90+
+- Calc(select=[b, ts, a])
91+
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME])
92+
+- Exchange(distribution=[hash[a]])
93+
+- WatermarkAssigner(rowtime=[ts], watermark=[ts])
94+
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]], fields=[a, b, ts])
95+
96+
== Optimized Execution Plan ==
97+
Calc(select=[b, EXPR$1, w$start AS EXPR$2])
98+
+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, ts, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, SUM(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
99+
+- Exchange(distribution=[hash[b]])
100+
+- Calc(select=[b, ts, a])
101+
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME])
102+
+- Exchange(distribution=[hash[a]])
103+
+- WatermarkAssigner(rowtime=[ts], watermark=[ts])
104+
+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]], fields=[a, b, ts])
72105
]]>
73106
</Resource>
74107
</TestCase>

flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,30 @@ Union(all=[true], union=[b, ts, a], changelogMode=[I,UA,D])
594594
+- GroupAggregate(groupBy=[a], select=[a, MAX(ts) AS t, MAX(b) AS b], changelogMode=[I,UA])
595595
+- Exchange(distribution=[hash[a]], changelogMode=[I])
596596
+- TableSourceScan(table=[[default_catalog, default_database, append_src]], fields=[ts, a, b], changelogMode=[I])
597+
]]>
598+
</Resource>
599+
</TestCase>
600+
<TestCase name="testUpsertSourceWithComputedColumnAndWatermark">
601+
<Resource name="sql">
602+
<![CDATA[SELECT a, b, c FROM src WHERE a > 1]]>
603+
</Resource>
604+
<Resource name="ast">
605+
<![CDATA[
606+
LogicalProject(a=[$1], b=[$2], c=[$3])
607+
+- LogicalFilter(condition=[>($1, 1)])
608+
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)])
609+
+- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)])
610+
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
611+
]]>
612+
</Resource>
613+
<Resource name="optimized rel plan">
614+
<![CDATA[
615+
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
616+
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
617+
+- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
618+
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
619+
+- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
620+
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
597621
]]>
598622
</Resource>
599623
</TestCase>
@@ -618,27 +642,28 @@ Calc(select=[id, Reinterpret(TO_TIMESTAMP(c)) AS ts], changelogMode=[I,UA,D])
618642
]]>
619643
</Resource>
620644
</TestCase>
621-
<TestCase name="testUpsertSourceWithComputedColumnAndWatermark">
645+
<TestCase name="testWindowAggregateOnChangelogSource">
622646
<Resource name="sql">
623-
<![CDATA[SELECT a, b, c FROM src WHERE a > 1]]>
647+
<![CDATA[
648+
SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*)
649+
FROM src
650+
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
651+
]]>
624652
</Resource>
625653
<Resource name="ast">
626654
<![CDATA[
627-
LogicalProject(a=[$1], b=[$2], c=[$3])
628-
+- LogicalFilter(condition=[>($1, 1)])
629-
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 1000:INTERVAL SECOND)])
630-
+- LogicalProject(id=[$0], a=[$1], b=[+($1, 1)], c=[$2], ts=[TO_TIMESTAMP($2)])
631-
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
655+
LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[$1])
656+
+- LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])
657+
+- LogicalProject($f0=[$TUMBLE(PROCTIME(), 10000:INTERVAL SECOND)])
658+
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
632659
]]>
633660
</Resource>
634661
<Resource name="optimized rel plan">
635662
<![CDATA[
636-
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
637-
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
638-
+- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
639-
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
640-
+- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
641-
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
663+
Calc(select=[CAST(w$start) AS EXPR$0, EXPR$1], changelogMode=[I])
664+
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 10000)], properties=[w$start, w$end, w$proctime], select=[COUNT(*) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime], changelogMode=[I])
665+
+- Exchange(distribution=[single], changelogMode=[I,UB,UA])
666+
+- TableSourceScan(table=[[default_catalog, default_database, src, project=[]]], fields=[], changelogMode=[I,UB,UA])
642667
]]>
643668
</Resource>
644669
</TestCase>

flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml

Lines changed: 68 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -642,35 +642,6 @@ Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I])
642642
+- Exchange(distribution=[single], changelogMode=[I])
643643
+- Calc(select=[rowtime], changelogMode=[I])
644644
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
645-
]]>
646-
</Resource>
647-
</TestCase>
648-
<TestCase name="testWindowGroupByOnConstant">
649-
<Resource name="sql">
650-
<![CDATA[
651-
SELECT COUNT(*),
652-
weightedAvg(c, a) AS wAvg,
653-
TUMBLE_START(rowtime, INTERVAL '15' MINUTE),
654-
TUMBLE_END(rowtime, INTERVAL '15' MINUTE)
655-
FROM MyTable
656-
GROUP BY 'a', TUMBLE(rowtime, INTERVAL '15' MINUTE)
657-
]]>
658-
</Resource>
659-
<Resource name="ast">
660-
<![CDATA[
661-
LogicalProject(EXPR$0=[$2], wAvg=[$3], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_END($1)])
662-
+- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT()], wAvg=[weightedAvg($2, $3)])
663-
+- LogicalProject($f0=[_UTF-16LE'a'], $f1=[$TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2], $f3=[CAST($0):BIGINT])
664-
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
665-
]]>
666-
</Resource>
667-
<Resource name="optimized exec plan">
668-
<![CDATA[
669-
Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
670-
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
671-
+- Exchange(distribution=[single])
672-
+- Calc(select=[rowtime, c, CAST(a) AS a])
673-
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
674645
]]>
675646
</Resource>
676647
</TestCase>
@@ -718,6 +689,74 @@ Union(all=[true], union=[EXPR$0])
718689
+- Calc(select=[1 AS EXPR$0])
719690
+- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 7200000, 3600000)], select=[])
720691
+- Reused(reference_id=[1])
692+
]]>
693+
</Resource>
694+
</TestCase>
695+
<TestCase name="testWindowGroupByOnConstant">
696+
<Resource name="sql">
697+
<![CDATA[
698+
SELECT COUNT(*),
699+
weightedAvg(c, a) AS wAvg,
700+
TUMBLE_START(rowtime, INTERVAL '15' MINUTE),
701+
TUMBLE_END(rowtime, INTERVAL '15' MINUTE)
702+
FROM MyTable
703+
GROUP BY 'a', TUMBLE(rowtime, INTERVAL '15' MINUTE)
704+
]]>
705+
</Resource>
706+
<Resource name="ast">
707+
<![CDATA[
708+
LogicalProject(EXPR$0=[$2], wAvg=[$3], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_END($1)])
709+
+- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT()], wAvg=[weightedAvg($2, $3)])
710+
+- LogicalProject($f0=[_UTF-16LE'a'], $f1=[$TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2], $f3=[CAST($0):BIGINT])
711+
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
712+
]]>
713+
</Resource>
714+
<Resource name="optimized exec plan">
715+
<![CDATA[
716+
Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
717+
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
718+
+- Exchange(distribution=[single])
719+
+- Calc(select=[rowtime, c, CAST(a) AS a])
720+
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
721+
]]>
722+
</Resource>
723+
</TestCase>
724+
<TestCase name="testWindowAggregateOnRetractStream">
725+
<Resource name="sql">
726+
<![CDATA[
727+
SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
728+
FROM (
729+
SELECT a, b, c, rowtime
730+
FROM (
731+
SELECT *,
732+
ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rowNum
733+
FROM MyTable
734+
)
735+
WHERE rowNum = 1
736+
)
737+
GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
738+
]]>
739+
</Resource>
740+
<Resource name="ast">
741+
<![CDATA[
742+
LogicalProject(EXPR$0=[TUMBLE_START($0)], cnt=[$1])
743+
+- LogicalAggregate(group=[{0}], cnt=[COUNT()])
744+
+- LogicalProject($f0=[$TUMBLE($4, 1000:INTERVAL SECOND)])
745+
+- LogicalFilter(condition=[=($5, 1)])
746+
+- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 DESC NULLS LAST)])
747+
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
748+
]]>
749+
</Resource>
750+
<Resource name="optimized rel plan">
751+
<![CDATA[
752+
Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I])
753+
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 1000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
754+
+- Exchange(distribution=[single], changelogMode=[I,UB,UA,D])
755+
+- Calc(select=[rowtime], changelogMode=[I,UB,UA,D])
756+
+- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME], changelogMode=[I,UB,UA,D])
757+
+- Exchange(distribution=[hash[a]], changelogMode=[I])
758+
+- Calc(select=[a, rowtime], changelogMode=[I])
759+
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
721760
]]>
722761
</Resource>
723762
</TestCase>

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,6 @@ class DeduplicateTest extends TableTestBase {
105105
|GROUP BY b, TUMBLE(ts, INTERVAL '0.004' SECOND)
106106
""".stripMargin
107107

108-
thrown.expect(classOf[TableException])
109-
thrown.expectMessage("GroupWindowAggregate doesn't support consuming update " +
110-
"and delete changes which is produced by node Deduplicate(")
111108
util.verifyExplain(windowSql)
112109
}
113110

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ class TableScanTest extends TableTestBase {
592592
}
593593

594594
@Test
595-
def testUnsupportedWindowAggregateOnChangelogSource(): Unit = {
595+
def testWindowAggregateOnChangelogSource(): Unit = {
596596
util.addTable(
597597
"""
598598
|CREATE TABLE src (
@@ -610,10 +610,6 @@ class TableScanTest extends TableTestBase {
610610
|FROM src
611611
|GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
612612
|""".stripMargin
613-
thrown.expect(classOf[TableException])
614-
thrown.expectMessage(
615-
"GroupWindowAggregate doesn't support consuming update changes " +
616-
"which is produced by node TableSourceScan")
617613
util.verifyRelPlan(query, ExplainDetail.CHANGELOG_MODE)
618614
}
619615

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,4 +456,23 @@ class GroupWindowTest extends TableTestBase {
456456
|""".stripMargin
457457
util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
458458
}
459+
460+
@Test
461+
def testWindowAggregateOnRetractStream(): Unit = {
462+
val sql =
463+
"""
464+
|SELECT TUMBLE_START(`rowtime`, INTERVAL '1' SECOND), COUNT(*) cnt
465+
|FROM (
466+
| SELECT a, b, c, rowtime
467+
| FROM (
468+
| SELECT *,
469+
| ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as rowNum
470+
| FROM MyTable
471+
| )
472+
| WHERE rowNum = 1
473+
|)
474+
|GROUP BY TUMBLE(`rowtime`, INTERVAL '1' SECOND)
475+
|""".stripMargin
476+
util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
477+
}
459478
}

flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.table.planner.runtime.stream.sql
2020

21+
import java.math.BigDecimal
22+
2123
import org.apache.flink.api.common.time.Time
2224
import org.apache.flink.api.common.typeinfo.TypeInformation
2325
import org.apache.flink.api.scala._
@@ -30,12 +32,10 @@ import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{
3032
import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset
3133
import org.apache.flink.table.planner.runtime.utils._
3234
import org.apache.flink.types.Row
33-
3435
import org.junit.Assert.assertEquals
3536
import org.junit.Test
3637
import org.junit.runner.RunWith
3738
import org.junit.runners.Parameterized
38-
3939
import java.time.{Duration, ZoneId}
4040
import java.util
4141
import java.util.concurrent.TimeUnit
@@ -370,6 +370,41 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
370370
assertEquals(expected.sorted.mkString("\n"), sink.getUpsertResults.sorted.mkString("\n"))
371371
}
372372

373+
@Test
374+
def testWindowAggregateOnRetractStream(): Unit = {
375+
val sql =
376+
"""
377+
|SELECT
378+
|`string`,
379+
|TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,
380+
|TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) as w_end,
381+
|COUNT(1) AS cnt
382+
|FROM
383+
| (
384+
| SELECT `string`, rowtime
385+
| FROM (
386+
| SELECT *,
387+
| ROW_NUMBER() OVER (PARTITION BY `string` ORDER BY rowtime DESC) as rowNum
388+
| FROM testTable
389+
| )
390+
| WHERE rowNum = 1
391+
|)
392+
|GROUP BY `string`, TUMBLE(rowtime, INTERVAL '0.005' SECOND)
393+
|""".stripMargin
394+
val sink = new TestingAppendSink
395+
tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
396+
env.execute()
397+
val expected = Seq(
398+
"Hi,1970-01-01T00:00,1970-01-01T00:00:00.005,1",
399+
"Hallo,1970-01-01T00:00,1970-01-01T00:00:00.005,1",
400+
"Hello,1970-01-01T00:00,1970-01-01T00:00:00.005,0",
401+
"Hello,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,1",
402+
"Hello world,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,0",
403+
"Hello world,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1",
404+
"null,1970-01-01T00:00:00.030,1970-01-01T00:00:00.035,1")
405+
assertEquals(expected.sorted, sink.getAppendResults.sorted)
406+
}
407+
373408
@Test
374409
def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
375410
if (useTimestampLtz) {

0 commit comments

Comments
 (0)