Skip to content
This repository was archived by the owner on Nov 1, 2023. It is now read-only.

Commit 85f8065

Browse files
committed
Adding a time out when closing the app insight channels
1 parent 300a3e2 commit 85f8065

File tree

1 file changed

+21
-10
lines changed
  • src/agent/onefuzz-telemetry/src

1 file changed

+21
-10
lines changed

src/agent/onefuzz-telemetry/src/lib.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33

44
use chrono::DateTime;
55
use serde::{Deserialize, Serialize};
6-
use std::fmt;
76
use std::sync::{LockResult, RwLockReadGuard, RwLockWriteGuard};
7+
use std::time::Duration;
8+
use std::fmt;
89
use uuid::Uuid;
910

1011
pub use chrono::Utc;
@@ -15,6 +16,8 @@ use tokio::sync::broadcast::{self, Receiver};
1516
#[macro_use]
1617
extern crate lazy_static;
1718

19+
const DEAFAULT_CHANNEL_CLOSING_TIMEOUT: Duration = Duration::from_secs(30);
20+
1821
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
1922
#[serde(transparent)]
2023
pub struct MicrosoftTelemetryKey(Uuid);
@@ -355,18 +358,22 @@ pub async fn set_appinsights_clients(
355358
global::set_clients(instance_client, microsoft_client);
356359
}
357360

361+
pub async fn try_flush_and_close() {
362+
_try_flush_and_close(DEAFAULT_CHANNEL_CLOSING_TIMEOUT).await
363+
}
364+
358365
/// Try to submit any pending telemetry with a blocking call.
359366
///
360367
/// Meant for a final attempt at flushing pending items before an abnormal exit.
361368
/// After calling this function, any existing telemetry client will be dropped,
362369
/// and subsequent telemetry submission will be a silent no-op.
363-
pub async fn try_flush_and_close() {
370+
pub async fn _try_flush_and_close(timeout: Duration) {
364371
let clients = global::take_clients();
365-
366372
for client in clients {
367-
client.close_channel().await;
373+
if let Err(e) = tokio::time::timeout(timeout, client.close_channel()).await {
374+
log::warn!("Failed to close telemetry client: {}", e);
375+
}
368376
}
369-
370377
// dropping the broadcast sender to make sure all pending events are sent
371378
let _global_event_source = global::EVENT_SOURCE.write().unwrap().take();
372379
}
@@ -468,11 +475,15 @@ pub fn try_broadcast_trace(timestamp: DateTime<Utc>, msg: String, level: log::Le
468475
}
469476

470477
pub fn subscribe_to_events() -> Result<Receiver<LoggingEvent>> {
471-
let global_event_source = global::EVENT_SOURCE.read().unwrap();
472-
if let Some(evs) = global_event_source.clone() {
473-
Ok(evs.subscribe())
474-
} else {
475-
bail!("Event source not initialized");
478+
match global::EVENT_SOURCE.read() {
479+
Ok(global_event_source) => {
480+
if let Some(evs) = global_event_source.clone() {
481+
Ok(evs.subscribe())
482+
} else {
483+
bail!("Event source not initialized");
484+
}
485+
}
486+
Err(e) => bail!("failed to acquire event source lock: {}", e),
476487
}
477488
}
478489

0 commit comments

Comments
 (0)