diff --git a/sentry/Cargo.toml b/sentry/Cargo.toml index 76f89ee1..c352cfc9 100644 --- a/sentry/Cargo.toml +++ b/sentry/Cargo.toml @@ -39,13 +39,13 @@ debug-logs = ["log_", "sentry-core/debug-logs"] # transports transport = ["reqwest", "native-tls"] reqwest = ["reqwest_", "httpdate", "tokio"] -curl = ["curl_", "httpdate", "tokio"] +curl = ["curl_", "httpdate"] surf-h1 = ["surf_/h1-client", "httpdate"] surf = ["surf_/curl-client", "httpdate", "tokio"] native-tls = ["reqwest_/default-tls"] rustls = ["reqwest_/rustls-tls"] -ureq = ["ureq_/tls", "httpdate", "tokio"] -ureq-native-tls = ["ureq_/native-tls", "httpdate", "tokio"] +ureq = ["ureq_/tls", "httpdate"] +ureq-native-tls = ["ureq_/native-tls", "httpdate"] [dependencies] sentry-core = { version = "0.24.2", path = "../sentry-core", features = ["client"] } diff --git a/sentry/src/transports/curl.rs b/sentry/src/transports/curl.rs index d0e05842..296ae8f5 100644 --- a/sentry/src/transports/curl.rs +++ b/sentry/src/transports/curl.rs @@ -39,7 +39,7 @@ impl CurlHttpTransport { let scheme = dsn.scheme(); let mut handle = client; - let thread = TransportThread::new(move |envelope, mut rl| { + let thread = TransportThread::new(move |envelope, rl| { handle.reset(); handle.url(&url).unwrap(); handle.custom_request("POST").unwrap(); @@ -119,7 +119,6 @@ impl CurlHttpTransport { sentry_debug!("Failed to send envelope: {}", err); } } - async move { rl } }); Self { thread } } diff --git a/sentry/src/transports/mod.rs b/sentry/src/transports/mod.rs index 053730a8..94f98df2 100644 --- a/sentry/src/transports/mod.rs +++ b/sentry/src/transports/mod.rs @@ -6,20 +6,12 @@ use crate::{ClientOptions, Transport, TransportFactory}; use std::sync::Arc; -#[cfg(any( - feature = "reqwest", - feature = "curl", - feature = "surf", - feature = "ureq" -))] +#[cfg(feature = "httpdate")] mod ratelimit; -#[cfg(any( - feature = "reqwest", - feature = "curl", - feature = "surf", - feature = "ureq" -))] +#[cfg(any(feature = "curl", feature = "ureq"))] mod thread; +#[cfg(any(feature = "reqwest", feature = "surf",))] +mod tokio_thread; #[cfg(feature = "reqwest")] mod reqwest; diff --git a/sentry/src/transports/reqwest.rs b/sentry/src/transports/reqwest.rs index d116512d..03e11438 100644 --- a/sentry/src/transports/reqwest.rs +++ b/sentry/src/transports/reqwest.rs @@ -2,7 +2,7 @@ use std::time::Duration; use reqwest_::{header as ReqwestHeaders, Client as ReqwestClient, Proxy, StatusCode}; -use super::thread::TransportThread; +use super::tokio_thread::TransportThread; use crate::{sentry_debug, ClientOptions, Envelope, Transport}; diff --git a/sentry/src/transports/surf.rs b/sentry/src/transports/surf.rs index b70558bf..d458a06c 100644 --- a/sentry/src/transports/surf.rs +++ b/sentry/src/transports/surf.rs @@ -2,7 +2,7 @@ use std::time::Duration; use surf_::{http::headers as SurfHeaders, Client as SurfClient, StatusCode}; -use super::thread::TransportThread; +use super::tokio_thread::TransportThread; use crate::{sentry_debug, ClientOptions, Envelope, Transport}; diff --git a/sentry/src/transports/thread.rs b/sentry/src/transports/thread.rs index 4a99a7a3..368ecdd8 100644 --- a/sentry/src/transports/thread.rs +++ b/sentry/src/transports/thread.rs @@ -20,11 +20,9 @@ pub struct TransportThread { } impl TransportThread { - pub fn new(mut send: SendFn) -> Self + pub fn new(mut send: SendFn) -> Self where - SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, - // NOTE: returning RateLimiter here, otherwise we are in borrow hell - SendFuture: std::future::Future, + SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static, { let (sender, receiver) = sync_channel(30); let shutdown = Arc::new(AtomicBool::new(false)); @@ -32,48 +30,39 @@ impl TransportThread { let handle = thread::Builder::new() .name("sentry-transport".into()) .spawn(move || { - // create a runtime on the transport thread - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let mut rl = RateLimiter::new(); - // and block on an async fn in this runtime/thread - rt.block_on(async move { - for task in receiver.into_iter() { - if shutdown_worker.load(Ordering::SeqCst) { + for task in receiver.into_iter() { + if shutdown_worker.load(Ordering::SeqCst) { + return; + } + let envelope = match task { + Task::SendEnvelope(envelope) => envelope, + Task::Flush(sender) => { + sender.send(()).ok(); + continue; + } + Task::Shutdown => { return; } - let envelope = match task { - Task::SendEnvelope(envelope) => envelope, - Task::Flush(sender) => { - sender.send(()).ok(); - continue; - } - Task::Shutdown => { - return; - } - }; + }; - if let Some(time_left) = rl.is_disabled(RateLimitingCategory::Any) { - sentry_debug!( - "Skipping event send because we're disabled due to rate limits for {}s", - time_left.as_secs() - ); - continue; - } - match rl.filter_envelope(envelope) { - Some(envelope) => { - rl = send(envelope, rl).await; - }, - None => { - sentry_debug!("Envelope was discarded due to per-item rate limits"); - }, - }; + if let Some(time_left) = rl.is_disabled(RateLimitingCategory::Any) { + sentry_debug!( + "Skipping event send because we're disabled due to rate limits for {}s", + time_left.as_secs() + ); + continue; } - }) + match rl.filter_envelope(envelope) { + Some(envelope) => { + send(envelope, &mut rl); + } + None => { + sentry_debug!("Envelope was discarded due to per-item rate limits"); + } + }; + } }) .ok(); diff --git a/sentry/src/transports/tokio_thread.rs b/sentry/src/transports/tokio_thread.rs new file mode 100644 index 00000000..4a99a7a3 --- /dev/null +++ b/sentry/src/transports/tokio_thread.rs @@ -0,0 +1,106 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{sync_channel, SyncSender}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +use super::ratelimit::{RateLimiter, RateLimitingCategory}; +use crate::{sentry_debug, Envelope}; + +enum Task { + SendEnvelope(Envelope), + Flush(SyncSender<()>), + Shutdown, +} + +pub struct TransportThread { + sender: SyncSender, + shutdown: Arc, + handle: Option>, +} + +impl TransportThread { + pub fn new(mut send: SendFn) -> Self + where + SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static, + // NOTE: returning RateLimiter here, otherwise we are in borrow hell + SendFuture: std::future::Future, + { + let (sender, receiver) = sync_channel(30); + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_worker = shutdown.clone(); + let handle = thread::Builder::new() + .name("sentry-transport".into()) + .spawn(move || { + // create a runtime on the transport thread + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let mut rl = RateLimiter::new(); + + // and block on an async fn in this runtime/thread + rt.block_on(async move { + for task in receiver.into_iter() { + if shutdown_worker.load(Ordering::SeqCst) { + return; + } + let envelope = match task { + Task::SendEnvelope(envelope) => envelope, + Task::Flush(sender) => { + sender.send(()).ok(); + continue; + } + Task::Shutdown => { + return; + } + }; + + if let Some(time_left) = rl.is_disabled(RateLimitingCategory::Any) { + sentry_debug!( + "Skipping event send because we're disabled due to rate limits for {}s", + time_left.as_secs() + ); + continue; + } + match rl.filter_envelope(envelope) { + Some(envelope) => { + rl = send(envelope, rl).await; + }, + None => { + sentry_debug!("Envelope was discarded due to per-item rate limits"); + }, + }; + } + }) + }) + .ok(); + + Self { + sender, + shutdown, + handle, + } + } + + pub fn send(&self, envelope: Envelope) { + let _ = self.sender.send(Task::SendEnvelope(envelope)); + } + + pub fn flush(&self, timeout: Duration) -> bool { + let (sender, receiver) = sync_channel(1); + let _ = self.sender.send(Task::Flush(sender)); + receiver.recv_timeout(timeout).is_err() + } +} + +impl Drop for TransportThread { + fn drop(&mut self) { + self.shutdown.store(true, Ordering::SeqCst); + let _ = self.sender.send(Task::Shutdown); + if let Some(handle) = self.handle.take() { + handle.join().unwrap(); + } + } +} diff --git a/sentry/src/transports/ureq.rs b/sentry/src/transports/ureq.rs index ccaab56e..0c3154b6 100644 --- a/sentry/src/transports/ureq.rs +++ b/sentry/src/transports/ureq.rs @@ -51,7 +51,7 @@ impl UreqHttpTransport { let auth = dsn.to_auth(Some(&user_agent)).to_string(); let url = dsn.envelope_api_url().to_string(); - let thread = TransportThread::new(move |envelope, mut rl| { + let thread = TransportThread::new(move |envelope, rl| { let mut body = Vec::new(); envelope.to_writer(&mut body).unwrap(); let request = agent @@ -82,7 +82,6 @@ impl UreqHttpTransport { sentry_debug!("Failed to send envelope: {}", err); } } - async move { rl } }); Self { thread } }