4343 } ,
4444 time:: Instant ,
4545 } ,
46+ tokio_util:: sync:: CancellationToken ,
4647} ;
4748
4849mod transaction {
@@ -56,6 +57,7 @@ pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 16;
5657
5758pub struct VoteWorker {
5859 exit : Arc < AtomicBool > ,
60+ shutdown_signal : CancellationToken ,
5961 decision_maker : DecisionMaker ,
6062 tpu_receiver : VotePacketReceiver ,
6163 gossip_receiver : VotePacketReceiver ,
@@ -67,6 +69,7 @@ pub struct VoteWorker {
6769impl VoteWorker {
6870 pub fn new (
6971 exit : Arc < AtomicBool > ,
72+ shutdown_signal : CancellationToken ,
7073 decision_maker : DecisionMaker ,
7174 tpu_receiver : VotePacketReceiver ,
7275 gossip_receiver : VotePacketReceiver ,
@@ -76,6 +79,7 @@ impl VoteWorker {
7679 ) -> Self {
7780 Self {
7881 exit,
82+ shutdown_signal,
7983 decision_maker,
8084 tpu_receiver,
8185 gossip_receiver,
@@ -110,7 +114,11 @@ impl VoteWorker {
110114 VoteSource :: Tpu ,
111115 ) {
112116 Ok ( ( ) ) | Err ( RecvTimeoutError :: Timeout ) => ( ) ,
113- Err ( RecvTimeoutError :: Disconnected ) => break ,
117+ Err ( RecvTimeoutError :: Disconnected ) => {
118+ self . shutdown_signal . cancel ( ) ;
119+
120+ break ;
121+ }
114122 }
115123 // Check for new packets from the gossip receiver
116124 match self . gossip_receiver . receive_and_buffer_packets (
@@ -120,7 +128,11 @@ impl VoteWorker {
120128 VoteSource :: Gossip ,
121129 ) {
122130 Ok ( ( ) ) | Err ( RecvTimeoutError :: Timeout ) => ( ) ,
123- Err ( RecvTimeoutError :: Disconnected ) => break ,
131+ Err ( RecvTimeoutError :: Disconnected ) => {
132+ self . shutdown_signal . cancel ( ) ;
133+
134+ break ;
135+ }
124136 }
125137 banking_stage_stats. report ( 1000 ) ;
126138 }
0 commit comments