@@ -51,7 +51,7 @@ use crate::Msg;
5151use crossbeam_channel:: { bounded, SendTimeoutError , Sender } ;
5252use std:: io;
5353use std:: io:: Write ;
54- use std:: sync:: atomic:: AtomicU64 ;
54+ use std:: sync:: atomic:: AtomicUsize ;
5555use std:: sync:: atomic:: Ordering ;
5656use std:: sync:: Arc ;
5757use std:: thread:: JoinHandle ;
@@ -124,11 +124,20 @@ pub struct WorkerGuard {
124124/// [fmt]: mod@tracing_subscriber::fmt
125125#[ derive( Clone , Debug ) ]
126126pub struct NonBlocking {
127- error_counter : Arc < AtomicU64 > ,
127+ error_counter : ErrorCounter ,
128128 channel : Sender < Msg > ,
129129 is_lossy : bool ,
130130}
131131
132+ /// Tracks the number of times a log line was dropped by the background thread.
133+ ///
134+ /// If the non-blocking writer is not configured in [lossy mode], the error
135+ /// count should always be 0.
136+ ///
137+ /// [lossy mode]: NonBlockingBuilder::lossy
138+ #[ derive( Clone , Debug ) ]
139+ pub struct ErrorCounter ( Arc < AtomicUsize > ) ;
140+
132141impl NonBlocking {
133142 /// Returns a new `NonBlocking` writer wrapping the provided `writer`.
134143 ///
@@ -157,7 +166,7 @@ impl NonBlocking {
157166 (
158167 Self {
159168 channel : sender,
160- error_counter : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
169+ error_counter : ErrorCounter ( Arc :: new ( AtomicUsize :: new ( 0 ) ) ) ,
161170 is_lossy,
162171 } ,
163172 worker_guard,
@@ -166,7 +175,7 @@ impl NonBlocking {
166175
167176 /// Returns a counter for the number of times logs where dropped. This will always return zero if
168177 /// `NonBlocking` is not lossy.
169- pub fn error_counter ( & self ) -> Arc < AtomicU64 > {
178+ pub fn error_counter ( & self ) -> ErrorCounter {
170179 self . error_counter . clone ( )
171180 }
172181}
@@ -218,7 +227,7 @@ impl std::io::Write for NonBlocking {
218227 let buf_size = buf. len ( ) ;
219228 if self . is_lossy {
220229 if self . channel . try_send ( Msg :: Line ( buf. to_vec ( ) ) ) . is_err ( ) {
221- self . error_counter . fetch_add ( 1 , Ordering :: Release ) ;
230+ self . error_counter . incr_saturating ( ) ;
222231 }
223232 } else {
224233 return match self . channel . send ( Msg :: Line ( buf. to_vec ( ) ) ) {
@@ -279,6 +288,43 @@ impl Drop for WorkerGuard {
279288 }
280289}
281290
291+ // === impl ErrorCounter ===
292+
293+ impl ErrorCounter {
294+ /// Returns the number of log lines that have been dropped.
295+ ///
296+ /// If the non-blocking writer is not configured in [lossy mode], the error
297+ /// count should always be 0.
298+ ///
299+ /// [lossy mode]: NonBlockingBuilder::lossy
300+ pub fn dropped_lines ( & self ) -> usize {
301+ self . 0 . load ( Ordering :: Acquire )
302+ }
303+
304+ fn incr_saturating ( & self ) {
305+ let mut curr = self . 0 . load ( Ordering :: Acquire ) ;
306+ // We don't need to enter the CAS loop if the current value is already
307+ // `usize::MAX`.
308+ if curr == usize:: MAX {
309+ return ;
310+ }
311+
312+ // This is implemented as a CAS loop rather than as a simple
313+ // `fetch_add`, because we don't want to wrap on overflow. Instead, we
314+ // need to ensure that saturating addition is performed.
315+ loop {
316+ let val = curr. saturating_add ( 1 ) ;
317+ match self
318+ . 0
319+ . compare_exchange ( curr, val, Ordering :: AcqRel , Ordering :: Acquire )
320+ {
321+ Ok ( _) => return ,
322+ Err ( actual) => curr = actual,
323+ }
324+ }
325+ }
326+ }
327+
282328#[ cfg( test) ]
283329mod test {
284330 use super :: * ;
@@ -321,7 +367,7 @@ mod test {
321367 let error_count = non_blocking. error_counter ( ) ;
322368
323369 non_blocking. write_all ( b"Hello" ) . expect ( "Failed to write" ) ;
324- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
370+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
325371
326372 let handle = thread:: spawn ( move || {
327373 non_blocking. write_all ( b", World" ) . expect ( "Failed to write" ) ;
@@ -330,7 +376,7 @@ mod test {
330376 // Sleep a little to ensure previously spawned thread gets blocked on write.
331377 thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
332378 // We should not drop logs when blocked.
333- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
379+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
334380
335381 // Read the first message to unblock sender.
336382 let mut line = rx. recv ( ) . unwrap ( ) ;
@@ -365,17 +411,17 @@ mod test {
365411
366412 // First write will not block
367413 write_non_blocking ( & mut non_blocking, b"Hello" ) ;
368- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
414+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
369415
370416 // Second write will not block as Worker will have called `recv` on channel.
371417 // "Hello" is not yet consumed. MockWriter call to write_all will block until
372418 // "Hello" is consumed.
373419 write_non_blocking ( & mut non_blocking, b", World" ) ;
374- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
420+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
375421
376422 // Will sit in NonBlocking channel's buffer.
377423 write_non_blocking ( & mut non_blocking, b"Test" ) ;
378- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
424+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
379425
380426 // Allow a line to be written. "Hello" message will be consumed.
381427 // ", World" will be able to write to MockWriter.
@@ -385,12 +431,12 @@ mod test {
385431
386432 // This will block as NonBlocking channel is full.
387433 write_non_blocking ( & mut non_blocking, b"Universe" ) ;
388- assert_eq ! ( 1 , error_count. load ( Ordering :: Acquire ) ) ;
434+ assert_eq ! ( 1 , error_count. dropped_lines ( ) ) ;
389435
390436 // Finally the second message sent will be consumed.
391437 let line = rx. recv ( ) . unwrap ( ) ;
392438 assert_eq ! ( line, ", World" ) ;
393- assert_eq ! ( 1 , error_count. load ( Ordering :: Acquire ) ) ;
439+ assert_eq ! ( 1 , error_count. dropped_lines ( ) ) ;
394440 }
395441
396442 #[ test]
@@ -426,6 +472,6 @@ mod test {
426472 }
427473
428474 assert_eq ! ( 10 , hello_count) ;
429- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
475+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
430476 }
431477}
0 commit comments