
Commit c0ec93f

test all combinations
1 parent d7b4571 commit c0ec93f

File tree

4 files changed (+161, −105 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 5 additions & 2 deletions

@@ -91,11 +91,14 @@ class MicroBatchExecution(
             nextSourceId += 1
             StreamingExecutionRelation(reader, output)(sparkSession)
           })
-      case s @ StreamingRelationV2(_, _, _, output, v1Relation) =>
+      case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
         v2ToExecutionRelationMap.getOrElseUpdate(s, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          assert(v1Relation.isDefined, "v2 execution didn't match but v1 was unavailable")
+          if (v1Relation.isEmpty) {
+            throw new UnsupportedOperationException(
+              s"Data source $sourceName does not support microbatch processing.")
+          }
           val source = v1Relation.get.dataSource.createSource(metadataPath)
           nextSourceId += 1
           StreamingExecutionRelation(source, output)(sparkSession)

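For orientation, a hedged sketch of how the new check surfaces to a caller (not part of the commit; it assumes a SparkSession named spark and ScalaTest's intercept, and uses the fake formats registered by the test suite further down). A source that offers only ContinuousReadSupport, started with a microbatch trigger, now fails with the UnsupportedOperationException above instead of tripping the old assertion:

    // Illustrative only: mirrors testLogicalPlanCase in StreamingDataSourceV2Suite below.
    val ex = intercept[StreamingQueryException] {
      spark.readStream
        .format("fake-read-continuous-only")          // reader supports continuous only
        .load()
        .writeStream
        .format("fake-write-microbatch-continuous")   // writer supports microbatch
        .trigger(Trigger.ProcessingTime(1000))        // microbatch trigger
        .start()
        .processAllAvailable()                        // failure happens on the stream thread
    }
    // MicroBatchExecution's new message arrives wrapped in a StreamingQueryException.
    assert(ex.cause.getMessage.contains(
      "Data source fake-read-continuous-only does not support microbatch processing"))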
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala

Lines changed: 1 addition & 1 deletion

@@ -283,7 +283,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
       val sink = (ds.newInstance(), trigger) match {
         case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
-        case (_, _: ContinuousTrigger) => throw new AnalysisException(
+        case (_, _: ContinuousTrigger) => throw new UnsupportedOperationException(
           s"Data source $source does not support continuous writing")
         case (w: MicroBatchWriteSupport, _) => w
         case _ =>

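The write-side check is similar; a hedged sketch of what a caller sees (again not part of the commit, assuming spark and intercept as in the suite below): requesting a continuous trigger against a writer without ContinuousWriteSupport now raises UnsupportedOperationException directly from start(), where it previously raised AnalysisException:

    // Illustrative only: mirrors testUnsupportedOperationCase in the suite below.
    val ex = intercept[UnsupportedOperationException] {
      spark.readStream
        .format("fake-read-microbatch-continuous")
        .load()
        .writeStream
        .format("fake-write-microbatch-only")   // no ContinuousWriteSupport
        .trigger(Trigger.Continuous(1000))      // but a continuous trigger is requested
        .start()                                // throws here, before any query runs
    }
    assert(ex.getMessage.contains(
      "Data source fake-write-microbatch-only does not support continuous writing"))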
sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

Lines changed: 8 additions & 4 deletions

@@ -5,7 +5,11 @@ org.apache.spark.sql.sources.FakeSourceFour
 org.apache.fakesource.FakeExternalSourceOne
 org.apache.fakesource.FakeExternalSourceTwo
 org.apache.fakesource.FakeExternalSourceThree
-org.apache.spark.sql.streaming.sources.FakeStreamingMicroBatchOnly
-org.apache.spark.sql.streaming.sources.FakeStreamingContinuousOnly
-org.apache.spark.sql.streaming.sources.FakeStreamingBothModes
-org.apache.spark.sql.streaming.sources.FakeStreamingNeitherMode
+org.apache.spark.sql.streaming.sources.FakeReadMicroBatchOnly
+org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly
+org.apache.spark.sql.streaming.sources.FakeReadBothModes
+org.apache.spark.sql.streaming.sources.FakeReadNeitherMode
+org.apache.spark.sql.streaming.sources.FakeWriteMicroBatchOnly
+org.apache.spark.sql.streaming.sources.FakeWriteContinuousOnly
+org.apache.spark.sql.streaming.sources.FakeWriteBothModes
+org.apache.spark.sql.streaming.sources.FakeWriteNeitherMode

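Each fully qualified name above is a DataSourceRegister picked up through Java's ServiceLoader, so the fake sources can be addressed by their shortName() strings. A hedged sketch of the lookup the rewritten suite performs for every combination (assuming a SparkSession named spark; the fake classes are defined in the suite below):

    // Illustrative only: resolve a registered short name to its implementation class.
    val readSource = DataSource
      .lookupDataSource("fake-read-microbatch-only", spark.sqlContext.conf)
      .newInstance()
    assert(readSource.isInstanceOf[MicroBatchReadSupport])    // supports microbatch reads
    assert(!readSource.isInstanceOf[ContinuousReadSupport])   // but not continuous reads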
sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala

Lines changed: 147 additions & 98 deletions

@@ -20,38 +20,51 @@ package org.apache.spark.sql.streaming.sources
 import java.util.Optional
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{LongOffset, RateStreamOffset}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
 import org.apache.spark.sql.sources.v2.reader.ReadTask
 import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport, MicroBatchReadSupport, MicroBatchWriteSupport}
 import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
 import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-object FakeReader extends MicroBatchReader with ContinuousReader {
+case class FakeReader() extends MicroBatchReader with ContinuousReader {
   def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {}
   def getStartOffset: Offset = RateStreamOffset(Map())
   def getEndOffset: Offset = RateStreamOffset(Map())
   def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
   def commit(end: Offset): Unit = {}
   def readSchema(): StructType = StructType(Seq())
-  def createReadTasks(): java.util.ArrayList[ReadTask[Row]] = new java.util.ArrayList()
   def stop(): Unit = {}
   def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
   def setOffset(start: Optional[Offset]): Unit = {}
+
+  def createReadTasks(): java.util.ArrayList[ReadTask[Row]] = {
+    throw new IllegalStateException("fake source - cannot actually read")
+  }
 }
 
-class FakeStreamingMicroBatchOnly extends DataSourceRegister
-  with DataSourceV2 with MicroBatchReadSupport with MicroBatchWriteSupport {
+trait FakeMicroBatchReadSupport extends MicroBatchReadSupport {
   override def createMicroBatchReader(
       schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceV2Options): MicroBatchReader = FakeReader
+      options: DataSourceV2Options): MicroBatchReader = FakeReader()
+}
+
+trait FakeContinuousReadSupport extends ContinuousReadSupport {
+  override def createContinuousReader(
+      schema: Optional[StructType],
+      checkpointLocation: String,
+      options: DataSourceV2Options): ContinuousReader = FakeReader()
+}
 
+trait FakeMicroBatchWriteSupport extends MicroBatchWriteSupport {
   def createMicroBatchWriter(
       queryId: String,
       epochId: Long,
@@ -60,140 +73,176 @@ class FakeStreamingMicroBatchOnly extends DataSourceRegister
       options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
-
-  override def shortName(): String = "fake-microbatch-only"
 }
 
-class FakeStreamingContinuousOnly extends DataSourceRegister
-  with DataSourceV2 with ContinuousReadSupport with ContinuousWriteSupport {
-  override def createContinuousReader(
-      schema: Optional[StructType],
-      checkpointLocation: String,
-      options: DataSourceV2Options): ContinuousReader = FakeReader
-
+trait FakeContinuousWriteSupport extends ContinuousWriteSupport {
   def createContinuousWriter(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
       options: DataSourceV2Options): Optional[ContinuousWriter] = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
+}
 
-  override def shortName(): String = "fake-continuous-only"
+class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupport {
+  override def shortName(): String = "fake-read-microbatch-only"
 }
 
-class FakeStreamingBothModes extends DataSourceRegister
-  with DataSourceV2 with MicroBatchReadSupport with ContinuousReadSupport
-  with MicroBatchWriteSupport with ContinuousWriteSupport {
-  override def createMicroBatchReader(
-      schema: Optional[StructType],
-      checkpointLocation: String,
-      options: DataSourceV2Options): MicroBatchReader = FakeReader
+class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupport {
+  override def shortName(): String = "fake-read-continuous-only"
+}
 
-  def createMicroBatchWriter(
-      queryId: String,
-      epochId: Long,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
-    throw new IllegalStateException("fake sink - cannot actually write")
-  }
+class FakeReadBothModes extends DataSourceRegister
+  with FakeMicroBatchReadSupport with FakeContinuousReadSupport {
+  override def shortName(): String = "fake-read-microbatch-continuous"
+}
 
-  override def createContinuousReader(
-      schema: Optional[StructType],
-      checkpointLocation: String,
-      options: DataSourceV2Options): ContinuousReader = FakeReader
+class FakeReadNeitherMode extends DataSourceRegister {
+  override def shortName(): String = "fake-read-neither-mode"
+}
 
-  def createContinuousWriter(
-      queryId: String,
-      schema: StructType,
-      mode: OutputMode,
-      options: DataSourceV2Options): Optional[ContinuousWriter] = {
-    throw new IllegalStateException("fake sink - cannot actually write")
-  }
+class FakeWriteMicroBatchOnly extends DataSourceRegister with FakeMicroBatchWriteSupport {
  override def shortName(): String = "fake-write-microbatch-only"
+}
 
-  override def shortName(): String = "fake-both-modes"
+class FakeWriteContinuousOnly extends DataSourceRegister with FakeContinuousWriteSupport {
+  override def shortName(): String = "fake-write-continuous-only"
 }
 
-class FakeStreamingNeitherMode extends DataSourceRegister with DataSourceV2 {
-  override def shortName(): String = "fake-neither-mode"
+class FakeWriteBothModes extends DataSourceRegister
+  with FakeMicroBatchWriteSupport with FakeContinuousWriteSupport {
+  override def shortName(): String = "fake-write-microbatch-continuous"
 }
 
-class StreamingDataSourceV2Suite extends StreamTest {
+class FakeWriteNeitherMode extends DataSourceRegister {
+  override def shortName(): String = "fake-write-neither-mode"
+}
 
-  private def df = spark.readStream.format("rate").load()
+class StreamingDataSourceV2Suite extends StreamTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     val fakeCheckpoint = Utils.createTempDir()
     spark.conf.set("spark.sql.streaming.checkpointLocation", fakeCheckpoint.getCanonicalPath)
   }
 
-  testQuietly("create microbatch with only microbatch support") {
-    val query = df.writeStream.format("fake-microbatch-only").start()
-    query.stop()
-  }
-
-  testQuietly("create microbatch with both support") {
-    val query = df.writeStream.format("fake-both-modes").start()
-    query.stop()
-  }
-
-  testQuietly("create continuous with only continuous support") {
-    val query = df.writeStream
-      .format("fake-continuous-only")
-      .trigger(Trigger.Continuous(100))
-      .start()
-    query.stop()
-  }
-
-  testQuietly("create continuous with both support") {
-    val query = df.writeStream
-      .format("fake-both-modes")
-      .trigger(Trigger.Continuous(100))
+  val readFormats = Seq(
+    "fake-read-microbatch-only",
+    "fake-read-continuous-only",
+    "fake-read-microbatch-continuous",
+    "fake-read-neither-mode")
+  val writeFormats = Seq(
+    "fake-write-microbatch-only",
+    "fake-write-continuous-only",
+    "fake-write-microbatch-continuous",
+    "fake-write-neither-mode")
+  val triggers = Seq(
+    Trigger.Once(),
+    Trigger.ProcessingTime(1000),
+    Trigger.Continuous(1000))
+
+  private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger) = {
+    val query = spark.readStream
+      .format(readFormat)
+      .load()
+      .writeStream
+      .format(writeFormat)
+      .trigger(trigger)
       .start()
     query.stop()
   }
 
-  test("microbatch with only continuous support") {
+  private def testUnsupportedOperationCase(
+      readFormat: String,
+      writeFormat: String,
+      trigger: Trigger,
+      errorMsg: String) = {
     val ex = intercept[UnsupportedOperationException] {
-      df.writeStream.format("fake-continuous-only").start()
+      testPositiveCase(readFormat, writeFormat, trigger)
    }
-
-    assert(ex.getMessage.contains(
-      "Data source fake-continuous-only does not support streamed writing"))
+    assert(ex.getMessage.contains(errorMsg))
   }
 
-  test("microbatch with no support") {
-    val ex = intercept[UnsupportedOperationException] {
-      df.writeStream.format("fake-neither-mode").start()
+  private def testLogicalPlanCase(
+      readFormat: String,
+      writeFormat: String,
+      trigger: Trigger,
+      errorMsg: String) = {
+    val ex = intercept[StreamingQueryException] {
+      spark.readStream
+        .format(readFormat)
+        .load()
+        .writeStream
+        .format(writeFormat)
+        .trigger(trigger)
+        .start()
+        .processAllAvailable()
     }
-
-    assert(ex.getMessage.contains(
-      "Data source fake-neither-mode does not support streamed writing"))
+    assert(ex.cause != null)
+    assert(ex.cause.getMessage.contains(errorMsg))
   }
 
-  test("continuous with only microbatch support") {
-    val ex = intercept[AnalysisException] {
-      df.writeStream
-        .format("fake-microbatch-only")
-        .trigger(Trigger.Continuous(100))
-        .start()
+  // Get a list of (read, write, trigger) tuples for test cases.
+  val cases = readFormats.flatMap { read =>
+    writeFormats.flatMap { write =>
+      triggers.map(t => (write, t))
+    }.map {
+      case (write, t) => (read, write, t)
     }
-
-    assert(ex.getMessage.contains(
-      "Data source fake-microbatch-only does not support continuous writing"))
   }
 
-  test("continuous with no support") {
-    val ex = intercept[AnalysisException] {
-      df.writeStream
-        .format("fake-neither-mode")
-        .trigger(Trigger.Continuous(100))
-        .start()
+  for ((read, write, trigger) <- cases) {
+    testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
+      val readSource = DataSource.lookupDataSource(read, spark.sqlContext.conf).newInstance()
+      val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).newInstance()
+      (readSource, writeSource, trigger) match {
+        // Valid microbatch queries.
+        case (_: MicroBatchReadSupport, _: MicroBatchWriteSupport, t)
+            if !t.isInstanceOf[ContinuousTrigger] =>
+          testPositiveCase(read, write, trigger)
+
+        // Valid continuous queries.
+        case (_: ContinuousReadSupport, _: ContinuousWriteSupport, _: ContinuousTrigger) =>
+          testPositiveCase(read, write, trigger)
+
+        // Invalid - can't read at all
+        case (r, _, _)
+            if !r.isInstanceOf[MicroBatchReadSupport]
+              && !r.isInstanceOf[ContinuousReadSupport] =>
+          testUnsupportedOperationCase(read, write, trigger,
+            s"Data source $read does not support streamed reading")
+
+        // Invalid - trigger is continuous but writer is not
+        case (_, w, _: ContinuousTrigger) if !w.isInstanceOf[ContinuousWriteSupport] =>
+          testUnsupportedOperationCase(read, write, trigger,
+            s"Data source $write does not support continuous writing")
+
+        // Invalid - can't write at all
+        case (_, w, _)
+            if !w.isInstanceOf[MicroBatchWriteSupport]
+              && !w.isInstanceOf[ContinuousWriteSupport] =>
+          testUnsupportedOperationCase(read, write, trigger,
+            s"Data source $write does not support streamed writing")
+
+        // Invalid - trigger and writer are continuous but reader is not
+        case (r, _: ContinuousWriteSupport, _: ContinuousTrigger)
+            if !r.isInstanceOf[ContinuousReadSupport] =>
+          testLogicalPlanCase(read, write, trigger,
+            s"Data source $read does not support continuous processing")
+
+        // Invalid - trigger is microbatch but writer is not
+        case (_, w, t)
+            if !w.isInstanceOf[MicroBatchWriteSupport] && !t.isInstanceOf[ContinuousTrigger] =>
+          testUnsupportedOperationCase(read, write, trigger,
+            s"Data source $write does not support streamed writing")
+
+        // Invalid - trigger and writer are microbatch but reader is not
+        case (r, _, t)
+            if !r.isInstanceOf[MicroBatchReadSupport] && !t.isInstanceOf[ContinuousTrigger] =>
+          testLogicalPlanCase(read, write, trigger,
+            s"Data source $read does not support microbatch processing")
+      }
    }
-
-    assert(ex.getMessage.contains(
-      "Data source fake-neither-mode does not support continuous writing"))
  }
 }

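As a rough coverage check (a sketch, not part of the commit; it reuses the readFormats, writeFormats, triggers, and cases values defined in the suite above): the generator yields one test per (read, write, trigger) combination, and every combination lands in exactly one branch of the match.

    // 4 read formats x 4 write formats x 3 triggers = 48 generated test cases.
    val expectedCases = readFormats.size * writeFormats.size * triggers.size
    assert(expectedCases == 48)
    assert(cases.size == expectedCases)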