@@ -550,6 +550,26 @@ def make_running_manager(**kwargs):
550550 )
551551
552552
553+ def await_manager_shutdown (timeout = None ): # pragma: NO COVER
554+ # NOTE: This method should be called after manager.close(), i.e. after the shutdown
555+ # thread has been started.
556+ shutdown_thread = next (
557+ (
558+ thread
559+ for thread in threading .enumerate ()
560+ if thread .name == streaming_pull_manager ._REGULAR_SHUTDOWN_THREAD_NAME
561+ ),
562+ None ,
563+ )
564+
565+ if shutdown_thread is None :
566+ return # Shutdown already finished.
567+
568+ shutdown_thread .join (timeout = timeout )
569+ if shutdown_thread .is_alive ():
570+ pytest .fail ("Shutdown not completed in time." )
571+
572+
553573def test_close ():
554574 (
555575 manager ,
@@ -561,6 +581,7 @@ def test_close():
561581 ) = make_running_manager ()
562582
563583 manager .close ()
584+ await_manager_shutdown (timeout = 3 )
564585
565586 consumer .stop .assert_called_once ()
566587 leaser .stop .assert_called_once ()
@@ -583,6 +604,7 @@ def test_close_inactive_consumer():
583604 consumer .is_active = False
584605
585606 manager .close ()
607+ await_manager_shutdown (timeout = 3 )
586608
587609 consumer .stop .assert_not_called ()
588610 leaser .stop .assert_called_once ()
@@ -596,6 +618,7 @@ def test_close_idempotent():
596618
597619 manager .close ()
598620 manager .close ()
621+ await_manager_shutdown (timeout = 3 )
599622
600623 assert scheduler .shutdown .call_count == 1
601624
@@ -640,6 +663,7 @@ def test_close_no_dispatcher_error():
640663 dispatcher .start ()
641664
642665 manager .close ()
666+ await_manager_shutdown (timeout = 3 )
643667
644668 error_callback .assert_not_called ()
645669
@@ -651,6 +675,7 @@ def test_close_callbacks():
651675
652676 manager .add_close_callback (callback )
653677 manager .close (reason = "meep" )
678+ await_manager_shutdown (timeout = 3 )
654679
655680 callback .assert_called_once_with (manager , "meep" )
656681
@@ -660,6 +685,7 @@ def test_close_blocking_scheduler_shutdown():
660685 scheduler = manager ._scheduler
661686
662687 manager .close ()
688+ await_manager_shutdown (timeout = 3 )
663689
664690 scheduler .shutdown .assert_called_once_with (await_msg_callbacks = True )
665691
@@ -669,6 +695,7 @@ def test_close_nonblocking_scheduler_shutdown():
669695 scheduler = manager ._scheduler
670696
671697 manager .close ()
698+ await_manager_shutdown (timeout = 3 )
672699
673700 scheduler .shutdown .assert_called_once_with (await_msg_callbacks = False )
674701
@@ -690,6 +717,7 @@ def fake_nack(self):
690717 manager ._messages_on_hold ._messages_on_hold .append (messages [2 ])
691718
692719 manager .close ()
720+ await_manager_shutdown (timeout = 3 )
693721
694722 assert sorted (nacked_messages ) == [b"msg1" , b"msg2" , b"msg3" ]
695723
0 commit comments