Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bd-buffer/src/buffer/aggregate_ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl RingBufferImpl {
allow_overwrite: AllowOverwrite,
volatile_stats: Arc<RingBufferStats>,
non_volatile_stats: Arc<RingBufferStats>,
on_evicted_cb: Option<Arc<dyn Fn(&[u8]) + Send + Sync>>,
) -> Result<Arc<Self>> {
// For aggregate buffers, the size of the file (after subtracting header space) must be >= the
// size of RAM. This is to avoid situations in which we accept a record into RAM but cannot ever
Expand Down Expand Up @@ -162,10 +163,11 @@ impl RingBufferImpl {
BlockWhenReservingIntoConcurrentRead::Yes,
per_record_crc32_check,
non_volatile_stats,
on_evicted_cb.clone(),
)?;

let volatile_buffer =
VolatileRingBuffer::new(format!("{name}-volatile"), volatile_size, volatile_stats);
VolatileRingBuffer::new(format!("{name}-volatile"), volatile_size, volatile_stats, on_evicted_cb);

let shared_data = Arc::new(SharedData {
volatile_buffer,
Expand Down
2 changes: 2 additions & 0 deletions bd-buffer/src/buffer/aggregate_ring_buffer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl Helper {
allow_overwrite,
Arc::new(RingBufferStats::default()),
stats.stats.clone(),
None,
)
.unwrap();
Self {
Expand Down Expand Up @@ -86,6 +87,7 @@ impl Helper {
self.allow_overwrite,
Arc::new(RingBufferStats::default()),
self.stats.stats.clone(),
None,
)?,
self.cursor,
));
Expand Down
24 changes: 24 additions & 0 deletions bd-buffer/src/buffer/common_ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ pub struct LockedData<ExtraLockedData> {
has_read_reservation_cb: Box<dyn Fn(&ExtraLockedData) -> bool + Send>,
has_write_reservation_cb: Box<dyn Fn(&ExtraLockedData) -> bool + Send>,

// Callback invoked when a record is evicted (overwritten).
// The argument is the record data.
on_evicted_cb: Box<dyn Fn(&[u8]) + Send + Sync>,

// Indicates whether the buffer is in a readable state or not, i.e. if we expect a read() call to
// immediately return an entry.
pub readable: tokio::sync::watch::Sender<bool>,
Expand Down Expand Up @@ -456,6 +460,20 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
// When overwriting we zero out any extra data, to make sure that CRCs, etc. become so the
// overwritten record is skipped correctly if corruption lands us in it somehow.
let next_read_start = guard.next_read_start().ok_or(InvariantError::Invariant)?;

// Invoke the eviction callback before zeroing/advancing.
// We need to calculate the data slice.
let data_start = (next_read_start + guard.extra_bytes_per_record) as usize;
let data_len = next_read_size as usize;
// Safety: We are holding the lock so reading this memory is safe.
// We just need to make sure we don't access out of bounds.
// The load_next_read_size checks ensures size is valid and within bounds.
let memory_slice = unsafe {
let ptr = guard.memory().as_ptr().add(data_start);
std::slice::from_raw_parts(ptr, data_len)
};
(guard.on_evicted_cb)(memory_slice);

guard.zero_extra_data(next_read_start);
guard.advance_next_read(next_read_actual_size, Cursor::No)?;
} else {
Expand Down Expand Up @@ -981,6 +999,7 @@ impl<ExtraLockedData> CommonRingBuffer<ExtraLockedData> {
on_total_data_loss_cb: impl Fn(&mut ExtraLockedData) + Send + 'static,
has_read_reservation_cb: impl Fn(&ExtraLockedData) -> bool + Send + 'static,
has_write_reservation_cb: impl Fn(&ExtraLockedData) -> bool + Send + 'static,
on_evicted_cb: Option<Arc<dyn Fn(&[u8]) + Send + Sync>>,
) -> Self {
// Initialize the channel to true on startup. If we end up reading from the buffer when it's
// not ready the async read calls will still wait for the read to be available, and starting
Expand All @@ -1005,6 +1024,11 @@ impl<ExtraLockedData> CommonRingBuffer<ExtraLockedData> {
on_total_data_loss_cb: Box::new(on_total_data_loss_cb),
has_read_reservation_cb: Box::new(has_read_reservation_cb),
has_write_reservation_cb: Box::new(has_write_reservation_cb),
on_evicted_cb: Box::new(move |data| {
if let Some(cb) = &on_evicted_cb {
cb(data);
}
}),
wait_for_drain_data: None,
pending_total_data_loss_reset: false,
readable,
Expand Down
83 changes: 79 additions & 4 deletions bd-buffer/src/buffer/common_ring_buffer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use bd_client_stats_store::Collector;
use bd_log_primitives::LossyIntToU32;
use parameterized::parameterized;
use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tempfile::TempDir;

#[derive(Clone, Copy)]
Expand Down Expand Up @@ -66,15 +66,27 @@ struct Helper {
helper: CommonHelper,
_temp_dir: TempDir,
stats: StatsTestHelper,
evicted_records: Arc<Mutex<Vec<Vec<u8>>>>,
}

impl Helper {
fn new(size: u32, test_type: TestType) -> Self {
let temp_dir = TempDir::with_prefix("buffer_test").unwrap();
let stats = StatsTestHelper::new(&Collector::default().scope(""));
let evicted_records = Arc::new(Mutex::new(Vec::new()));
let evicted_records_clone = evicted_records.clone();

let on_evicted_cb = Some(Arc::new(move |bytes: &[u8]| {
evicted_records_clone.lock().unwrap().push(bytes.to_vec());
}) as Arc<dyn Fn(&[u8]) + Send + Sync>);

let buffer = match test_type {
TestType::Volatile => VolatileRingBuffer::new("test".to_string(), size, stats.stats.clone())
as Arc<dyn RingBuffer>,
TestType::Volatile => VolatileRingBuffer::new(
"test".to_string(),
size,
stats.stats.clone(),
on_evicted_cb,
) as Arc<dyn RingBuffer>,
TestType::NonVolatile => NonVolatileRingBuffer::new(
"test".to_string(),
temp_dir.path().join("buffer"),
Expand All @@ -83,6 +95,7 @@ impl Helper {
BlockWhenReservingIntoConcurrentRead::No,
PerRecordCrc32Check::No,
stats.stats.clone(),
on_evicted_cb,
)
.unwrap() as Arc<dyn RingBuffer>,
TestType::Aggregate => AggregateRingBuffer::new(
Expand All @@ -94,13 +107,15 @@ impl Helper {
AllowOverwrite::Yes,
Arc::new(RingBufferStats::default()),
stats.stats.clone(),
on_evicted_cb,
)
.unwrap() as Arc<dyn RingBuffer>,
};
Self {
helper: CommonHelper::new(buffer, Cursor::No),
_temp_dir: temp_dir,
stats,
evicted_records,
}
}
}
Expand All @@ -112,7 +127,67 @@ impl Drop for Helper {
}
}

// Test basic error cases.

// Test that the eviction callback is fired when records are overwritten.
#[parameterized(test_type = {TestType::Volatile, TestType::NonVolatile, TestType::Aggregate})]
fn callback_fired_on_eviction(test_type: TestType) {
let mut root = Helper::new(30, test_type);
let helper = &mut root.helper;


// Reserve and write 0-9.
helper.reserve_and_commit("aaaaaa");

// Reserve and write 10-19.
helper.reserve_and_commit("bbbbbb");

// Reserve and write 20-29.
helper.reserve_and_commit("cccccc");

// Ensure these are propagated to NonVolatile in Aggregate case.
// This prevents the "slow flush" race where Volatile overwrites before flushing to NonVolatile.
if matches!(test_type, TestType::Aggregate) {
helper.buffer.flush();
}

root.stats.wait_for_total_records_written(3);

// Reserve and write 0-9. This should overwrite "aaaaaa".
helper.reserve_and_commit("dddddd");

if matches!(test_type, TestType::Aggregate) {
helper.buffer.flush();
}

root.stats.wait_for_total_records_written(4);


// Give a little time for callbacks to fire if there's any async behavior (though CommonRingBuffer calls it synchronously).
// However, for AggregateRingBuffer, the flush happens in a separate thread, so the eviction (and callback)
// happens on that thread.
// wait_for_total_records_written waits for the *write* to complete.
// If it's Aggregate, the write goes to Volatile. Then flush thread moves it to NonVolatile.
// The eviction happens when NonVolatile is written to.

// Let's loop briefly to wait for the callback.
let start = std::time::Instant::now();
loop {
let evicted = root.evicted_records.lock().unwrap();
if !evicted.is_empty() || start.elapsed() > std::time::Duration::from_secs(5) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(10));
}

let evicted = root.evicted_records.lock().unwrap();
assert_eq!(evicted.len(), 1, "Expected 1 evicted record");
let record = &evicted[0];
let as_string = String::from_utf8_lossy(record);
// We verify that the evicted record contains "aaaaaa".
// The exact format includes some header bytes, but the payload should be there.
assert!(as_string.contains("aaaaaa"), "Evicted record should contain 'aaaaaa', got: {:?}", as_string);
}

#[parameterized(test_type = {TestType::Volatile, TestType::NonVolatile, TestType::Aggregate})]
fn errors(test_type: TestType) {
let mut root = Helper::new(100, test_type);
Expand Down
2 changes: 2 additions & 0 deletions bd-buffer/src/buffer/non_volatile_ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ impl RingBufferImpl {
block_when_reserving_into_concurrent_read: BlockWhenReservingIntoConcurrentRead,
per_record_crc32_check: PerRecordCrc32Check,
stats: Arc<RingBufferStats>,
on_evicted_cb: Option<Arc<dyn Fn(&[u8]) + Send + Sync>>,
) -> Result<Arc<Self>> {
// The following static asserts verify that FileHeader is a known size with all field offsets
// known. This is done to avoid the use of #pragma pack(1) which may lead to poor performance on
Expand Down Expand Up @@ -896,6 +897,7 @@ impl RingBufferImpl {
.and_then(|p| p.reservation.as_ref())
.is_some()
},
on_evicted_cb,
),
}))
}
Expand Down
2 changes: 2 additions & 0 deletions bd-buffer/src/buffer/non_volatile_ring_buffer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl Helper {
super::BlockWhenReservingIntoConcurrentRead::No,
super::PerRecordCrc32Check::Yes,
stats.stats.clone(),
None,
)
.unwrap();
Self {
Expand Down Expand Up @@ -82,6 +83,7 @@ impl Helper {
super::BlockWhenReservingIntoConcurrentRead::No,
super::PerRecordCrc32Check::Yes,
self.stats.stats.clone(),
None,
)?,
self.cursor,
));
Expand Down
8 changes: 7 additions & 1 deletion bd-buffer/src/buffer/volatile_ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,12 @@ pub struct RingBufferImpl {

impl RingBufferImpl {
#[must_use]
pub fn new(name: String, size: u32, stats: Arc<RingBufferStats>) -> Arc<Self> {
pub fn new(
name: String,
size: u32,
stats: Arc<RingBufferStats>,
on_evicted_cb: Option<Arc<dyn Fn(&[u8]) + Send + Sync>>,
) -> Arc<Self> {
let mut memory_do_not_use = Vec::with_capacity(size as usize);
memory_do_not_use.spare_capacity_mut(); // Appease clippy.
unsafe {
Expand Down Expand Up @@ -481,6 +486,7 @@ impl RingBufferImpl {
.is_some()
},
|extra_locked_data| !extra_locked_data.reservations.is_empty(),
on_evicted_cb,
);

Arc::new(Self {
Expand Down
2 changes: 1 addition & 1 deletion bd-buffer/src/buffer/volatile_ring_buffer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use itertools::Itertools;

fn make_helper(size: u32) -> Helper {
Helper::new(
RingBufferImpl::new("test".to_string(), size, RingBufferStats::default().into()),
RingBufferImpl::new("test".to_string(), size, RingBufferStats::default().into(), None),
Cursor::No,
)
}
Expand Down
12 changes: 12 additions & 0 deletions bd-buffer/src/ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,17 @@ pub struct Manager {

stream_buffer_size_flag:
bd_runtime::runtime::IntWatch<bd_runtime::runtime::buffers::StreamBufferSizeBytes>,

// Callback invoked when a record is evicted (overwritten).
on_evicted_cb: Option<Arc<dyn Fn(&[u8]) + Send + Sync>>,
}

impl Manager {
pub fn new(
buffer_directory: PathBuf,
stats: &Scope,
runtime: &bd_runtime::runtime::ConfigLoader,
on_evicted_cb: Option<Arc<dyn Fn(&[u8]) + Send + Sync>>,
) -> (
Arc<Self>,
tokio::sync::mpsc::Receiver<BufferEventWithResponse>,
Expand All @@ -184,6 +188,7 @@ impl Manager {
buffer_event_tx,
scope,
stream_buffer_size_flag: runtime.register_int_watch(),
on_evicted_cb,
}),
buffer_event_rx,
)
Expand Down Expand Up @@ -294,6 +299,7 @@ impl Manager {
.scope
.counter_with_labels("total_data_loss", labels! {"buffer_id" => &buffer.id}),
None,
self.on_evicted_cb.clone(),
)?;

updated_buffers.insert(buffer.id.clone(), (buffer_type, ring_buffer.clone()));
Expand Down Expand Up @@ -419,6 +425,7 @@ impl Manager {
"bd tail".to_string(),
*self.stream_buffer_size_flag.read(),
Arc::new(RingBufferStats::default()),
self.on_evicted_cb.clone(),
));

Some(BufferEventWithResponse::new(
Expand Down Expand Up @@ -565,6 +572,7 @@ impl RingBuffer {
volatile_records_written: Counter,
volatile_records_refused: Counter,
non_volatile_records_written: Option<Counter>,
on_evicted_cb: Option<Arc<dyn Fn(&[u8]) + Send + Sync>>,
) -> Result<Arc<AggregateRingBuffer>> {
// TODO(mattklein123): Right now we expose a very limited set of stats. Given it's much easier
// now to inject stats we can consider exposing the rest. For now just duplicate what we
Expand Down Expand Up @@ -595,6 +603,7 @@ impl RingBuffer {
},
Arc::new(volatile_stats),
Arc::new(non_volatile_stats),
on_evicted_cb,
)
}

Expand All @@ -612,6 +621,7 @@ impl RingBuffer {
corrupted_record_counter: Counter,
total_data_loss_counter: Counter,
non_volatile_records_written: Option<Counter>,
on_evicted_cb: Option<Arc<dyn Fn(&[u8]) + Send + Sync>>,
) -> Result<(Arc<Self>, bool)> {
let filename = non_volatile_filename
.to_str()
Expand All @@ -630,6 +640,7 @@ impl RingBuffer {
write_counter.clone(),
write_failure_counter.clone(),
non_volatile_records_written.clone(),
on_evicted_cb.clone(),
);

let mut deleted = false;
Expand Down Expand Up @@ -660,6 +671,7 @@ impl RingBuffer {
write_counter,
write_failure_counter,
non_volatile_records_written,
on_evicted_cb,
);
}

Expand Down
Loading
Loading