
Commit a9bcf4f

ishitatsuyuki authored and mstange committed
perf_event: Remove the size limit from EventIter.
To sort perf events on the fly, all available events from one stream must be examined at once, which is not possible with the current EventIter restricted to at most 31 events at a time. Rewrite the out-of-order commit logic so that pending commits are tracked in a binary heap, which remains small as long as events are dropped in order but can also comfortably accommodate an unbounded amount of out-of-order drops.

This changes the event iteration order. Previously, we iterated over 31 events from each buffer before going on to the next. Now, all available events in each buffer will be consumed before going on to the next. This incurs more buffering than before, but live event sorting will incur even more buffering, so, seen as a transition step, this should be acceptable.
1 parent 06c33c9 commit a9bcf4f
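As a rough illustration of the scheme the commit message describes, here is a minimal, self-contained sketch (not samply's actual types: `FakeRing` and its plain `u64` tail are made-up stand-ins for the mmap'd ring buffer). Completed `(from, to)` ranges are pushed into a min-heap, and the tail only advances across a contiguous prefix of completed reads:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy stand-in for the ring buffer's consumer position; the real code keeps
/// this in the perf mmap page and accesses it with volatile reads/writes.
struct FakeRing {
    tail: u64,
    pending_commits: BinaryHeap<Reverse<(u64, u64)>>,
}

impl FakeRing {
    fn new() -> Self {
        FakeRing { tail: 0, pending_commits: BinaryHeap::new() }
    }

    /// Mark the read of [from, to) as complete; advance the tail over any
    /// contiguous prefix of completed reads.
    fn try_commit(&mut self, from: u64, to: u64) {
        self.pending_commits.push(Reverse((from, to)));
        while let Some(&Reverse((from, to))) = self.pending_commits.peek() {
            if from != self.tail {
                break; // a gap remains, so this range stays pending
            }
            self.tail = to;
            self.pending_commits.pop();
        }
    }
}

fn main() {
    let mut ring = FakeRing::new();
    ring.try_commit(16, 32); // out of order: tail stays at 0
    assert_eq!(ring.tail, 0);
    ring.try_commit(0, 16); // fills the gap: tail jumps straight to 32
    assert_eq!(ring.tail, 32);
    println!("tail = {}", ring.tail);
}
```

As long as events are dropped in the same order they were produced, every pushed range is popped again immediately, so the heap holds at most one entry at a time and the common case stays cheap.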

1 file changed: +58 −68 lines


samply/src/linux/perf_event.rs

Lines changed: 58 additions & 68 deletions
@@ -1,5 +1,5 @@
 use std::cmp::max;
-use std::fmt;
+use std::collections::BinaryHeap;
 use std::io;
 use std::mem;
 use std::ops::Range;
@@ -9,6 +9,7 @@ use std::slice;
 use std::sync::atomic::fence;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
+use std::{cmp, fmt};

 use libc::{self, c_void, pid_t};
 use linux_perf_data::linux_perf_event_reader;
@@ -69,6 +70,13 @@ unsafe fn read_head(pointer: *const u8) -> u64 {
     head
 }

+unsafe fn read_tail(pointer: *const u8) -> u64 {
+    let page = &*(pointer as *const PerfEventMmapPage);
+    // No memory fence required because we're just reading a value previously
+    // written by us.
+    ptr::read_volatile(&page.data_tail)
+}
+
 unsafe fn write_tail(pointer: *mut u8, value: u64) {
     let page = &mut *(pointer as *mut PerfEventMmapPage);
     fence(Ordering::AcqRel);
@@ -462,17 +470,36 @@ impl Perf {
 struct EventRefState {
     buffer: *mut u8,
     size: u64,
-    done: u32,
-    positions: [u64; 32],
+    pending_commits: BinaryHeap<cmp::Reverse<(u64, u64)>>,
 }

 impl EventRefState {
     fn new(buffer: *mut u8, size: u64) -> Self {
         EventRefState {
             buffer,
             size,
-            done: !0,
-            positions: [0; 32],
+            pending_commits: BinaryHeap::new(),
+        }
+    }
+
+    /// Mark the read of [from, to) as complete.
+    /// If reads are completed in-order, then this will advance the tail pointer to `to` immediately.
+    /// Otherwise, it will remain in the "pending commit" queue, and committed once all previous
+    /// reads are also committed.
+    fn try_commit(&mut self, from: u64, to: u64) {
+        self.pending_commits.push(cmp::Reverse((from, to)));
+
+        let mut position = unsafe { read_tail(self.buffer) };
+        while let Some(&cmp::Reverse((from, to))) = self.pending_commits.peek() {
+            if from == position {
+                unsafe {
+                    write_tail(self.buffer, to);
+                }
+                position = to;
+                self.pending_commits.pop();
+            } else {
+                break;
+            }
         }
     }
 }
@@ -485,40 +512,40 @@ impl Drop for EventRefState {
     }
 }

+/// Handle to a single event in the perf ring buffer.
+///
+/// On Drop, the event will be "consumed" and the read pointer will be advanced.
+///
+/// If events are dropped out of order, then it will be added to a list of pending commits and
+/// committed when all prior events are also dropped. For this reason, events should be dropped
+/// in-order to achieve the lowest overhead.
 #[derive(Clone)]
 pub struct EventRef {
     buffer: *mut u8,
     buffer_size: usize,
-    event_location: RawRecordLocation,
-    mask: u32,
     state: Arc<Mutex<EventRefState>>,
+    event_location: RawRecordLocation,
+    prev_position: u64,
+    position: u64,
     parse_info: RecordParseInfo,
 }

 impl fmt::Debug for EventRef {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
         fmt.debug_map()
             .entry(&"location", &self.event_location)
-            .entry(&"mask", &format!("{:032b}", self.mask))
+            .entry(&"prev_position", &self.prev_position)
+            .entry(&"position", &self.position)
             .finish()
     }
 }

 impl Drop for EventRef {
     #[inline]
     fn drop(&mut self) {
-        let mut state = self.state.lock();
-        let last_empty_spaces = state.done.leading_zeros();
-        state.done &= self.mask;
-        let empty_spaces = state.done.leading_zeros();
-
-        debug_assert!(empty_spaces >= last_empty_spaces);
-        if empty_spaces != last_empty_spaces {
-            let position = state.positions[empty_spaces as usize];
-            unsafe {
-                write_tail(self.buffer, position);
-            }
-        }
+        self.state
+            .lock()
+            .try_commit(self.prev_position, self.position);
     }
 }

@@ -532,45 +559,12 @@ impl EventRef {

 pub struct EventIter<'a> {
     perf: &'a mut Perf,
-    index: usize,
-    locations: Vec<RawRecordLocation>,
-    state: Arc<Mutex<EventRefState>>,
 }

 impl<'a> EventIter<'a> {
     #[inline]
     fn new(perf: &'a mut Perf) -> Self {
-        let mut locations = Vec::with_capacity(32);
-
-        {
-            let state = Arc::get_mut(&mut perf.event_ref_state)
-                .expect("Perf::iter called while the previous iterator hasn't finished processing");
-            let state = state.get_mut();
-
-            for _ in 0..31 {
-                state.positions[locations.len()] = perf.position;
-                let raw_event_location =
-                    match next_raw_event(perf.buffer, perf.size, &mut perf.position) {
-                        Some(location) => location,
-                        None => break,
-                    };
-
-                locations.push(raw_event_location);
-            }
-
-            state.positions[locations.len()] = perf.position;
-            state.done = !0;
-        }
-
-        // trace!("Batched {} events for PID {}", count, perf.pid);
-
-        let state = perf.event_ref_state.clone();
-        EventIter {
-            perf,
-            index: 0,
-            locations,
-            state,
-        }
+        EventIter { perf }
     }
 }

@@ -579,21 +573,17 @@ impl<'a> Iterator for EventIter<'a> {

     #[inline]
     fn next(&mut self) -> Option<Self::Item> {
-        if self.index == self.locations.len() {
-            return None;
-        }
-
-        let event_location = self.locations[self.index].clone();
-        let event = EventRef {
-            buffer: self.perf.buffer,
-            buffer_size: self.perf.size as usize,
+        let perf = &mut self.perf;
+        let prev_position = perf.position;
+        let event_location = next_raw_event(perf.buffer, perf.size, &mut perf.position)?;
+        Some(EventRef {
+            buffer: perf.buffer,
+            buffer_size: perf.size as usize,
+            state: perf.event_ref_state.clone(),
             event_location,
-            mask: !(1 << (31 - self.index)),
-            state: self.state.clone(),
+            prev_position,
+            position: perf.position,
             parse_info: self.perf.parse_info,
-        };
-
-        self.index += 1;
-        Some(event)
+        })
     }
 }