@@ -5,6 +5,7 @@ use chrono::DateTime;
55use serde:: { Deserialize , Serialize } ;
66use std:: fmt;
77use std:: sync:: { LockResult , RwLockReadGuard , RwLockWriteGuard } ;
8+ use std:: time:: Duration ;
89use uuid:: Uuid ;
910
1011pub use chrono:: Utc ;
@@ -15,6 +16,8 @@ use tokio::sync::broadcast::{self, Receiver};
1516#[ macro_use]
1617extern crate lazy_static;
1718
19+ const DEAFAULT_CHANNEL_CLOSING_TIMEOUT : Duration = Duration :: from_secs ( 30 ) ;
20+
1821#[ derive( Clone , Debug , Deserialize , Serialize , PartialEq , Eq ) ]
1922#[ serde( transparent) ]
2023pub struct MicrosoftTelemetryKey ( Uuid ) ;
@@ -355,18 +358,22 @@ pub async fn set_appinsights_clients(
355358 global:: set_clients ( instance_client, microsoft_client) ;
356359}
357360
361+ pub async fn try_flush_and_close ( ) {
362+ _try_flush_and_close ( DEAFAULT_CHANNEL_CLOSING_TIMEOUT ) . await
363+ }
364+
358365/// Try to submit any pending telemetry with a blocking call.
359366///
360367/// Meant for a final attempt at flushing pending items before an abnormal exit.
361368/// After calling this function, any existing telemetry client will be dropped,
362369/// and subsequent telemetry submission will be a silent no-op.
363- pub async fn try_flush_and_close ( ) {
370+ pub async fn _try_flush_and_close ( timeout : Duration ) {
364371 let clients = global:: take_clients ( ) ;
365-
366372 for client in clients {
367- client. close_channel ( ) . await ;
373+ if let Err ( e) = tokio:: time:: timeout ( timeout, client. close_channel ( ) ) . await {
374+ log:: warn!( "Failed to close telemetry client: {}" , e) ;
375+ }
368376 }
369-
370377 // dropping the broadcast sender to make sure all pending events are sent
371378 let _global_event_source = global:: EVENT_SOURCE . write ( ) . unwrap ( ) . take ( ) ;
372379}
@@ -468,11 +475,15 @@ pub fn try_broadcast_trace(timestamp: DateTime<Utc>, msg: String, level: log::Le
468475}
469476
470477pub fn subscribe_to_events ( ) -> Result < Receiver < LoggingEvent > > {
471- let global_event_source = global:: EVENT_SOURCE . read ( ) . unwrap ( ) ;
472- if let Some ( evs) = global_event_source. clone ( ) {
473- Ok ( evs. subscribe ( ) )
474- } else {
475- bail ! ( "Event source not initialized" ) ;
478+ match global:: EVENT_SOURCE . read ( ) {
479+ Ok ( global_event_source) => {
480+ if let Some ( evs) = global_event_source. clone ( ) {
481+ Ok ( evs. subscribe ( ) )
482+ } else {
483+ bail ! ( "Event source not initialized" ) ;
484+ }
485+ }
486+ Err ( e) => bail ! ( "failed to acquire event source lock: {}" , e) ,
476487 }
477488}
478489
0 commit comments