@@ -22,9 +22,8 @@ import java.util.Locale
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.scalatest.time.SpanSugar._
-import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -227,39 +226,23 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     val topic = newTopic()
     testUtils.createTopic(topic)
 
-    /* No topic field or topic option */
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      writer = createKafkaWriter(input.toDF())(
+    val ex = intercept[AnalysisException] {
+      /* No topic field or topic option */
+      createKafkaWriter(input.toDF())(
         withSelectExpr = "value as key", "value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
     assert(ex.getMessage
       .toLowerCase(Locale.ROOT)
       .contains("topic option required when no 'topic' attribute is present"))
 
-    try {
+    val ex2 = intercept[AnalysisException] {
       /* No value field */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"'$topic' as topic", "value as key"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
       "required attribute 'value' not found"))
   }
 
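Note: the rewrite above swaps the manual try/finally plus eventually-polling scaffolding for ScalaTest's intercept, which fails the test when no exception is thrown and returns the thrown exception so its message can be asserted directly. A minimal, self-contained sketch of the pattern, assuming only ScalaTest's Assertions (the parse helper is hypothetical, used only for illustration):

    import org.scalatest.Assertions._

    // Hypothetical helper that throws on bad input.
    def parse(s: String): Int = s.toInt

    // intercept[E] runs the block, fails the test unless an E is thrown,
    // and returns the caught E for inspection.
    val ex = intercept[NumberFormatException] {
      parse("not a number")
    }
    assert(ex.getMessage.contains("not a number"))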
@@ -278,53 +261,30 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     val topic = newTopic()
     testUtils.createTopic(topic)
 
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
+    val ex = intercept[AnalysisException] {
       /* topic field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"CAST('1' as INT) as topic", "value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
     assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
 
-    try {
+    val ex2 = intercept[AnalysisException] {
       /* value field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
      )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
       "value attribute type must be a string or binary"))
 
-    try {
+    val ex3 = intercept[AnalysisException] {
       /* key field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
       "key attribute type must be a string or binary"))
   }
 
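Note: these schema checks raise AnalysisException at query planning, before any data flows, which is why the sendMessages/eventually scaffolding and the writer.stop() cleanup could be dropped. An illustrative, self-contained sketch of the idea, with hypothetical stand-ins for Spark's types (not Spark's actual implementation):

    // Hypothetical stand-ins for Spark's DataType hierarchy and AnalysisException.
    sealed trait DataType
    case object StringType extends DataType
    case object BinaryType extends DataType
    case object IntegerType extends DataType

    final class AnalysisException(msg: String) extends Exception(msg)

    // Reject key/value attributes that are neither string nor binary,
    // mirroring the messages asserted in the tests above.
    def checkAttributeType(name: String, dt: DataType): Unit = dt match {
      case StringType | BinaryType => () // accepted for Kafka key/value
      case _ => throw new AnalysisException(
        s"$name attribute type must be a string or binary")
    }

    // checkAttributeType("key", IntegerType) throws before any rows are written.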
@@ -369,35 +329,22 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("subscribe", inputTopic)
       .load()
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      writer = createKafkaWriter(
+
+    val ex = intercept[IllegalArgumentException] {
+      createKafkaWriter(
         input.toDF(),
         withOptions = Map("kafka.key.serializer" -> "foo"))()
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-        "kafka option 'key.serializer' is not supported"))
-    } finally {
-      writer.stop()
     }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+      "kafka option 'key.serializer' is not supported"))
 
-    try {
-      writer = createKafkaWriter(
+    val ex2 = intercept[IllegalArgumentException] {
+      createKafkaWriter(
         input.toDF(),
         withOptions = Map("kafka.value.serializer" -> "foo"))()
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-        "kafka option 'value.serializer' is not supported"))
-    } finally {
-      writer.stop()
     }
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
+      "kafka option 'value.serializer' is not supported"))
   }
 
   test("generic - write big data with small producer buffer") {