Skip to content

Commit

Permalink
refactor: Remove tokio dependency for blocking ureq and curl transpor…
Browse files Browse the repository at this point in the history
…ts (#422)

These transport backends don't need the overhead of a tokio runtime and
all the dependency crates that come with it, when they are already
blocking on their own.  A simple Rust thread and mpsc channel is enough.
The transport thread implementation is duplicated between a tokio and
non-tokio module that are used by reqwest/surf, and curl/ureq
respectively.

See also
#419 (comment)
where this was briefly discussed.
  • Loading branch information
MarijnS95 authored Jan 31, 2022
1 parent 34f6c0a commit 9a41d6c
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 61 deletions.
6 changes: 3 additions & 3 deletions sentry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ debug-logs = ["log_", "sentry-core/debug-logs"]
# transports
transport = ["reqwest", "native-tls"]
reqwest = ["reqwest_", "httpdate", "tokio"]
curl = ["curl_", "httpdate", "tokio"]
curl = ["curl_", "httpdate"]
surf-h1 = ["surf_/h1-client", "httpdate"]
surf = ["surf_/curl-client", "httpdate", "tokio"]
native-tls = ["reqwest_/default-tls"]
rustls = ["reqwest_/rustls-tls"]
ureq = ["ureq_/tls", "httpdate", "tokio"]
ureq-native-tls = ["ureq_/native-tls", "httpdate", "tokio"]
ureq = ["ureq_/tls", "httpdate"]
ureq-native-tls = ["ureq_/native-tls", "httpdate"]

[dependencies]
sentry-core = { version = "0.24.2", path = "../sentry-core", features = ["client"] }
Expand Down
3 changes: 1 addition & 2 deletions sentry/src/transports/curl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl CurlHttpTransport {
let scheme = dsn.scheme();

let mut handle = client;
let thread = TransportThread::new(move |envelope, mut rl| {
let thread = TransportThread::new(move |envelope, rl| {
handle.reset();
handle.url(&url).unwrap();
handle.custom_request("POST").unwrap();
Expand Down Expand Up @@ -119,7 +119,6 @@ impl CurlHttpTransport {
sentry_debug!("Failed to send envelope: {}", err);
}
}
async move { rl }
});
Self { thread }
}
Expand Down
16 changes: 4 additions & 12 deletions sentry/src/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,12 @@
use crate::{ClientOptions, Transport, TransportFactory};
use std::sync::Arc;

#[cfg(any(
feature = "reqwest",
feature = "curl",
feature = "surf",
feature = "ureq"
))]
#[cfg(feature = "httpdate")]
mod ratelimit;
#[cfg(any(
feature = "reqwest",
feature = "curl",
feature = "surf",
feature = "ureq"
))]
#[cfg(any(feature = "curl", feature = "ureq"))]
mod thread;
#[cfg(any(feature = "reqwest", feature = "surf",))]
mod tokio_thread;

#[cfg(feature = "reqwest")]
mod reqwest;
Expand Down
2 changes: 1 addition & 1 deletion sentry/src/transports/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use reqwest_::{header as ReqwestHeaders, Client as ReqwestClient, Proxy, StatusCode};

use super::thread::TransportThread;
use super::tokio_thread::TransportThread;

use crate::{sentry_debug, ClientOptions, Envelope, Transport};

Expand Down
2 changes: 1 addition & 1 deletion sentry/src/transports/surf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use surf_::{http::headers as SurfHeaders, Client as SurfClient, StatusCode};

use super::thread::TransportThread;
use super::tokio_thread::TransportThread;

use crate::{sentry_debug, ClientOptions, Envelope, Transport};

Expand Down
69 changes: 29 additions & 40 deletions sentry/src/transports/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,49 @@ pub struct TransportThread {
}

impl TransportThread {
pub fn new<SendFn, SendFuture>(mut send: SendFn) -> Self
pub fn new<SendFn>(mut send: SendFn) -> Self
where
SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static,
// NOTE: returning RateLimiter here, otherwise we are in borrow hell
SendFuture: std::future::Future<Output = RateLimiter>,
SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static,
{
let (sender, receiver) = sync_channel(30);
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_worker = shutdown.clone();
let handle = thread::Builder::new()
.name("sentry-transport".into())
.spawn(move || {
// create a runtime on the transport thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let mut rl = RateLimiter::new();

// and block on an async fn in this runtime/thread
rt.block_on(async move {
for task in receiver.into_iter() {
if shutdown_worker.load(Ordering::SeqCst) {
for task in receiver.into_iter() {
if shutdown_worker.load(Ordering::SeqCst) {
return;
}
let envelope = match task {
Task::SendEnvelope(envelope) => envelope,
Task::Flush(sender) => {
sender.send(()).ok();
continue;
}
Task::Shutdown => {
return;
}
let envelope = match task {
Task::SendEnvelope(envelope) => envelope,
Task::Flush(sender) => {
sender.send(()).ok();
continue;
}
Task::Shutdown => {
return;
}
};
};

if let Some(time_left) = rl.is_disabled(RateLimitingCategory::Any) {
sentry_debug!(
"Skipping event send because we're disabled due to rate limits for {}s",
time_left.as_secs()
);
continue;
}
match rl.filter_envelope(envelope) {
Some(envelope) => {
rl = send(envelope, rl).await;
},
None => {
sentry_debug!("Envelope was discarded due to per-item rate limits");
},
};
if let Some(time_left) = rl.is_disabled(RateLimitingCategory::Any) {
sentry_debug!(
"Skipping event send because we're disabled due to rate limits for {}s",
time_left.as_secs()
);
continue;
}
})
match rl.filter_envelope(envelope) {
Some(envelope) => {
send(envelope, &mut rl);
}
None => {
sentry_debug!("Envelope was discarded due to per-item rate limits");
}
};
}
})
.ok();

Expand Down
106 changes: 106 additions & 0 deletions sentry/src/transports/tokio_thread.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

use super::ratelimit::{RateLimiter, RateLimitingCategory};
use crate::{sentry_debug, Envelope};

enum Task {
SendEnvelope(Envelope),
Flush(SyncSender<()>),
Shutdown,
}

pub struct TransportThread {
sender: SyncSender<Task>,
shutdown: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
}

impl TransportThread {
pub fn new<SendFn, SendFuture>(mut send: SendFn) -> Self
where
SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static,
// NOTE: returning RateLimiter here, otherwise we are in borrow hell
SendFuture: std::future::Future<Output = RateLimiter>,
{
let (sender, receiver) = sync_channel(30);
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_worker = shutdown.clone();
let handle = thread::Builder::new()
.name("sentry-transport".into())
.spawn(move || {
// create a runtime on the transport thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let mut rl = RateLimiter::new();

// and block on an async fn in this runtime/thread
rt.block_on(async move {
for task in receiver.into_iter() {
if shutdown_worker.load(Ordering::SeqCst) {
return;
}
let envelope = match task {
Task::SendEnvelope(envelope) => envelope,
Task::Flush(sender) => {
sender.send(()).ok();
continue;
}
Task::Shutdown => {
return;
}
};

if let Some(time_left) = rl.is_disabled(RateLimitingCategory::Any) {
sentry_debug!(
"Skipping event send because we're disabled due to rate limits for {}s",
time_left.as_secs()
);
continue;
}
match rl.filter_envelope(envelope) {
Some(envelope) => {
rl = send(envelope, rl).await;
},
None => {
sentry_debug!("Envelope was discarded due to per-item rate limits");
},
};
}
})
})
.ok();

Self {
sender,
shutdown,
handle,
}
}

pub fn send(&self, envelope: Envelope) {
let _ = self.sender.send(Task::SendEnvelope(envelope));
}

pub fn flush(&self, timeout: Duration) -> bool {
let (sender, receiver) = sync_channel(1);
let _ = self.sender.send(Task::Flush(sender));
receiver.recv_timeout(timeout).is_err()
}
}

impl Drop for TransportThread {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::SeqCst);
let _ = self.sender.send(Task::Shutdown);
if let Some(handle) = self.handle.take() {
handle.join().unwrap();
}
}
}
3 changes: 1 addition & 2 deletions sentry/src/transports/ureq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl UreqHttpTransport {
let auth = dsn.to_auth(Some(&user_agent)).to_string();
let url = dsn.envelope_api_url().to_string();

let thread = TransportThread::new(move |envelope, mut rl| {
let thread = TransportThread::new(move |envelope, rl| {
let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let request = agent
Expand Down Expand Up @@ -82,7 +82,6 @@ impl UreqHttpTransport {
sentry_debug!("Failed to send envelope: {}", err);
}
}
async move { rl }
});
Self { thread }
}
Expand Down

0 comments on commit 9a41d6c

Please sign in to comment.