[RFC] Performance improvements of ring buffer processing #1372

Merged · 1 commit · Apr 24, 2019
7 changes: 4 additions & 3 deletions userspace/libscap/scap-int.h
@@ -54,8 +54,9 @@ typedef struct wh_t wh_t;
//
// Read buffer timeout constants
//
#define BUFFER_EMPTY_WAIT_TIME_MS 30
#define MAX_N_CONSECUTIVE_WAITS 4
#define BUFFER_EMPTY_WAIT_TIME_US_START 500
#define BUFFER_EMPTY_WAIT_TIME_US_MAX (30 * 1000)
#define BUFFER_EMPTY_THRESHOLD_B 20000

//
// Process flags
@@ -127,7 +128,7 @@ struct scap
scap_addrlist* m_addrlist;
scap_machine_info m_machine_info;
scap_userlist* m_userlist;
uint32_t m_n_consecutive_waits;
uint64_t m_buffer_empty_wait_time_us;
proc_entry_callback m_proc_callback;
void* m_proc_callback_context;
struct ppm_proclist_info* m_driver_procinfo;
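
The three new constants and the m_buffer_empty_wait_time_us field replace the fixed 30 ms sleep and the MAX_N_CONSECUTIVE_WAITS counter with an exponential backoff. Below is a minimal sketch of the intended progression, assuming the doubling-with-cap policy implemented later in refill_read_buffers; next_wait_us and buffers_empty are illustrative names, not part of the patch.

#include <stdint.h>

#define BUFFER_EMPTY_WAIT_TIME_US_START 500
#define BUFFER_EMPTY_WAIT_TIME_US_MAX (30 * 1000)

/* Hypothetical helper: compute the next sleep after one polling pass. */
static uint64_t next_wait_us(uint64_t wait_us, int buffers_empty)
{
	if(!buffers_empty)
	{
		/* Data arrived: fall back to the shortest sleep. */
		return BUFFER_EMPTY_WAIT_TIME_US_START;
	}

	/* Still empty: double the sleep, capped at 30 ms. */
	uint64_t next = wait_us * 2;
	return next < BUFFER_EMPTY_WAIT_TIME_US_MAX ? next : BUFFER_EMPTY_WAIT_TIME_US_MAX;
}

Starting from 500 us, the sleep reaches the 30 ms cap after six empty passes (500, 1000, 2000, 4000, 8000, 16000, then capped at 30000 us), so an idle capture converges to roughly the old interval while a busy one never sleeps longer than needed.
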
121 changes: 66 additions & 55 deletions userspace/libscap/scap.c
@@ -278,6 +278,7 @@ scap_t* scap_open_live_int(char *error, int32_t *rc,
handle->m_num_suppressed_comms = 0;
handle->m_suppressed_tids = NULL;
handle->m_num_suppressed_evts = 0;
handle->m_buffer_empty_wait_time_us = BUFFER_EMPTY_WAIT_TIME_US_START;

if ((*rc = copy_comms(handle, suppressed_comms)) != SCAP_SUCCESS)
{
@@ -402,7 +403,6 @@ scap_t* scap_open_live_int(char *error, int32_t *rc,
//
handle->m_devs[j].m_lastreadsize = 0;
handle->m_devs[j].m_sn_len = 0;
handle->m_n_consecutive_waits = 0;
scap_stop_dropping_mode(handle);
}

@@ -897,17 +897,15 @@ void get_buf_pointers(struct ppm_ring_buffer_info* bufinfo, uint32_t* phead, uin
}
}

int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_t* len)
static void scap_advance_tail(scap_t* handle, uint32_t cpuid)
{
uint32_t ttail;

if(handle->m_bpf)
{
return scap_bpf_readbuf(handle, cpuid, buf, len);
return scap_bpf_advance_tail(handle, cpuid);
}

uint32_t thead;
uint32_t ttail;
uint64_t read_size;

//
// Update the tail based on the amount of data read in the *previous* call.
// Tail is never updated when we serve the data, because we assume that the caller is using
@@ -932,6 +930,20 @@ int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_
handle->m_devs[cpuid].m_bufinfo->tail = ttail - RING_BUF_SIZE;
}

handle->m_devs[cpuid].m_lastreadsize = 0;
}

int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_t* len)
{
uint32_t thead;
uint32_t ttail;
uint64_t read_size;

if(handle->m_bpf)
{
return scap_bpf_readbuf(handle, cpuid, buf, len);
}

//
// Read the pointers.
//
@@ -954,10 +966,9 @@ int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_
return SCAP_SUCCESS;
}

bool check_scap_next_wait(scap_t* handle)
static bool are_buffers_empty(scap_t* handle)
{
uint32_t j;
bool res = true;

for(j = 0; j < handle->m_ndevs; j++)
{
@@ -969,7 +980,6 @@ bool check_scap_next_wait(scap_t* handle)
uint64_t ttail;

scap_bpf_get_buf_pointers(handle->m_devs[j].m_buffer, &thead, &ttail, &read_size);

}
else
{
@@ -979,43 +989,35 @@ bool check_scap_next_wait(scap_t* handle)
get_buf_pointers(handle->m_devs[j].m_bufinfo, &thead, &ttail, &read_size);
}

if(read_size > 20000)
if(read_size > BUFFER_EMPTY_THRESHOLD_B)
{
handle->m_n_consecutive_waits = 0;
res = false;
return false;
}
}

if(res == false)
{
return false;
}

if(handle->m_n_consecutive_waits >= MAX_N_CONSECUTIVE_WAITS)
{
handle->m_n_consecutive_waits = 0;
return false;
}
else
{
return true;
}
return true;
}

int32_t refill_read_buffers(scap_t* handle)
{
uint32_t j;
uint32_t ndevs = handle->m_ndevs;

if(check_scap_next_wait(handle))
if(are_buffers_empty(handle))
{
usleep(BUFFER_EMPTY_WAIT_TIME_MS * 1000);
handle->m_n_consecutive_waits++;
usleep(handle->m_buffer_empty_wait_time_us);
handle->m_buffer_empty_wait_time_us = MIN(handle->m_buffer_empty_wait_time_us * 2,
BUFFER_EMPTY_WAIT_TIME_US_MAX);
}
else
{
handle->m_buffer_empty_wait_time_us = BUFFER_EMPTY_WAIT_TIME_US_START;
}

//
// Refill our data for each of the devices
//

for(j = 0; j < ndevs; j++)
{
struct scap_device *dev = &(handle->m_devs[j]);
@@ -1065,40 +1067,49 @@ static int32_t scap_next_live(scap_t* handle, OUT scap_evt** pevent, OUT uint16_
{
scap_device* dev = &(handle->m_devs[j]);

//
// Make sure that we have data from this ring
//
if(dev->m_sn_len != 0)
if(dev->m_sn_len == 0)
{
//
// We want to consume the event with the lowest timestamp
// If we don't have data from this ring, but we are
// still occupying, free the resources for the
// producer rather than sitting on them.
//
if(handle->m_bpf)
if(dev->m_lastreadsize > 0)
{
pe = scap_bpf_evt_from_perf_sample(dev->m_sn_next_event);
}
else
{
pe = (scap_evt *) dev->m_sn_next_event;
scap_advance_tail(handle, j);
}

if(pe->ts < max_ts)
{
if(pe->len > dev->m_sn_len)
{
snprintf(handle->m_lasterr, SCAP_LASTERR_SIZE, "scap_next buffer corruption");
continue;
}

//
// if you get the following assertion, first recompile the driver and libscap
//
ASSERT(false);
return SCAP_FAILURE;
}
if(handle->m_bpf)
{
pe = scap_bpf_evt_from_perf_sample(dev->m_sn_next_event);
}
else
{
pe = (scap_evt *) dev->m_sn_next_event;
}

//
// We want to consume the event with the lowest timestamp
//
if(pe->ts < max_ts)
{
if(pe->len > dev->m_sn_len)
{
snprintf(handle->m_lasterr, SCAP_LASTERR_SIZE, "scap_next buffer corruption");

*pevent = pe;
*pcpuid = j;
max_ts = pe->ts;
//
// if you get the following assertion, first recompile the driver and libscap
//
ASSERT(false);
return SCAP_FAILURE;
}

*pevent = pe;
*pcpuid = j;
max_ts = pe->ts;
}
}

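
The scap_next_live change above separates serving events from returning buffer space: a CPU ring that has been fully consumed (m_sn_len == 0) but still holds an outstanding read (m_lastreadsize > 0) now has its tail advanced immediately, so the kernel producer can reuse that space instead of waiting for the next refill_read_buffers pass. A compilable sketch of that pattern follows, with struct ring_state and release_if_drained as stand-ins for scap_device and scap_advance_tail (assumptions, not the library's API).

#include <stdint.h>

struct ring_state {
	uint32_t sn_len;        /* bytes not yet served to the caller */
	uint32_t lastreadsize;  /* bytes handed out by the previous read */
};

static void release_if_drained(struct ring_state *dev)
{
	/*
	 * Nothing left to serve from this ring, but we are still occupying
	 * the slice returned by the previous read: hand it back to the
	 * producer now rather than sitting on it.
	 */
	if(dev->sn_len == 0 && dev->lastreadsize > 0)
	{
		/* The real code calls scap_advance_tail(handle, cpuid) here,
		 * which bumps the ring tail and zeroes m_lastreadsize. */
		dev->lastreadsize = 0;
	}
}
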
23 changes: 20 additions & 3 deletions userspace/libscap/scap_bpf.h
@@ -67,7 +67,9 @@ static inline void scap_bpf_get_buf_pointers(char *buf, uint64_t *phead, uint64_
*phead = header->data_head;
*ptail = header->data_tail;

// clang-format off
asm volatile("" ::: "memory");
// clang-format on

begin = *ptail % header->data_size;
end = *phead % header->data_size;
@@ -154,6 +156,23 @@ static inline int32_t scap_bpf_advance_to_evt(scap_t *handle, uint16_t cpuid, bo
return SCAP_SUCCESS;
}

static inline void scap_bpf_advance_tail(scap_t *handle, uint32_t cpuid)
{
struct perf_event_mmap_page *header;
struct scap_device *dev;

dev = &handle->m_devs[cpuid];
header = (struct perf_event_mmap_page *)dev->m_buffer;

// clang-format off
asm volatile("" ::: "memory");
// clang-format on

ASSERT(dev->m_lastreadsize > 0);
header->data_tail += dev->m_lastreadsize;
dev->m_lastreadsize = 0;
}

static inline int32_t scap_bpf_readbuf(scap_t *handle, uint32_t cpuid, char **buf, uint32_t *len)
{
struct perf_event_mmap_page *header;
@@ -166,9 +185,7 @@ static inline int32_t scap_bpf_readbuf(scap_t *handle, uint32_t cpuid, char **bu
dev = &handle->m_devs[cpuid];
header = (struct perf_event_mmap_page *) dev->m_buffer;

asm volatile("" ::: "memory");
header->data_tail += dev->m_lastreadsize;

ASSERT(dev->m_lastreadsize == 0);
scap_bpf_get_buf_pointers((char *) header, &head, &tail, &read_size);

dev->m_lastreadsize = read_size;
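
The scap_bpf.h changes split the perf ring consumer into a read step (scap_bpf_readbuf, which only snapshots head/tail) and a release step (scap_bpf_advance_tail, which publishes data_tail once the data has actually been consumed). Here is a sketch of that two-step protocol against the perf_event_mmap_page ABI, using the same compiler barrier the patch uses; ring_available and ring_consume are illustrative names, not functions from the patch.

#include <linux/perf_event.h>
#include <stdint.h>

/* Read step: how many bytes the kernel producer has published. */
static inline uint64_t ring_available(struct perf_event_mmap_page *header)
{
	uint64_t head = header->data_head;

	/* Compiler barrier, as in scap_bpf_get_buf_pointers(): keep the
	 * data reads after the data_head load. */
	asm volatile("" ::: "memory");

	return head - header->data_tail;
}

/* Release step: tell the producer the consumed bytes may be overwritten. */
static inline void ring_consume(struct perf_event_mmap_page *header, uint64_t nbytes)
{
	/* Barrier again so the data_tail store is not moved before the
	 * reads of the consumed records. */
	asm volatile("" ::: "memory");

	header->data_tail += nbytes;
}

Deferring the data_tail update to the release step is what lets the non-empty rings keep their data mapped while an already-drained ring gives its space back to the kernel right away.
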