4848 fmt:: Debug ,
4949 net:: SocketAddr ,
5050 sync:: Arc ,
51+ time:: Duration ,
5152 } ,
5253 tokio:: sync:: mpsc,
5354 tracing:: instrument,
@@ -115,6 +116,7 @@ async fn handle_connection<S>(
115116 state : Arc < S > ,
116117 notify_price_tx_buffer : usize ,
117118 notify_price_sched_tx_buffer : usize ,
119+ flush_interval_duration : Duration ,
118120) where
119121 S : state:: Prices ,
120122 S : Send ,
@@ -127,6 +129,8 @@ async fn handle_connection<S>(
127129 let ( mut notify_price_sched_tx, mut notify_price_sched_rx) =
128130 mpsc:: channel ( notify_price_sched_tx_buffer) ;
129131
132+ let mut flush_interval = tokio:: time:: interval ( flush_interval_duration) ;
133+
130134 loop {
131135 if let Err ( err) = handle_next (
132136 & * state,
@@ -136,6 +140,7 @@ async fn handle_connection<S>(
136140 & mut notify_price_rx,
137141 & mut notify_price_sched_tx,
138142 & mut notify_price_sched_rx,
143+ & mut flush_interval,
139144 )
140145 . await
141146 {
@@ -159,6 +164,7 @@ async fn handle_next<S>(
159164 notify_price_rx : & mut mpsc:: Receiver < NotifyPrice > ,
160165 notify_price_sched_tx : & mut mpsc:: Sender < NotifyPriceSched > ,
161166 notify_price_sched_rx : & mut mpsc:: Receiver < NotifyPriceSched > ,
167+ notify_flush_interval : & mut tokio:: time:: Interval ,
162168) -> Result < ( ) >
163169where
164170 S : state:: Prices ,
@@ -183,13 +189,16 @@ where
183189 }
184190 }
185191 Some ( notify_price) = notify_price_rx. recv( ) => {
186- send_notification ( ws_tx, Method :: NotifyPrice , Some ( notify_price) )
192+ feed_notification ( ws_tx, Method :: NotifyPrice , Some ( notify_price) )
187193 . await
188194 }
189195 Some ( notify_price_sched) = notify_price_sched_rx. recv( ) => {
190- send_notification ( ws_tx, Method :: NotifyPriceSched , Some ( notify_price_sched) )
196+ feed_notification ( ws_tx, Method :: NotifyPriceSched , Some ( notify_price_sched) )
191197 . await
192198 }
199+ _ = notify_flush_interval. tick( ) => {
200+ flush( ws_tx) . await
201+ }
193202 }
194203}
195204
@@ -229,9 +238,9 @@ where
229238 // Send an array if we're handling a batch
230239 // request, single response object otherwise
231240 if is_batch {
232- send_text ( ws_tx, & serde_json:: to_string ( & responses) ?) . await ?;
241+ feed_text ( ws_tx, & serde_json:: to_string ( & responses) ?) . await ?;
233242 } else {
234- send_text ( ws_tx, & serde_json:: to_string ( & responses[ 0 ] ) ?) . await ?;
243+ feed_text ( ws_tx, & serde_json:: to_string ( & responses[ 0 ] ) ?) . await ?;
235244 }
236245 }
237246 // The top-level parsing errors are fine to share with client
@@ -354,21 +363,21 @@ async fn send_error(
354363 error. to_string ( ) ,
355364 None ,
356365 ) ;
357- send_text ( ws_tx, & response. to_string ( ) ) . await
366+ feed_text ( ws_tx, & response. to_string ( ) ) . await
358367}
359368
360- async fn send_notification < T > (
369+ async fn feed_notification < T > (
361370 ws_tx : & mut SplitSink < WebSocket , Message > ,
362371 method : Method ,
363372 params : Option < T > ,
364373) -> Result < ( ) >
365374where
366375 T : Sized + Serialize + DeserializeOwned ,
367376{
368- send_request ( ws_tx, IdReq :: Notification , method, params) . await
377+ feed_request ( ws_tx, IdReq :: Notification , method, params) . await
369378}
370379
371- async fn send_request < I , T > (
380+ async fn feed_request < I , T > (
372381 ws_tx : & mut SplitSink < WebSocket , Message > ,
373382 id : I ,
374383 method : Method ,
@@ -379,16 +388,20 @@ where
379388 T : Sized + Serialize + DeserializeOwned ,
380389{
381390 let request = Request :: with_params ( id, method, params) ;
382- send_text ( ws_tx, & request. to_string ( ) ) . await
391+ feed_text ( ws_tx, & request. to_string ( ) ) . await
383392}
384393
385- async fn send_text ( ws_tx : & mut SplitSink < WebSocket , Message > , msg : & str ) -> Result < ( ) > {
394+ async fn feed_text ( ws_tx : & mut SplitSink < WebSocket , Message > , msg : & str ) -> Result < ( ) > {
386395 ws_tx
387- . send ( Message :: text ( msg. to_string ( ) ) )
396+ . feed ( Message :: text ( msg. to_string ( ) ) )
388397 . await
389398 . map_err ( |e| e. into ( ) )
390399}
391400
401+ async fn flush ( ws_tx : & mut SplitSink < WebSocket , Message > ) -> Result < ( ) > {
402+ ws_tx. flush ( ) . await . map_err ( |e| e. into ( ) )
403+ }
404+
392405#[ derive( Clone , Debug , Serialize , Deserialize ) ]
393406#[ serde( default ) ]
394407pub struct Config {
@@ -400,6 +413,9 @@ pub struct Config {
400413 /// Size of the buffer of each Server's channel on which `notify_price_sched` events are
401414 /// received from the Price state.
402415 pub notify_price_sched_tx_buffer : usize ,
416+ /// Flush interval duration for the notifications.
417+ #[ serde( with = "humantime_serde" ) ]
418+ pub flush_interval_duration : Duration ,
403419}
404420
405421impl Default for Config {
@@ -408,6 +424,7 @@ impl Default for Config {
408424 listen_address : "127.0.0.1:8910" . to_string ( ) ,
409425 notify_price_tx_buffer : 10000 ,
410426 notify_price_sched_tx_buffer : 10000 ,
427+ flush_interval_duration : Duration :: from_millis ( 50 ) ,
411428 }
412429 }
413430}
@@ -448,6 +465,7 @@ where
448465 state,
449466 config. notify_price_tx_buffer ,
450467 config. notify_price_sched_tx_buffer ,
468+ config. flush_interval_duration ,
451469 )
452470 . await
453471 } )
0 commit comments