[networks] Increase HTTP path (#10418)
* [networks] Increase HTTP path

* [networks] Cleanup python code

* Fix Kernel version conditional

Co-authored-by: Bryce Kahle <bryce.kahle@datadoghq.com>

* Refactor `http_already_seen`

* Revert unnecessary changes for arm64 build

* Run python linter

Co-authored-by: Bryce Kahle <bryce.kahle@datadoghq.com>
p-lambert and brycekahle authored Jan 6, 2022
1 parent 511a257 commit 999cd5f
Showing 19 changed files with 406 additions and 394 deletions.
2 changes: 1 addition & 1 deletion pkg/ebpf/bytecode/runtime/conntrack.go (generated file; diff not rendered)

2 changes: 1 addition & 1 deletion pkg/ebpf/bytecode/runtime/http.go (generated file; diff not rendered)

2 changes: 1 addition & 1 deletion pkg/ebpf/bytecode/runtime/oom-kill.go (generated file; diff not rendered)

2 changes: 1 addition & 1 deletion pkg/ebpf/bytecode/runtime/tcp-queue-length.go (generated file; diff not rendered)

2 changes: 1 addition & 1 deletion pkg/ebpf/bytecode/runtime/tracer.go (generated file; diff not rendered)

4 changes: 4 additions & 0 deletions pkg/ebpf/c/bpf_helpers.h
@@ -64,6 +64,10 @@ static int (*bpf_l4_csum_replace)(void* ctx, int off, int from, int to, int flag
*/
static int (*bpf_tail_call_compat)(void* ctx, void* map, int key) = (void*)BPF_FUNC_tail_call;

#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 6, 0)
static long (*bpf_skb_load_bytes)(const void *skb, u32 offset, void *to, u32 len) = (void*)BPF_FUNC_skb_load_bytes;
#endif

#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 8, 0)
static u64 (*bpf_get_current_task)(void) = (void*)BPF_FUNC_get_current_task;
static int (*bpf_probe_write_user)(void *dst, const void *src, int size) = (void *) BPF_FUNC_probe_write_user;
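The bpf_skb_load_bytes helper declared above (available since kernel 4.6) is what lets the probe copy a larger payload fragment out of possibly non-linear skb data. A minimal sketch of how it might be used to fill the fragment buffer — the read_http_fragment name and the payload_offset parameter are illustrative assumptions, not part of this diff:

static __always_inline void read_http_fragment(struct __sk_buff *skb, u32 payload_offset, char *buf) {
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 6, 0)
    // illustrative only: copy up to HTTP_BUFFER_SIZE bytes of L7 payload into buf;
    // unlike direct packet access, bpf_skb_load_bytes also handles non-linear skb data
    bpf_skb_load_bytes(skb, payload_offset, buf, HTTP_BUFFER_SIZE);
#endif
}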
28 changes: 0 additions & 28 deletions pkg/network/ebpf/c/http-buffer.h

This file was deleted.

44 changes: 28 additions & 16 deletions pkg/network/ebpf/c/http-types.h
@@ -4,11 +4,16 @@
#include "tracer.h"

// This determines the size of the payload fragment that is captured for each HTTP request
#define HTTP_BUFFER_SIZE 25
#define HTTP_BUFFER_SIZE 80
// This controls the number of HTTP transactions read by userspace at a time
#define HTTP_BATCH_SIZE 15
// The greater this number, the less likely collisions/data-races between flushes become
#define HTTP_BATCH_PAGES 10
#define HTTP_BATCH_PAGES 15

// HTTP/1.1 XXX
// _________^
#define HTTP_STATUS_OFFSET 9


typedef enum
{
@@ -29,20 +34,6 @@
HTTP_PATCH
} http_method_t;

typedef struct {
// idx is a monotonic counter used for uniquely determining a batch within a CPU core
// this is useful for detecting race conditions that result in a batch being overridden
// before it gets consumed from userspace
__u64 idx;
// pos indicates the batch slot where the next http transaction should be written to
__u8 pos;
// idx_to_notify is used to track which batch completions were notified to userspace
// * if idx_to_notify == idx, the current index is still being appended to;
// * if idx_to_notify < idx, the batch at idx_to_notify needs to be sent to userspace;
// (note that idx will never be less than idx_to_notify);
__u64 idx_to_notify;
} http_batch_state_t;

// This struct is used in the map lookup that returns the active batch for a certain CPU core
typedef struct {
__u32 cpu;
@@ -64,9 +55,30 @@ typedef struct {
// be populated with the "original" (pre-normalization) source port number of
// the TCP segment containing the beginning of a given HTTP request
__u16 owned_by_src_port;

// this field is used to disambiguate segments in the context of keep-alives
// we populate it with the TCP seq number of the request and then the response segments
__u32 tcp_seq;

__u64 tags;
} http_transaction_t;

typedef struct {
http_transaction_t scratch_tx;

// idx is a monotonic counter used for uniquely determining a batch within a CPU core
// this is useful for detecting race conditions that result in a batch being overridden
// before it gets consumed from userspace
__u64 idx;
// pos indicates the batch slot where the next http transaction should be written to
__u8 pos;
// idx_to_notify is used to track which batch completions were notified to userspace
// * if idx_to_notify == idx, the current index is still being appended to;
// * if idx_to_notify < idx, the batch at idx_to_notify needs to be sent to userspace;
// (note that idx will never be less than idx_to_notify);
__u64 idx_to_notify;
} http_batch_state_t;

typedef struct {
__u64 idx;
__u8 pos;
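The idx/idx_to_notify pair in http_batch_state_t implements a small per-CPU notification protocol. A hedged sketch of the check a flush path might perform, assuming batch_state has already been looked up for the current CPU (the actual notification mechanism is not shown in the loaded portion of this diff):

if (batch_state->idx_to_notify < batch_state->idx) {
    // the batch page at (idx_to_notify % HTTP_BATCH_PAGES) is complete and still
    // unannounced: notify userspace (e.g. via a perf event), then advance the counter
    batch_state->idx_to_notify++;
}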
139 changes: 76 additions & 63 deletions pkg/network/ebpf/c/http.h
@@ -41,7 +41,7 @@ static __always_inline int http_responding(http_transaction_t *http) {
return (http != NULL && http->response_status_code != 0);
}

static __always_inline void http_enqueue(http_transaction_t *http, conn_tuple_t *tup) {
static __always_inline void http_enqueue(http_transaction_t *http) {
// Retrieve the active batch number for this CPU
u32 cpu = bpf_get_smp_processor_id();
http_batch_state_t *batch_state = bpf_map_lookup_elem(&http_batch_state, &cpu);
@@ -58,9 +58,6 @@ static __always_inline void http_enqueue(http_transaction_t *http, conn_tuple_t
return;
}

// Embed tuple information in the http_transaction_t object before enqueueing it
__builtin_memcpy(&http->tup, tup, sizeof(conn_tuple_t));

// I haven't found a way to avoid this unrolled loop on Kernel 4.4 (newer versions work fine)
// If you try to directly write the desired batch slot by doing
//
@@ -102,42 +99,20 @@
}
}

static __always_inline int http_begin_request(http_transaction_t *http, http_method_t method, char *buffer, conn_tuple_t *tup) {
// This can happen in the context of HTTP keep-alives;
if (http_responding(http)) {
http_enqueue(http, tup);
}

static __always_inline void http_begin_request(http_transaction_t *http, http_method_t method, char *buffer) {
http->request_method = method;
http->request_started = bpf_ktime_get_ns();
http->response_last_seen = 0;
http->response_status_code = 0;
__builtin_memcpy(&http->request_fragment, buffer, HTTP_BUFFER_SIZE);
return 1;
}

static __always_inline int http_begin_response(http_transaction_t *http, const char *buffer) {
// Extract the status code from the response fragment
// HTTP/1.1 200 OK
// _________^^^___
// Code below is a bit oddly structured in order to make kernel 4.4 verifier happy
__u16 status_code = 0;
__u8 space_found = 0;
#pragma unroll
for (int i = 0; i < HTTP_BUFFER_SIZE - 1; i++) {
if (!space_found && buffer[i] == ' ') {
space_found = 1;
} else if (space_found && status_code < 100) {
status_code = status_code * 10 + (buffer[i] - '0');
}
}

if (status_code < 100 || status_code >= 600) {
return 0;
}

static __always_inline void http_begin_response(http_transaction_t *http, const char *buffer) {
u16 status_code = 0;
status_code += (buffer[HTTP_STATUS_OFFSET+0]-'0') * 100;
status_code += (buffer[HTTP_STATUS_OFFSET+1]-'0') * 10;
status_code += (buffer[HTTP_STATUS_OFFSET+2]-'0') * 1;
http->response_status_code = status_code;
return 1;
}
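Worked example for the fixed-offset parse above: in a fragment starting with "HTTP/1.1 404 Not Found", the bytes at offsets 9, 10, and 11 are '4', '0', '4', so the code computes 4*100 + 0*10 + 4*1 = 404. Unlike the removed loop-based parser it no longer range-checks the result, relying instead on http_parse_data having already classified the fragment as a response. A usage sketch, assuming a well-formed status line (the literal fits comfortably in the 80-byte buffer):

char buffer[HTTP_BUFFER_SIZE] = "HTTP/1.1 404 Not Found";
http_transaction_t tx = { 0 };
http_begin_response(&tx, buffer);
// tx.response_status_code == 404: ('4'-'0')*100 + ('0'-'0')*10 + ('4'-'0')*1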

static __always_inline void http_parse_data(char *p, http_packet_t *packet_type, http_method_t *method) {
@@ -167,51 +142,89 @@ static __always_inline void http_parse_data(char *p, http_packet_t *packet_type,
}
}

static __always_inline int http_process(char *buffer, skb_info_t *skb_info, u16 src_port, u64 tags) {
static __always_inline http_transaction_t *http_fetch_state(http_transaction_t *http, skb_info_t *skb_info, http_packet_t packet_type) {
if (packet_type == HTTP_PACKET_UNKNOWN) {
return bpf_map_lookup_elem(&http_in_flight, &http->tup);
}

// We detected either a request or a response
// In this case we initialize (or fetch) state associated to this tuple
bpf_map_update_elem(&http_in_flight, &http->tup, http, BPF_NOEXIST);
http_transaction_t *http_ebpf = bpf_map_lookup_elem(&http_in_flight, &http->tup);
if (http_ebpf == NULL || skb_info == NULL) {
return http_ebpf;
}

// Bail out if we've seen this TCP segment before
// This can happen in the context of localhost traffic where the same TCP segment
// can be seen multiple times coming in and out from different interfaces
if (http_ebpf->tcp_seq == skb_info->tcp_seq) {
return NULL;
}

http_ebpf->tcp_seq = skb_info->tcp_seq;
return http_ebpf;
}
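A short illustration of the localhost case the tcp_seq comparison guards against (addresses and sequence numbers assumed for the example):

// Loopback traffic is observed twice, once on egress and once on ingress:
//   127.0.0.1:5000 -> 127.0.0.1:8080, seq=1000  (first sighting)  -> processed, tcp_seq stored
//   127.0.0.1:5000 -> 127.0.0.1:8080, seq=1000  (second sighting) -> tcp_seq matches, skipped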

static __always_inline http_transaction_t* http_should_flush_previous_state(http_transaction_t *http, http_packet_t packet_type) {
// this can happen in the context of keep-alives
bool must_flush = (packet_type == HTTP_REQUEST && http->request_started) ||
(packet_type == HTTP_RESPONSE && http->response_status_code);

if (!must_flush) {
return NULL;
}

u32 cpu = bpf_get_smp_processor_id();
http_batch_state_t *batch_state = bpf_map_lookup_elem(&http_batch_state, &cpu);
if (batch_state == NULL) {
return NULL;
}

__builtin_memcpy(&batch_state->scratch_tx, http, sizeof(http_transaction_t));
return &batch_state->scratch_tx;
}
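The per-CPU scratch_tx slot exists because the in-flight map entry for this tuple is about to be overwritten by the transaction that just started; copying the finished one out first keeps it intact until http_enqueue consumes it. An illustrative keep-alive sequence on a single connection (paths assumed):

// 1. "GET /a HTTP/1.1"  -> http_begin_request() populates the map entry
// 2. "HTTP/1.1 200 OK"  -> http_begin_response() completes the transaction
// 3. "GET /b HTTP/1.1"  -> http_should_flush_previous_state() copies the completed
//    transaction into scratch_tx; http_begin_request() then reuses the map entry for /b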

static __always_inline bool http_closed(http_transaction_t *http, skb_info_t *skb_info, u16 pre_norm_src_port) {
return (skb_info && skb_info->tcp_flags & TCPHDR_FIN &&
http->owned_by_src_port == pre_norm_src_port);
}

static __always_inline int http_process(http_transaction_t *http_stack, skb_info_t *skb_info) {
char *buffer = (char *)http_stack->request_fragment;
http_packet_t packet_type = HTTP_PACKET_UNKNOWN;
http_method_t method = HTTP_METHOD_UNKNOWN;
http_parse_data(buffer, &packet_type, &method);
http_transaction_t *http = NULL;

http_transaction_t new_entry = { 0 };
new_entry.owned_by_src_port = src_port;
http_transaction_t *http = http_fetch_state(http_stack, skb_info, packet_type);
if (http == NULL) {
return 0;
}

switch(packet_type) {
case HTTP_REQUEST:
bpf_map_update_elem(&http_in_flight, &skb_info->tup, &new_entry, BPF_NOEXIST);
http = bpf_map_lookup_elem(&http_in_flight, &skb_info->tup);
if (http == NULL || http->owned_by_src_port != src_port) {
return 0;
}
http_begin_request(http, method, buffer, &skb_info->tup);
break;
case HTTP_RESPONSE:
bpf_map_update_elem(&http_in_flight, &skb_info->tup, &new_entry, BPF_NOEXIST);
http = bpf_map_lookup_elem(&http_in_flight, &skb_info->tup);
if (http == NULL) {
return 0;
}
http_transaction_t *to_flush = http_should_flush_previous_state(http, packet_type);
if (packet_type == HTTP_REQUEST) {
http_begin_request(http, method, buffer);
} else if (packet_type == HTTP_RESPONSE) {
http_begin_response(http, buffer);
break;
default:
// We're in the middle of either a request or a response
http = bpf_map_lookup_elem(&http_in_flight, &skb_info->tup);
if (http == NULL) {
return 0;
}
}

http->tags |= tags;

// If we have an (L7/application-layer) payload we want to update response_last_seen
// This is to prevent things such as a keep-alive adding to the transaction latency
if (buffer[0] != 0) {
http->response_last_seen = bpf_ktime_get_ns();
}

if (skb_info->tcp_flags & TCPHDR_FIN && http->owned_by_src_port == src_port) {
http_enqueue(http, &skb_info->tup);
bpf_map_delete_elem(&http_in_flight, &skb_info->tup);
bool conn_closed = http_closed(http, skb_info, http_stack->owned_by_src_port);
if (conn_closed) {
to_flush = http;
}

if (to_flush) {
http_enqueue(to_flush);
}

if (conn_closed) {
bpf_map_delete_elem(&http_in_flight, &http_stack->tup);
}

return 0;
Diff truncated; the remaining 10 changed files are not shown.
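For context, a hedged sketch of how the refactored entry point might be driven from the socket-filter side — the stack-allocated transaction, the field population, and the read_http_fragment helper are assumptions, since the callers sit in the portion of the diff that did not load:

// Hypothetical caller; assumes skb parsing has produced skb_info and payload_offset
http_transaction_t http_stack = { 0 };
__builtin_memcpy(&http_stack.tup, &skb_info.tup, sizeof(conn_tuple_t));
http_stack.owned_by_src_port = skb_info.tup.sport;
read_http_fragment(skb, payload_offset, (char *)http_stack.request_fragment);
http_process(&http_stack, &skb_info);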
