Performance improvements of ring buffer processing
Working on a feature that's particularly sensitive to event drops and
truncated snaplens, I did some performance experiments.

My chosen workload is a sustained busy loop that does nothing more than
writing 10KB at a time to /dev/null using `write()`. On my laptop, on a
single core I get a throughput of ~1.5M evt/s. Since I am explicitly
running with a very high snaplen (65k), this workload generates a
sustained 7 GB/s (!) on a single core into a ring buffer whose consumer
keeps up. Since our buffer is 8MB, this means that userspace has ~1ms to
process all the data inside a ring buffer before it starts to drop. This
workload highlighted some potential low-hanging fruit that I'll describe
here.
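
For reference, a minimal sketch of the stress workload (an assumed
reconstruction, not necessarily the exact program I used) looks like this:

    /* Busy loop writing 10KB chunks to /dev/null as fast as possible. */
    #include <fcntl.h>
    #include <unistd.h>

    int main(void)
    {
        static char buf[10 * 1024];
        int fd = open("/dev/null", O_WRONLY);

        if(fd < 0)
        {
            return 1;
        }

        for(;;)
        {
            write(fd, buf, sizeof(buf));
        }
    }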

To start, this is how the whole thing behaves today, using eBPF (kmod is
similar, but with less overhead):

Total events (including dropped): 11.3M
Dropped events: 10.6M

Pretty bad: we dropped ~95% of the events.

Using an off-CPU time analysis methodology, this is what I get during a 10s
run:

$ sudo cpudist -O -m -P -p $(pgrep sysdig) 10 1
Tracing off-CPU time... Hit Ctrl-C to end.

pid = 27973 sysdig

     msecs               : count     distribution
         0 -> 1          : 16       |**                                      |
         2 -> 3          : 0        |                                        |
         4 -> 7          : 0        |                                        |
         8 -> 15         : 0        |                                        |
        16 -> 31         : 315      |****************************************|
        32 -> 63         : 1        |                                        |

$ sudo offcputime -p $(pgrep sysdig) 10
Tracing off-CPU time (us) of PID 31663 by user + kernel stack for 10 secs.

    finish_task_switch
    __schedule
    schedule
    exit_to_usermode_loop
    prepare_exit_to_usermode
    swapgs_restore_regs_and_return_to_usermode
    scap_next
    -                sysdig (31663)
        50

    finish_task_switch
    __schedule
    schedule
    do_nanosleep
    hrtimer_nanosleep
    sys_nanosleep
    do_syscall_64
    entry_SYSCALL_64_after_hwframe
    __GI___nanosleep
    -                sysdig (31663)
        9824891

So, while sysdig was dropping events like crazy, it was also leisurely
spending the vast majority of its time off CPU (315 * 30ms ~= 9.8s), as a
result of the 30ms sleep we use when we find the buffer empty. This happens
because the userspace consumer is not CPU-bound, so it's able to keep up
with the producer; when it catches up, it goes to sleep, and during that
pause the producer writes hundreds of MB of data, overflowing the buffer.

The obvious mitigations are to enlarge the buffer and/or reduce the
sleep time. Reducing the sleep time is tricky because it increases CPU
utilization, and enlarging the ring buffer is tricky because it wastes a
lot of kernel memory on multi-core machines: to absorb a 30ms pause at
7 GB/s, the per-core buffer would need to be 200+ MB (7 GB/s * 30 ms ~=
210 MB).

These are the things I experimented with:

- I replaced the fixed 30ms sleep with a simple adaptive algorithm: every
  time we find the buffer to be empty, we simply double the time we will
  sleep, until we reach a ceiling (30ms). If the buffer is not empty, we
  reset the sleep interval so we will start polling more aggressively next
  time (500us in this PR); see the condensed sketch after this list.

- I removed the "max consecutive waits" logic, since I couldn't see the
  point of it: regardless of whether we wait or not, at the end of each
  period we always refill the buffers and make them available to the
  caller, so multiple consecutive waits can't cause any harm (that I can
  see, at least), hence they don't need to be tracked.

- The tail of each buffer was previously advanced only when the buffers
  were refilled, which could be several us/ms after that buffer was
  actually consumed, since we have to wait for all the buffers to be
  emptied by userspace, thus keeping the ring buffer needlessly occupied.
  The tail is now advanced every time we are done completely consuming a
  buffer. The number of operations should be exactly the same. Admittedly
  this is a micro optimization, more for readability of the code, so that
  `check_scap_next_wait` always looks at the new amount of data available,
  rather than counting the already consumed data, which I found odd.
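
To make the first point concrete, this is a self-contained sketch of the
backoff policy (the helper name `adaptive_wait` is made up for
illustration; the real change lives in `refill_read_buffers()` in the diff
below):

    /* Exponential backoff of the empty-buffer sleep: start at 500us,
     * double on every empty poll up to a 30ms ceiling, and reset as soon
     * as the buffers have data again. */
    #include <stdbool.h>
    #include <stdint.h>
    #include <unistd.h>

    #define BUFFER_EMPTY_WAIT_TIME_US_START 500
    #define BUFFER_EMPTY_WAIT_TIME_US_MAX (30 * 1000)

    static void adaptive_wait(bool buffers_empty, uint64_t *wait_time_us)
    {
        if(buffers_empty)
        {
            usleep(*wait_time_us);
            *wait_time_us *= 2;
            if(*wait_time_us > BUFFER_EMPTY_WAIT_TIME_US_MAX)
            {
                *wait_time_us = BUFFER_EMPTY_WAIT_TIME_US_MAX;
            }
        }
        else
        {
            *wait_time_us = BUFFER_EMPTY_WAIT_TIME_US_START;
        }
    }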

While this doesn't solve bursty workloads, it does wonders for a sustained
one, as we can see here after the patch, for the same stress test:

Total events (including dropped): 8M
Dropped events: 60k

Not bad at all for an 8MB ring buffer: < 1% dropped (the lower throughput
is because the kernel probe is now spending much more time copying all
those 10KB payloads into the ring buffer, since it finds it free).

And the CPU usage of sysdig remains pretty much the same. In particular,
during idle time, the adaptive sleep quickly reaches the ceiling and stays
there (note the relatively small number of sleeps under 30ms over the 10s
period):

$ sudo cpudist -O -m -P -p $(pgrep sysdig) 10 1
Tracing off-CPU time... Hit Ctrl-C to end.

pid = 28645 sysdig

     msecs               : count     distribution
         0 -> 1          : 37       |****                                    |
         2 -> 3          : 11       |*                                       |
         4 -> 7          : 11       |*                                       |
         8 -> 15         : 11       |*                                       |
        16 -> 31         : 317      |****************************************|

Under a sustained workload, the adaptive sleep will instead always sit at
the floor, allowing us to be very responsive to the sustained load:

$ sudo cpudist -O -m -P -p $(pgrep sysdig) 10 1
Tracing off-CPU time... Hit Ctrl-C to end.

pid = 28645 sysdig

     msecs               : count     distribution
         0 -> 1          : 12746    |****************************************|
         2 -> 3          : 0        |                                        |
         4 -> 7          : 1        |                                        |

This is rather simple but seems to be doing the job. Ideally I'd like to
replace it with a better feedback loop, where the current throughput of the
ring buffer is estimated (via timestamps and amounts read over a moving
average), and the polling interval is calculated based on that. This way,
it would be even more robust with bursty workloads.
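
As a rough illustration of what that feedback loop could look like (purely
hypothetical, not part of this change; the function and constant names are
made up), the sleep could be sized so that only a fraction of the buffer
can fill up while we are away:

    /* Estimate the producer throughput with an exponential moving average
     * and derive the next polling interval from it. */
    #include <stdint.h>

    #define RING_BUF_SIZE (8 * 1024 * 1024)   /* per-CPU buffer, 8MB here */
    #define TARGET_FILL_FRACTION 4            /* wake up when ~1/4 could be full */
    #define MIN_SLEEP_US 100
    #define MAX_SLEEP_US (30 * 1000)

    static uint64_t next_sleep_us(uint64_t bytes_read, uint64_t elapsed_us,
                                  uint64_t *ema_bps)
    {
        if(elapsed_us == 0)
        {
            return MIN_SLEEP_US;
        }

        /* Exponential moving average of the observed throughput (bytes/s) */
        uint64_t sample_bps = bytes_read * 1000000 / elapsed_us;
        *ema_bps = (*ema_bps * 7 + sample_bps) / 8;

        if(*ema_bps == 0)
        {
            return MAX_SLEEP_US;
        }

        /* Time for the producer to fill 1/TARGET_FILL_FRACTION of the buffer */
        uint64_t sleep_us = (uint64_t)RING_BUF_SIZE / TARGET_FILL_FRACTION
                            * 1000000 / *ema_bps;

        if(sleep_us < MIN_SLEEP_US)
        {
            sleep_us = MIN_SLEEP_US;
        }
        if(sleep_us > MAX_SLEEP_US)
        {
            sleep_us = MAX_SLEEP_US;
        }

        return sleep_us;
    }

The refill path would then feed it the bytes consumed since the previous
refill (and the elapsed time) instead of running the exponential backoff.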

Also, a reader might ask why we don't move to a proper I/O multiplexed
model using poll(). Unfortunately, it's not obvious that it would help
performance: in the past I've done experiments on this, and the overhead
needed on the kernel side to maintain the state and wake up the consumer
wasn't acceptable, even with generous overflow thresholds, when dealing
with millions of events per second. It's possible that those tests weren't
thorough, but that's certainly a much more significant piece of work.
gianlucaborello committed Apr 24, 2019
1 parent 199d5a6 commit 490b356
Showing 3 changed files with 90 additions and 61 deletions.
7 changes: 4 additions & 3 deletions userspace/libscap/scap-int.h
@@ -54,8 +54,9 @@ typedef struct wh_t wh_t;
//
// Read buffer timeout constants
//
#define BUFFER_EMPTY_WAIT_TIME_MS 30
#define MAX_N_CONSECUTIVE_WAITS 4
#define BUFFER_EMPTY_WAIT_TIME_US_START 500
#define BUFFER_EMPTY_WAIT_TIME_US_MAX (30 * 1000)
#define BUFFER_EMPTY_THRESHOLD_B 20000

//
// Process flags
@@ -127,7 +128,7 @@ struct scap
scap_addrlist* m_addrlist;
scap_machine_info m_machine_info;
scap_userlist* m_userlist;
uint32_t m_n_consecutive_waits;
uint64_t m_buffer_empty_wait_time_us;
proc_entry_callback m_proc_callback;
void* m_proc_callback_context;
struct ppm_proclist_info* m_driver_procinfo;
121 changes: 66 additions & 55 deletions userspace/libscap/scap.c
@@ -278,6 +278,7 @@ scap_t* scap_open_live_int(char *error, int32_t *rc,
handle->m_num_suppressed_comms = 0;
handle->m_suppressed_tids = NULL;
handle->m_num_suppressed_evts = 0;
handle->m_buffer_empty_wait_time_us = BUFFER_EMPTY_WAIT_TIME_US_START;

if ((*rc = copy_comms(handle, suppressed_comms)) != SCAP_SUCCESS)
{
@@ -402,7 +403,6 @@ scap_t* scap_open_live_int(char *error, int32_t *rc,
//
handle->m_devs[j].m_lastreadsize = 0;
handle->m_devs[j].m_sn_len = 0;
handle->m_n_consecutive_waits = 0;
scap_stop_dropping_mode(handle);
}

@@ -897,17 +897,15 @@ void get_buf_pointers(struct ppm_ring_buffer_info* bufinfo, uint32_t* phead, uin
}
}

int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_t* len)
static void scap_advance_tail(scap_t* handle, uint32_t cpuid)
{
uint32_t ttail;

if(handle->m_bpf)
{
return scap_bpf_readbuf(handle, cpuid, buf, len);
return scap_bpf_advance_tail(handle, cpuid);
}

uint32_t thead;
uint32_t ttail;
uint64_t read_size;

//
// Update the tail based on the amount of data read in the *previous* call.
// Tail is never updated when we serve the data, because we assume that the caller is using
@@ -932,6 +930,20 @@ int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_
handle->m_devs[cpuid].m_bufinfo->tail = ttail - RING_BUF_SIZE;
}

handle->m_devs[cpuid].m_lastreadsize = 0;
}

int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_t* len)
{
uint32_t thead;
uint32_t ttail;
uint64_t read_size;

if(handle->m_bpf)
{
return scap_bpf_readbuf(handle, cpuid, buf, len);
}

//
// Read the pointers.
//
@@ -954,10 +966,9 @@ int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_
return SCAP_SUCCESS;
}

bool check_scap_next_wait(scap_t* handle)
static bool are_buffers_empty(scap_t* handle)
{
uint32_t j;
bool res = true;

for(j = 0; j < handle->m_ndevs; j++)
{
@@ -969,7 +980,6 @@ bool check_scap_next_wait(scap_t* handle)
uint64_t ttail;

scap_bpf_get_buf_pointers(handle->m_devs[j].m_buffer, &thead, &ttail, &read_size);

}
else
{
@@ -979,43 +989,35 @@ bool check_scap_next_wait(scap_t* handle)
get_buf_pointers(handle->m_devs[j].m_bufinfo, &thead, &ttail, &read_size);
}

if(read_size > 20000)
if(read_size > BUFFER_EMPTY_THRESHOLD_B)
{
handle->m_n_consecutive_waits = 0;
res = false;
return false;
}
}

if(res == false)
{
return false;
}

if(handle->m_n_consecutive_waits >= MAX_N_CONSECUTIVE_WAITS)
{
handle->m_n_consecutive_waits = 0;
return false;
}
else
{
return true;
}
return true;
}

int32_t refill_read_buffers(scap_t* handle)
{
uint32_t j;
uint32_t ndevs = handle->m_ndevs;

if(check_scap_next_wait(handle))
if(are_buffers_empty(handle))
{
usleep(BUFFER_EMPTY_WAIT_TIME_MS * 1000);
handle->m_n_consecutive_waits++;
usleep(handle->m_buffer_empty_wait_time_us);
handle->m_buffer_empty_wait_time_us = MIN(handle->m_buffer_empty_wait_time_us * 2,
BUFFER_EMPTY_WAIT_TIME_US_MAX);
}
else
{
handle->m_buffer_empty_wait_time_us = BUFFER_EMPTY_WAIT_TIME_US_START;
}

//
// Refill our data for each of the devices
//

for(j = 0; j < ndevs; j++)
{
struct scap_device *dev = &(handle->m_devs[j]);
@@ -1065,40 +1067,49 @@ static int32_t scap_next_live(scap_t* handle, OUT scap_evt** pevent, OUT uint16_
{
scap_device* dev = &(handle->m_devs[j]);

//
// Make sure that we have data from this ring
//
if(dev->m_sn_len != 0)
if(dev->m_sn_len == 0)
{
//
// We want to consume the event with the lowest timestamp
// If we don't have data from this ring, but we are
// still occupying, free the resources for the
// producer rather than sitting on them.
//
if(handle->m_bpf)
if(dev->m_lastreadsize > 0)
{
pe = scap_bpf_evt_from_perf_sample(dev->m_sn_next_event);
}
else
{
pe = (scap_evt *) dev->m_sn_next_event;
scap_advance_tail(handle, j);
}

if(pe->ts < max_ts)
{
if(pe->len > dev->m_sn_len)
{
snprintf(handle->m_lasterr, SCAP_LASTERR_SIZE, "scap_next buffer corruption");
continue;
}

//
// if you get the following assertion, first recompile the driver and libscap
//
ASSERT(false);
return SCAP_FAILURE;
}
if(handle->m_bpf)
{
pe = scap_bpf_evt_from_perf_sample(dev->m_sn_next_event);
}
else
{
pe = (scap_evt *) dev->m_sn_next_event;
}

//
// We want to consume the event with the lowest timestamp
//
if(pe->ts < max_ts)
{
if(pe->len > dev->m_sn_len)
{
snprintf(handle->m_lasterr, SCAP_LASTERR_SIZE, "scap_next buffer corruption");

*pevent = pe;
*pcpuid = j;
max_ts = pe->ts;
//
// if you get the following assertion, first recompile the driver and libscap
//
ASSERT(false);
return SCAP_FAILURE;
}

*pevent = pe;
*pcpuid = j;
max_ts = pe->ts;
}
}

23 changes: 20 additions & 3 deletions userspace/libscap/scap_bpf.h
@@ -67,7 +67,9 @@ static inline void scap_bpf_get_buf_pointers(char *buf, uint64_t *phead, uint64_
*phead = header->data_head;
*ptail = header->data_tail;

// clang-format off
asm volatile("" ::: "memory");
// clang-format on

begin = *ptail % header->data_size;
end = *phead % header->data_size;
@@ -154,6 +156,23 @@ static inline int32_t scap_bpf_advance_to_evt(scap_t *handle, uint16_t cpuid, bo
return SCAP_SUCCESS;
}

static inline void scap_bpf_advance_tail(scap_t *handle, uint32_t cpuid)
{
struct perf_event_mmap_page *header;
struct scap_device *dev;

dev = &handle->m_devs[cpuid];
header = (struct perf_event_mmap_page *)dev->m_buffer;

// clang-format off
asm volatile("" ::: "memory");
// clang-format on

ASSERT(dev->m_lastreadsize > 0);
header->data_tail += dev->m_lastreadsize;
dev->m_lastreadsize = 0;
}

static inline int32_t scap_bpf_readbuf(scap_t *handle, uint32_t cpuid, char **buf, uint32_t *len)
{
struct perf_event_mmap_page *header;
Expand All @@ -166,9 +185,7 @@ static inline int32_t scap_bpf_readbuf(scap_t *handle, uint32_t cpuid, char **bu
dev = &handle->m_devs[cpuid];
header = (struct perf_event_mmap_page *) dev->m_buffer;

asm volatile("" ::: "memory");
header->data_tail += dev->m_lastreadsize;

ASSERT(dev->m_lastreadsize == 0);
scap_bpf_get_buf_pointers((char *) header, &head, &tail, &read_size);

dev->m_lastreadsize = read_size;
