Skip to content

Commit

Permalink
ref: Introduce a SessionFlusher (#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
Swatinem authored Oct 16, 2020
1 parent 47ae37e commit 6729dda
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 21 deletions.
20 changes: 17 additions & 3 deletions sentry-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use std::sync::RwLock;
use std::time::Duration;

use rand::random;
use sentry_types::protocol::v7::SessionUpdate;

use crate::constants::SDK_INFO;
use crate::protocol::{ClientSdkInfo, Event};
use crate::session::SessionFlusher;
use crate::types::{Dsn, Uuid};
use crate::{ClientOptions, Envelope, Hub, Integration, Scope, Transport};

Expand All @@ -19,6 +21,8 @@ impl<T: Into<ClientOptions>> From<T> for Client {
}
}

pub(crate) type TransportArc = Arc<RwLock<Option<Arc<dyn Transport>>>>;

/// The Sentry Client.
///
/// The Client is responsible for event processing and sending events to the
Expand All @@ -38,7 +42,8 @@ impl<T: Into<ClientOptions>> From<T> for Client {
/// [Unified API]: https://develop.sentry.dev/sdk/unified-api/
pub struct Client {
options: ClientOptions,
transport: RwLock<Option<Arc<dyn Transport>>>,
transport: TransportArc,
session_flusher: SessionFlusher,
integrations: Vec<(TypeId, Arc<dyn Integration>)>,
sdk_info: ClientSdkInfo,
}
Expand All @@ -54,9 +59,12 @@ impl fmt::Debug for Client {

impl Clone for Client {
fn clone(&self) -> Client {
let transport = Arc::new(RwLock::new(self.transport.read().unwrap().clone()));
let session_flusher = SessionFlusher::new(transport.clone());
Client {
options: self.options.clone(),
transport: RwLock::new(self.transport.read().unwrap().clone()),
transport,
session_flusher,
integrations: self.integrations.clone(),
sdk_info: self.sdk_info.clone(),
}
Expand Down Expand Up @@ -103,7 +111,7 @@ impl Client {
Some(factory.create_transport(&options))
};

let transport = RwLock::new(create_transport());
let transport = Arc::new(RwLock::new(create_transport()));

let mut sdk_info = SDK_INFO.clone();

Expand All @@ -120,9 +128,11 @@ impl Client {
sdk_info.integrations.push(integration.name().to_string());
}

let session_flusher = SessionFlusher::new(transport.clone());
Client {
options,
transport,
session_flusher,
integrations,
sdk_info,
}
Expand Down Expand Up @@ -265,6 +275,10 @@ impl Client {
Default::default()
}

pub(crate) fn enqueue_session(&self, session_update: SessionUpdate<'static>) {
self.session_flusher.enqueue(session_update)
}

pub(crate) fn capture_envelope(&self, envelope: Envelope) {
if let Some(ref transport) = *self.transport.read().unwrap() {
transport.send_envelope(envelope);
Expand Down
1 change: 1 addition & 0 deletions sentry-core/src/clientoptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl ClientOptions {
pub fn new() -> Self {
Self::default()
}

/// Adds a configured integration to the options.
///
/// # Examples
Expand Down
174 changes: 159 additions & 15 deletions sentry-core/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
//!
//! https://develop.sentry.dev/sdk/sessions/
use std::sync::Arc;
use std::time::Instant;
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

use crate::client::TransportArc;
use crate::protocol::{
EnvelopeItem, Event, Level, SessionAttributes, SessionStatus, SessionUpdate,
};
Expand All @@ -23,10 +25,8 @@ pub struct Session {
impl Drop for Session {
fn drop(&mut self) {
self.close();
if let Some(item) = self.create_envelope_item() {
let mut envelope = Envelope::new();
envelope.add_item(item);
self.client.capture_envelope(envelope);
if self.dirty {
self.client.enqueue_session(self.session_update.clone());
}
}
}
Expand Down Expand Up @@ -114,8 +114,130 @@ impl Session {
}
}

// as defined here: https://develop.sentry.dev/sdk/envelopes/#size-limits
const MAX_SESSION_ITEMS: usize = 100;
const FLUSH_INTERVAL: Duration = Duration::from_secs(60);

type SessionQueue = Arc<Mutex<Vec<SessionUpdate<'static>>>>;

/// Background Session Flusher
///
/// The background flusher queues session updates for delayed batched sending.
/// It has its own background thread that will flush its queue once every
/// `FLUSH_INTERVAL`.
///
/// For now it just batches all the session updates together into one envelope,
/// but in the future it will also pre-aggregate session numbers.
pub(crate) struct SessionFlusher {
transport: TransportArc,
queue: SessionQueue,
shutdown: Arc<(Mutex<bool>, Condvar)>,
worker: Option<JoinHandle<()>>,
}

impl SessionFlusher {
/// Creates a new Flusher that will submit envelopes to the given `transport`.
pub fn new(transport: TransportArc) -> Self {
let queue = Arc::new(Mutex::new(Vec::new()));
#[allow(clippy::mutex_atomic)]
let shutdown = Arc::new((Mutex::new(false), Condvar::new()));

let worker_transport = transport.clone();
let worker_queue = queue.clone();
let worker_shutdown = shutdown.clone();
let worker = std::thread::Builder::new()
.name("sentry-session-flusher".into())
.spawn(move || {
let (lock, cvar) = worker_shutdown.as_ref();
let mut shutdown = lock.lock().unwrap();
// check this immediately, in case the main thread is already shutting down
if *shutdown {
return;
}
let mut last_flush = Instant::now();
loop {
let timeout = FLUSH_INTERVAL - last_flush.elapsed();
shutdown = cvar.wait_timeout(shutdown, timeout).unwrap().0;
if *shutdown {
return;
}
if last_flush.elapsed() < FLUSH_INTERVAL {
continue;
}
SessionFlusher::flush(worker_queue.lock().unwrap(), &worker_transport);
last_flush = Instant::now();
}
})
.unwrap();

Self {
transport,
queue,
shutdown,
worker: Some(worker),
}
}

/// Enqueues a session update for delayed sending.
///
/// When the queue is full, it will be flushed immediately.
pub fn enqueue(&self, session_update: SessionUpdate<'static>) {
let mut queue = self.queue.lock().unwrap();
queue.push(session_update);
if queue.len() >= MAX_SESSION_ITEMS {
SessionFlusher::flush(queue, &self.transport);
}
}

/// Flushes the queue to the transport.
///
/// This is a static method as it will be called from both the background
/// thread and the main thread on drop.
fn flush(mut queue_lock: MutexGuard<Vec<SessionUpdate<'static>>>, transport: &TransportArc) {
let queue: Vec<_> = std::mem::take(queue_lock.as_mut());
drop(queue_lock);

if queue.is_empty() {
return;
}

let mut envelope = Envelope::new();
let mut items = 0;

for session_update in queue {
if items >= MAX_SESSION_ITEMS {
if let Some(ref transport) = *transport.read().unwrap() {
transport.send_envelope(envelope);
}
envelope = Envelope::new();
items = 0;
}
envelope.add_item(session_update);
items += 1;
}

if let Some(ref transport) = *transport.read().unwrap() {
transport.send_envelope(envelope);
}
}
}

impl Drop for SessionFlusher {
fn drop(&mut self) {
let (lock, cvar) = self.shutdown.as_ref();
*lock.lock().unwrap() = true;
cvar.notify_one();

if let Some(worker) = self.worker.take() {
worker.join().ok();
}
SessionFlusher::flush(self.queue.lock().unwrap(), &self.transport);
}
}

#[cfg(all(test, feature = "test"))]
mod tests {
use super::*;
use crate as sentry;
use crate::protocol::{Envelope, EnvelopeItem, SessionStatus};

Expand Down Expand Up @@ -153,6 +275,27 @@ mod tests {
assert_eq!(items.next(), None);
}

#[test]
fn test_session_batching() {
#![allow(clippy::match_like_matches_macro)]
let envelopes = capture_envelopes(|| {
for _ in 0..(MAX_SESSION_ITEMS * 2) {
sentry::start_session();
}
});
// we only want *two* envelope for all the sessions
assert_eq!(envelopes.len(), 2);

let items = envelopes[0].items().chain(envelopes[1].items());
assert_eq!(items.clone().count(), MAX_SESSION_ITEMS * 2);
for item in items {
assert!(match item {
EnvelopeItem::SessionUpdate(_) => true,
_ => false,
});
}
}

#[test]
fn test_session_error() {
let envelopes = capture_envelopes(|| {
Expand Down Expand Up @@ -298,23 +441,24 @@ mod tests {
}
assert_eq!(items.next(), None);

// the other two events should not have session updates
let mut items = envelopes[1].items();
if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
assert_eq!(session.status, SessionStatus::Exited);
assert_eq!(session.errors, 1);
assert_eq!(session.init, false);
} else {
panic!("expected session");
}
assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
assert_eq!(items.next(), None);

// the other two events should not have session updates
let mut items = envelopes[2].items();
assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
assert_eq!(items.next(), None);

// the session end is sent last as it is possibly batched
let mut items = envelopes[3].items();
assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
assert_eq!(session.status, SessionStatus::Exited);
assert_eq!(session.errors, 1);
assert_eq!(session.init, false);
} else {
panic!("expected session");
}
assert_eq!(items.next(), None);
}
}
1 change: 1 addition & 0 deletions sentry-types/src/protocol/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl From<SessionUpdate<'static>> for EnvelopeItem {
}

/// An Iterator over the items of an Envelope.
#[derive(Clone)]
pub struct EnvelopeItemIter<'s> {
inner: std::slice::Iter<'s, EnvelopeItem>,
}
Expand Down
8 changes: 5 additions & 3 deletions sentry/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::{Client, ClientOptions, Hub};

/// Helper struct that is returned from `init`.
///
/// When this is dropped events are drained with a 1 second timeout.
#[must_use = "when the init guard is dropped the transport will be shut down and no further \
events can be sent. If you do want to ignore this use mem::forget on it."]
/// When this is dropped events are drained with the configured `shutdown_timeout`.
#[must_use = "when the init guard is dropped the send queue is flushed and the \
transport will be shut down and no further events can be sent."]
pub struct ClientInitGuard(Arc<Client>);

impl std::ops::Deref for ClientInitGuard {
Expand Down Expand Up @@ -56,6 +56,8 @@ impl Drop for ClientInitGuard {
/// ```
///
/// Or if draining on shutdown should be ignored:
/// This is not recommended, as events or session updates that have been queued
/// might be lost.
///
/// ```
/// std::mem::forget(sentry::init("https://key@sentry.io/1234"));
Expand Down

0 comments on commit 6729dda

Please sign in to comment.