@@ -253,10 +253,11 @@ describe('Plugin', () => {
253253
254254 beforeEach ( async ( ) => {
255255 tracer = require ( '../../dd-trace' )
256- await agent . load ( '@confluentinc/kafka-javascript' )
257256 const lib = require ( `../../../versions/${ module } @${ version } ` ) . get ( )
258257 nativeApi = lib
259258
259+ await agent . load ( '@confluentinc/kafka-javascript' )
260+
260261 // Get the producer/consumer classes directly from the module
261262 Producer = nativeApi . Producer
262263 Consumer = nativeApi . KafkaConsumer
@@ -266,16 +267,24 @@ describe('Plugin', () => {
266267 dr_cb : true
267268 } )
268269
269- nativeProducer . connect ( )
270-
271- await new Promise ( resolve => {
272- nativeProducer . on ( 'ready' , resolve )
270+ await new Promise ( ( resolve , reject ) => {
271+ nativeProducer . connect ( { } , ( err ) => {
272+ if ( err ) {
273+ return reject ( err )
274+ }
275+ resolve ( )
276+ } )
273277 } )
274278 } )
275279
276280 afterEach ( async ( ) => {
277- await new Promise ( resolve => {
278- nativeProducer . disconnect ( resolve )
281+ await new Promise ( ( resolve , reject ) => {
282+ nativeProducer . disconnect ( ( err ) => {
283+ if ( err ) {
284+ return reject ( err )
285+ }
286+ resolve ( )
287+ } )
279288 } )
280289 } )
281290
@@ -332,23 +341,69 @@ describe('Plugin', () => {
332341 } )
333342
334343 describe ( 'consumer' , ( ) => {
335- beforeEach ( ( ) => {
344+ beforeEach ( async ( ) => {
336345 nativeConsumer = new Consumer ( {
337346 'bootstrap.servers' : '127.0.0.1:9092' ,
338- 'group.id' : 'test-group-native '
347+ 'group.id' : 'test-group'
339348 } )
340349
341- nativeConsumer . on ( 'ready' , ( ) => {
342- nativeConsumer . subscribe ( [ testTopic ] )
350+ await new Promise ( ( resolve , reject ) => {
351+ nativeConsumer . connect ( { } , ( err ) => {
352+ if ( err ) {
353+ return reject ( err )
354+ }
355+ resolve ( )
356+ } )
343357 } )
344-
345- nativeConsumer . connect ( )
346358 } )
347359
348- afterEach ( ( ) => {
349- nativeConsumer . disconnect ( )
360+ afterEach ( async ( ) => {
361+ await nativeConsumer . unsubscribe ( )
362+ await new Promise ( ( resolve , reject ) => {
363+ nativeConsumer . disconnect ( ( err ) => {
364+ if ( err ) {
365+ return reject ( err )
366+ }
367+ resolve ( )
368+ } )
369+ } )
350370 } )
351371
372+ function consume ( consumer , producer , topic , message , timeoutMs = 9500 ) {
373+ return new Promise ( ( resolve , reject ) => {
374+ const timeoutId = setTimeout ( ( ) => {
375+ reject ( new Error ( `Timeout: Did not consume message on topic "${ topic } " within ${ timeoutMs } ms` ) )
376+ } , timeoutMs )
377+
378+ function doConsume ( ) {
379+ consumer . consume ( 1 , function ( err , messages ) {
380+ if ( err ) {
381+ clearTimeout ( timeoutId )
382+ return reject ( err )
383+ }
384+
385+ if ( ! messages || messages . length === 0 ) {
386+ setTimeout ( doConsume , 20 )
387+ return
388+ }
389+
390+ const consumedMessage = messages [ 0 ]
391+
392+ if ( consumedMessage . value . toString ( ) !== message . toString ( ) ) {
393+ setTimeout ( doConsume , 20 )
394+ return
395+ }
396+
397+ clearTimeout ( timeoutId )
398+ consumer . unsubscribe ( )
399+ resolve ( )
400+ } )
401+ }
402+ doConsume ( )
403+ producer . produce ( topic , null , message , 'native-consumer-key' )
404+ } )
405+ }
406+
352407 it ( 'should be instrumented' , async ( ) => {
353408 const expectedSpanPromise = expectSpanWithDefaults ( {
354409 name : expectedSchema . receive . opName ,
@@ -363,22 +418,13 @@ describe('Plugin', () => {
363418 type : 'worker'
364419 } )
365420
421+ nativeConsumer . setDefaultConsumeTimeout ( 10 )
422+ nativeConsumer . subscribe ( [ testTopic ] )
423+
366424 // Send a test message using the producer
367425 const message = Buffer . from ( 'test message for native consumer' )
368- const key = 'native-consumer-key'
369-
370- let consumePromise
371- nativeConsumer . on ( 'ready' , ( ) => {
372- // Consume messages
373- consumePromise = new Promise ( resolve => {
374- nativeConsumer . consume ( 1 , ( err , messages ) => {
375- resolve ( )
376- } )
377- nativeProducer . produce ( testTopic , null , message , key )
378- } )
379- } )
380426
381- await consumePromise
427+ await consume ( nativeConsumer , nativeProducer , testTopic , message )
382428
383429 return expectedSpanPromise
384430 } )
@@ -395,23 +441,13 @@ describe('Plugin', () => {
395441
396442 expect ( parseInt ( span . parent_id . toString ( ) ) ) . to . be . gt ( 0 )
397443 } , { timeoutMs : 10000 } )
444+ nativeConsumer . setDefaultConsumeTimeout ( 10 )
445+ nativeConsumer . subscribe ( [ testTopic ] )
398446
399447 // Send a test message using the producer
400- const message = Buffer . from ( 'test message for native consumer' )
401- const key = 'native-consumer-key'
402-
403- let consumePromise
404- nativeConsumer . on ( 'ready' , ( ) => {
405- // Consume messages
406- consumePromise = new Promise ( resolve => {
407- nativeConsumer . consume ( 1 , ( err , messages ) => {
408- resolve ( )
409- } )
410- nativeProducer . produce ( testTopic , null , message , key )
411- } )
412- } )
448+ const message = Buffer . from ( 'test message propagation for native consumer 1' )
413449
414- await consumePromise
450+ await consume ( nativeConsumer , nativeProducer , testTopic , message )
415451
416452 return expectedSpanPromise
417453 } )
0 commit comments