@@ -25,6 +25,8 @@ import org.scalatest.time.SpanSugar._
25
25
26
26
import org .apache .spark .sql .{AnalysisException , DataFrame , Row }
27
27
import org .apache .spark .sql .catalyst .expressions .{AttributeReference , SpecificInternalRow , UnsafeProjection }
28
+ import org .apache .spark .sql .execution .streaming .MemoryStream
29
+ import org .apache .spark .sql .execution .streaming .sources .ContinuousMemoryStream
28
30
import org .apache .spark .sql .streaming ._
29
31
import org .apache .spark .sql .types .{BinaryType , DataType }
30
32
import org .apache .spark .util .Utils
@@ -215,6 +217,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
215
217
test(" streaming - write data with bad schema" ) {
216
218
val inputTopic = newTopic()
217
219
testUtils.createTopic(inputTopic, partitions = 1 )
220
+ testUtils.sendMessages(inputTopic, Array (" 0" ))
218
221
219
222
val input = spark
220
223
.readStream
@@ -226,21 +229,21 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
226
229
val topic = newTopic()
227
230
testUtils.createTopic(topic)
228
231
229
- val ex = intercept[AnalysisException ] {
232
+ val ex = intercept[StreamingQueryException ] {
230
233
/* No topic field or topic option */
231
234
createKafkaWriter(input.toDF())(
232
235
withSelectExpr = " value as key" , " value"
233
- )
236
+ ).processAllAvailable()
234
237
}
235
238
assert(ex.getMessage
236
239
.toLowerCase(Locale .ROOT )
237
240
.contains(" topic option required when no 'topic' attribute is present" ))
238
241
239
- val ex2 = intercept[AnalysisException ] {
242
+ val ex2 = intercept[StreamingQueryException ] {
240
243
/* No value field */
241
244
createKafkaWriter(input.toDF())(
242
245
withSelectExpr = s " ' $topic' as topic " , " value as key"
243
- )
246
+ ).processAllAvailable()
244
247
}
245
248
assert(ex2.getMessage.toLowerCase(Locale .ROOT ).contains(
246
249
" required attribute 'value' not found" ))
@@ -249,6 +252,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
249
252
test(" streaming - write data with valid schema but wrong types" ) {
250
253
val inputTopic = newTopic()
251
254
testUtils.createTopic(inputTopic, partitions = 1 )
255
+ testUtils.sendMessages(inputTopic, Array (" 0" ))
252
256
253
257
val input = spark
254
258
.readStream
@@ -261,28 +265,28 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
261
265
val topic = newTopic()
262
266
testUtils.createTopic(topic)
263
267
264
- val ex = intercept[AnalysisException ] {
268
+ val ex = intercept[StreamingQueryException ] {
265
269
/* topic field wrong type */
266
270
createKafkaWriter(input.toDF())(
267
271
withSelectExpr = s " CAST('1' as INT) as topic " , " value"
268
- )
272
+ ).processAllAvailable()
269
273
}
270
274
assert(ex.getMessage.toLowerCase(Locale .ROOT ).contains(" topic type must be a string" ))
271
275
272
- val ex2 = intercept[AnalysisException ] {
276
+ val ex2 = intercept[StreamingQueryException ] {
273
277
/* value field wrong type */
274
278
createKafkaWriter(input.toDF())(
275
279
withSelectExpr = s " ' $topic' as topic " , " CAST(value as INT) as value"
276
- )
280
+ ).processAllAvailable()
277
281
}
278
282
assert(ex2.getMessage.toLowerCase(Locale .ROOT ).contains(
279
283
" value attribute type must be a string or binary" ))
280
284
281
- val ex3 = intercept[AnalysisException ] {
285
+ val ex3 = intercept[StreamingQueryException ] {
282
286
/* key field wrong type */
283
287
createKafkaWriter(input.toDF())(
284
288
withSelectExpr = s " ' $topic' as topic " , " CAST(value as INT) as key" , " value"
285
- )
289
+ ).processAllAvailable()
286
290
}
287
291
assert(ex3.getMessage.toLowerCase(Locale .ROOT ).contains(
288
292
" key attribute type must be a string or binary" ))
@@ -330,18 +334,18 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
330
334
.option(" subscribe" , inputTopic)
331
335
.load()
332
336
333
- val ex = intercept[IllegalArgumentException ] {
337
+ val ex = intercept[StreamingQueryException ] {
334
338
createKafkaWriter(
335
339
input.toDF(),
336
- withOptions = Map (" kafka.key.serializer" -> " foo" ))()
340
+ withOptions = Map (" kafka.key.serializer" -> " foo" ))().processAllAvailable()
337
341
}
338
342
assert(ex.getMessage.toLowerCase(Locale .ROOT ).contains(
339
343
" kafka option 'key.serializer' is not supported" ))
340
344
341
- val ex2 = intercept[IllegalArgumentException ] {
345
+ val ex2 = intercept[StreamingQueryException ] {
342
346
createKafkaWriter(
343
347
input.toDF(),
344
- withOptions = Map (" kafka.value.serializer" -> " foo" ))()
348
+ withOptions = Map (" kafka.value.serializer" -> " foo" ))().processAllAvailable()
345
349
}
346
350
assert(ex2.getMessage.toLowerCase(Locale .ROOT ).contains(
347
351
" kafka option 'value.serializer' is not supported" ))
0 commit comments