Skip to content

Commit 30ca36c

Browse files
committed
address comments
1 parent 953b77d commit 30ca36c

File tree

2 files changed

+14
-0
lines changed

2 files changed

+14
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ object V2StreamingScanSupportCheck extends (LogicalPlan => Unit) {
3030
import DataSourceV2Implicits._
3131

3232
override def apply(plan: LogicalPlan): Unit = {
33+
plan.foreach {
34+
case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
35+
throw new AnalysisException(
36+
s"Table ${r.table.name()} does not support either micro-batch or continuous scan.")
37+
case _ =>
38+
}
39+
3340
val streamingSources = plan.collect {
3441
case r: StreamingRelationV2 => r.table
3542
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheckSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ class V2StreamingScanSupportCheckSuite extends SparkFunSuite with SharedSparkSes
5858
V2StreamingScanSupportCheck(Union(plan3, plan4))
5959
}
6060

61+
test("table without scan capability") {
62+
val e = intercept[AnalysisException] {
63+
V2StreamingScanSupportCheck(createStreamingRelation(CapabilityTable(), None))
64+
}
65+
assert(e.message.contains("does not support either micro-batch or continuous scan"))
66+
}
67+
6168
test("mix micro-batch only and continuous only") {
6269
val plan1 = createStreamingRelation(CapabilityTable(MICRO_BATCH_READ), None)
6370
val plan2 = createStreamingRelation(CapabilityTable(CONTINUOUS_READ), None)

0 commit comments

Comments (0)