@@ -11,10 +11,12 @@ use smallvec::SmallVec;
1111use tokio_util:: time:: DelayQueue ;
1212use types:: EthSpec ;
1313
14+ use crate :: service:: BehaviourEvent ;
15+
1416use super :: {
1517 config:: OutboundRateLimiterConfig ,
1618 rate_limiter:: { RPCRateLimiter as RateLimiter , RateLimitedErr } ,
17- BehaviourAction , Protocol , RPCSend , ReqId , RequestType ,
19+ BehaviourAction , Protocol , RPCMessage , RPCSend , ReqId , RequestType ,
1820} ;
1921
2022/// A request that was rate limited or waiting on rate limited requests for the same peer and
@@ -34,7 +36,7 @@ pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
3436 /// Rate limiter for our own requests.
3537 limiter : RateLimiter ,
3638 /// Requests that are ready to be sent.
37- ready_requests : SmallVec < [ BehaviourAction < Id , E > ; 3 ] > ,
39+ ready_requests : SmallVec < [ ( PeerId , RPCSend < Id , E > ) ; 3 ] > ,
3840 /// Slog logger.
3941 log : Logger ,
4042}
@@ -71,7 +73,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
7173 peer_id : PeerId ,
7274 request_id : Id ,
7375 req : RequestType < E > ,
74- ) -> Result < BehaviourAction < Id , E > , Error > {
76+ ) -> Result < RPCSend < Id , E > , Error > {
7577 let protocol = req. versioned_protocol ( ) . protocol ( ) ;
7678 // First check that there are not already other requests waiting to be sent.
7779 if let Some ( queued_requests) = self . delayed_requests . get_mut ( & ( peer_id, protocol) ) {
@@ -103,13 +105,9 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
103105 request_id : Id ,
104106 req : RequestType < E > ,
105107 log : & Logger ,
106- ) -> Result < BehaviourAction < Id , E > , ( QueuedRequest < Id , E > , Duration ) > {
108+ ) -> Result < RPCSend < Id , E > , ( QueuedRequest < Id , E > , Duration ) > {
107109 match limiter. allows ( & peer_id, & req) {
108- Ok ( ( ) ) => Ok ( BehaviourAction :: NotifyHandler {
109- peer_id,
110- handler : NotifyHandler :: Any ,
111- event : RPCSend :: Request ( request_id, req) ,
112- } ) ,
110+ Ok ( ( ) ) => Ok ( RPCSend :: Request ( request_id, req) ) ,
113111 Err ( e) => {
114112 let protocol = req. versioned_protocol ( ) ;
115113 match e {
@@ -121,11 +119,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
121119 "Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters." ;
122120 "protocol" => %req. versioned_protocol( ) . protocol( )
123121 ) ;
124- Ok ( BehaviourAction :: NotifyHandler {
125- peer_id,
126- handler : NotifyHandler :: Any ,
127- event : RPCSend :: Request ( request_id, req) ,
128- } )
122+ Ok ( RPCSend :: Request ( request_id, req) )
129123 }
130124 RateLimitedErr :: TooSoon ( wait_time) => {
131125 debug ! ( log, "Self rate limiting" ; "protocol" => %protocol. protocol( ) , "wait_time_ms" => wait_time. as_millis( ) , "peer_id" => %peer_id) ;
@@ -151,7 +145,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
151145 // If one fails just wait for the next window that allows sending requests.
152146 return ;
153147 }
154- Ok ( event) => self . ready_requests . push ( event) ,
148+ Ok ( event) => self . ready_requests . push ( ( peer_id , event) ) ,
155149 }
156150 }
157151 if queued_requests. is_empty ( ) {
@@ -198,8 +192,12 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
198192 let _ = self . limiter . poll_unpin ( cx) ;
199193
200194 // Finally return any queued events.
201- if !self . ready_requests . is_empty ( ) {
202- return Poll :: Ready ( self . ready_requests . remove ( 0 ) ) ;
195+ if let Some ( ( peer_id, event) ) = self . ready_requests . pop ( ) {
196+ return Poll :: Ready ( BehaviourAction :: NotifyHandler {
197+ peer_id,
198+ handler : NotifyHandler :: Any ,
199+ event,
200+ } ) ;
203201 }
204202
205203 Poll :: Pending
0 commit comments