4848 fmt:: Debug ,
4949 net:: SocketAddr ,
5050 sync:: Arc ,
51+ time:: Duration ,
5152 } ,
5253 tokio:: sync:: mpsc,
5354 tracing:: instrument,
@@ -115,6 +116,7 @@ async fn handle_connection<S>(
115116 state : Arc < S > ,
116117 notify_price_tx_buffer : usize ,
117118 notify_price_sched_tx_buffer : usize ,
119+ notify_flush_interval_duration : Duration ,
118120) where
119121 S : state:: Prices ,
120122 S : Send ,
@@ -127,6 +129,8 @@ async fn handle_connection<S>(
127129 let ( mut notify_price_sched_tx, mut notify_price_sched_rx) =
128130 mpsc:: channel ( notify_price_sched_tx_buffer) ;
129131
132+ let mut notify_flush_interval = tokio:: time:: interval ( notify_flush_interval_duration) ;
133+
130134 loop {
131135 if let Err ( err) = handle_next (
132136 & * state,
@@ -136,6 +140,7 @@ async fn handle_connection<S>(
136140 & mut notify_price_rx,
137141 & mut notify_price_sched_tx,
138142 & mut notify_price_sched_rx,
143+ & mut notify_flush_interval,
139144 )
140145 . await
141146 {
@@ -159,6 +164,7 @@ async fn handle_next<S>(
159164 notify_price_rx : & mut mpsc:: Receiver < NotifyPrice > ,
160165 notify_price_sched_tx : & mut mpsc:: Sender < NotifyPriceSched > ,
161166 notify_price_sched_rx : & mut mpsc:: Receiver < NotifyPriceSched > ,
167+ notify_flush_interval : & mut tokio:: time:: Interval ,
162168) -> Result < ( ) >
163169where
164170 S : state:: Prices ,
@@ -183,13 +189,16 @@ where
183189 }
184190 }
185191 Some ( notify_price) = notify_price_rx. recv( ) => {
186- send_notification ( ws_tx, Method :: NotifyPrice , Some ( notify_price) )
192+ feed_notification ( ws_tx, Method :: NotifyPrice , Some ( notify_price) )
187193 . await
188194 }
189195 Some ( notify_price_sched) = notify_price_sched_rx. recv( ) => {
190- send_notification ( ws_tx, Method :: NotifyPriceSched , Some ( notify_price_sched) )
196+ feed_notification ( ws_tx, Method :: NotifyPriceSched , Some ( notify_price_sched) )
191197 . await
192198 }
199+ _ = notify_flush_interval. tick( ) => {
200+ flush( ws_tx) . await
201+ }
193202 }
194203}
195204
@@ -357,18 +366,18 @@ async fn send_error(
357366 send_text ( ws_tx, & response. to_string ( ) ) . await
358367}
359368
360- async fn send_notification < T > (
369+ async fn feed_notification < T > (
361370 ws_tx : & mut SplitSink < WebSocket , Message > ,
362371 method : Method ,
363372 params : Option < T > ,
364373) -> Result < ( ) >
365374where
366375 T : Sized + Serialize + DeserializeOwned ,
367376{
368- send_request ( ws_tx, IdReq :: Notification , method, params) . await
377+ feed_request ( ws_tx, IdReq :: Notification , method, params) . await
369378}
370379
371- async fn send_request < I , T > (
380+ async fn feed_request < I , T > (
372381 ws_tx : & mut SplitSink < WebSocket , Message > ,
373382 id : I ,
374383 method : Method ,
@@ -379,7 +388,14 @@ where
379388 T : Sized + Serialize + DeserializeOwned ,
380389{
381390 let request = Request :: with_params ( id, method, params) ;
382- send_text ( ws_tx, & request. to_string ( ) ) . await
391+ feed_text ( ws_tx, & request. to_string ( ) ) . await
392+ }
393+
394+ async fn feed_text ( ws_tx : & mut SplitSink < WebSocket , Message > , msg : & str ) -> Result < ( ) > {
395+ ws_tx
396+ . feed ( Message :: text ( msg. to_string ( ) ) )
397+ . await
398+ . map_err ( |e| e. into ( ) )
383399}
384400
385401async fn send_text ( ws_tx : & mut SplitSink < WebSocket , Message > , msg : & str ) -> Result < ( ) > {
@@ -389,25 +405,33 @@ async fn send_text(ws_tx: &mut SplitSink<WebSocket, Message>, msg: &str) -> Resu
389405 . map_err ( |e| e. into ( ) )
390406}
391407
408+ async fn flush ( ws_tx : & mut SplitSink < WebSocket , Message > ) -> Result < ( ) > {
409+ ws_tx. flush ( ) . await . map_err ( |e| e. into ( ) )
410+ }
411+
392412#[ derive( Clone , Debug , Serialize , Deserialize ) ]
393413#[ serde( default ) ]
394414pub struct Config {
395415 /// The address which the websocket API server will listen on.
396- pub listen_address : String ,
416+ pub listen_address : String ,
397417 /// Size of the buffer of each Server's channel on which `notify_price` events are
398418 /// received from the Price state.
399- pub notify_price_tx_buffer : usize ,
419+ pub notify_price_tx_buffer : usize ,
400420 /// Size of the buffer of each Server's channel on which `notify_price_sched` events are
401421 /// received from the Price state.
402- pub notify_price_sched_tx_buffer : usize ,
422+ pub notify_price_sched_tx_buffer : usize ,
423+ /// Flush interval duration for the notifications.
424+ #[ serde( with = "humantime_serde" ) ]
425+ pub notify_flush_interval_duration : Duration ,
403426}
404427
405428impl Default for Config {
406429 fn default ( ) -> Self {
407430 Self {
408- listen_address : "127.0.0.1:8910" . to_string ( ) ,
409- notify_price_tx_buffer : 10000 ,
410- notify_price_sched_tx_buffer : 10000 ,
431+ listen_address : "127.0.0.1:8910" . to_string ( ) ,
432+ notify_price_tx_buffer : 10000 ,
433+ notify_price_sched_tx_buffer : 10000 ,
434+ notify_flush_interval_duration : Duration :: from_millis ( 50 ) ,
411435 }
412436 }
413437}
@@ -448,6 +472,7 @@ where
448472 state,
449473 config. notify_price_tx_buffer ,
450474 config. notify_price_sched_tx_buffer ,
475+ config. notify_flush_interval_duration ,
451476 )
452477 . await
453478 } )
0 commit comments