Skip to content

Commit

Permalink
Network flows metrics (#586)
Browse files Browse the repository at this point in the history
* rebased code from hackathon

* Move network config

* make tests compile and pass

* fix linters

* Basic working

* using cilium auto-generated structs

* fixing integration tests

* K8s decoration

* basic entities and relationships

* fixing namespace in services routes

* fix linting and tests

* Cleaning up ported code

* fix flaky unit test

* nonworking integration tests, probably due to limits in size of otel metrics export

* fixing asserts test setup

* RC to merge into main

* updated go.mod
  • Loading branch information
mariomac authored Feb 16, 2024
1 parent 6cb90b3 commit 9c103c2
Show file tree
Hide file tree
Showing 137 changed files with 24,093 additions and 34 deletions.
8 changes: 8 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ The Initial Developer of some parts of the product, which are copied from, deriv
inspired by ebpf-go (https://github.com/cilium/ebpf).
Copyright Authors of Cilium.

The Initial Developer of some parts of the product, which are copied from, derived from, or
inspired by NetObserv eBPF Agent (https://github.com/netobserv/netobserv-ebpf-agent).
Copyright Red Hat/IBM.

The Initial Developer of some parts of the product, which are copied from, derived from, or
inspired by NetObserv Flowlogs-Pipeline (https://github.com/netobserv/flowlogs-pipeline).
Copyright Red Hat/IBM.

Grafana Beyla uses third-party libraries or other resources that may be
distributed under licenses different than the Grafana Beyla software. The licenses for
these third-party libraries are listed in the attached third_party_licenses.csv file
Expand Down
83 changes: 83 additions & 0 deletions bpf/flow.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright Red Hat / IBM
// Copyright Grafana Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This implementation is a derivation of the code in
// https://github.com/netobserv/netobserv-ebpf-agent/tree/release-1.4

#ifndef __FLOW_H__
#define __FLOW_H__

#define TC_ACT_OK 0
#define TC_ACT_SHOT 2
#define IP_MAX_LEN 16

#define ETH_ALEN 6 /* Octets in one ethernet addr */

#define s6_addr in6_u.u6_addr8
#define ETH_P_IP 0x0800 /* Internet Protocol packet */
// ETH_P_IPV6 value as defined in IEEE 802: https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml
#define ETH_P_IPV6 0x86DD /* IPv6 over bluebook */
typedef __u8 u8;
typedef __u16 u16;
typedef __u32 u32;
typedef __u64 u64;

typedef struct flow_metrics_t {
u32 packets;
u64 bytes;
// start_mono_time_ts and end_mono_time_ts are the start and end times as system monotonic timestamps
// in nanoseconds, as output from bpf_ktime_get_ns() (kernel space)
// and monotime.Now() (user space)
u64 start_mono_time_ns;
u64 end_mono_time_ns;
// TCP Flags from https://www.ietf.org/rfc/rfc793.txt
u16 flags;
// The positive errno of a failed map insertion that caused a flow
// to be sent via ringbuffer.
// 0 otherwise
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
} __attribute__((packed)) flow_metrics;

// Attributes that uniquely identify a flow
// TODO: remove attributes that won't be used in Beyla (e.g. MAC, maybe protocol...)
typedef struct flow_id_t {
u16 eth_protocol;
u8 direction;
// L2 data link layer
u8 src_mac[ETH_ALEN];
u8 dst_mac[ETH_ALEN];
// L3 network layer
// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96
// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2
struct in6_addr src_ip;
struct in6_addr dst_ip;
// L4 transport layer
u16 src_port;
u16 dst_port;
u8 transport_protocol;
// OS interface index
u32 if_index;
} __attribute__((packed)) flow_id;

// Flow record is a tuple containing both flow identifier and metrics. It is used to send
// a complete flow via ring buffer when only when the accounting hashmap is full.
// Contents in this struct must match byte-by-byte with Go's pkc/flow/Record struct
typedef struct flow_record_t {
flow_id id;
flow_metrics metrics;
} __attribute__((packed)) flow_record;

#endif
287 changes: 287 additions & 0 deletions bpf/flows.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
// Copyright Red Hat / IBM
// Copyright Grafana Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This implementation is a derivation of the code in
// https://github.com/netobserv/netobserv-ebpf-agent/tree/release-1.4

#include "vmlinux.h"
#include <stdbool.h>

#include "bpf_helpers.h"
#include "bpf_endian.h"

#include "flow.h"

#define DISCARD 1
#define SUBMIT 0

// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
#define INGRESS 0
#define EGRESS 1

// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml
#define FIN_FLAG 0x01
#define SYN_FLAG 0x02
#define RST_FLAG 0x04
#define PSH_FLAG 0x08
#define ACK_FLAG 0x10
#define URG_FLAG 0x20
#define ECE_FLAG 0x40
#define CWR_FLAG 0x80
// Custom flags exported
#define SYN_ACK_FLAG 0x100
#define FIN_ACK_FLAG 0x200
#define RST_ACK_FLAG 0x400

// Common Ringbuffer as a conduit for ingress/egress flows to userspace
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24);
} direct_flows SEC(".maps");

// Key: the flow identifier. Value: the flow metrics for that identifier.
// The userspace will aggregate them into a single flow.
struct {
__uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH);
__type(key, flow_id);
__type(value, flow_metrics);
} aggregated_flows SEC(".maps");

// Constant definitions, to be overridden by the invoker
volatile const u32 sampling = 0;
volatile const u8 trace_messages = 0;

const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};

// sets the TCP header flags for connection information
static inline void set_flags(struct tcphdr *th, u16 *flags) {
//If both ACK and SYN are set, then it is server -> client communication during 3-way handshake.
if (th->ack && th->syn) {
*flags |= SYN_ACK_FLAG;
} else if (th->ack && th->fin ) {
// If both ACK and FIN are set, then it is graceful termination from server.
*flags |= FIN_ACK_FLAG;
} else if (th->ack && th->rst ) {
// If both ACK and RST are set, then it is abrupt connection termination.
*flags |= RST_ACK_FLAG;
} else if (th->fin) {
*flags |= FIN_FLAG;
} else if (th->syn) {
*flags |= SYN_FLAG;
} else if (th->rst) {
*flags |= RST_FLAG;
} else if (th->psh) {
*flags |= PSH_FLAG;
} else if (th->urg) {
*flags |= URG_FLAG;
} else if (th->ece) {
*flags |= ECE_FLAG;
} else if (th->cwr) {
*flags |= CWR_FLAG;
}
}
// sets flow fields from IPv4 header information
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) {
if ((void *)ip + sizeof(*ip) > data_end) {
return DISCARD;
}

__builtin_memcpy(id->src_ip.s6_addr, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->dst_ip.s6_addr, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->src_ip.s6_addr + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr));
__builtin_memcpy(id->dst_ip.s6_addr + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr));
id->transport_protocol = ip->protocol;
id->src_port = 0;
id->dst_port = 0;
switch (ip->protocol) {
case IPPROTO_TCP: {
struct tcphdr *tcp = (void *)ip + sizeof(*ip);
if ((void *)tcp + sizeof(*tcp) <= data_end) {
id->src_port = __bpf_ntohs(tcp->source);
id->dst_port = __bpf_ntohs(tcp->dest);
set_flags(tcp, flags);
}
} break;
case IPPROTO_UDP: {
struct udphdr *udp = (void *)ip + sizeof(*ip);
if ((void *)udp + sizeof(*udp) <= data_end) {
id->src_port = __bpf_ntohs(udp->source);
id->dst_port = __bpf_ntohs(udp->dest);
}
} break;
default:
break;
}
return SUBMIT;
}

// sets flow fields from IPv6 header information
static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id, u16 *flags) {
if ((void *)ip + sizeof(*ip) > data_end) {
return DISCARD;
}

id->src_ip = ip->saddr;
id->dst_ip = ip->daddr;
id->transport_protocol = ip->nexthdr;
id->src_port = 0;
id->dst_port = 0;
switch (ip->nexthdr) {
case IPPROTO_TCP: {
struct tcphdr *tcp = (void *)ip + sizeof(*ip);
if ((void *)tcp + sizeof(*tcp) <= data_end) {
id->src_port = __bpf_ntohs(tcp->source);
id->dst_port = __bpf_ntohs(tcp->dest);
set_flags(tcp, flags);
}
} break;
case IPPROTO_UDP: {
struct udphdr *udp = (void *)ip + sizeof(*ip);
if ((void *)udp + sizeof(*udp) <= data_end) {
id->src_port = __bpf_ntohs(udp->source);
id->dst_port = __bpf_ntohs(udp->dest);
}
} break;
default:
break;
}
return SUBMIT;
}
// sets flow fields from Ethernet header information
static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u16 *flags) {
if ((void *)eth + sizeof(*eth) > data_end) {
return DISCARD;
}
__builtin_memcpy(id->dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(id->src_mac, eth->h_source, ETH_ALEN);
id->eth_protocol = __bpf_ntohs(eth->h_proto);

if (id->eth_protocol == ETH_P_IP) {
struct iphdr *ip = (void *)eth + sizeof(*eth);
return fill_iphdr(ip, data_end, id, flags);
} else if (id->eth_protocol == ETH_P_IPV6) {
struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth);
return fill_ip6hdr(ip6, data_end, id, flags);
} else {
// TODO : Need to implement other specific ethertypes if needed
// For now other parts of flow id remain zero
__builtin_memset(&(id->src_ip), 0, sizeof(struct in6_addr));
__builtin_memset(&(id->dst_ip), 0, sizeof(struct in6_addr));
id->transport_protocol = 0;
id->src_port = 0;
id->dst_port = 0;
}
return SUBMIT;
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
return TC_ACT_OK;
}
void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;

flow_id id;
__builtin_memset(&id, 0, sizeof(id));
u64 current_time = bpf_ktime_get_ns();
struct ethhdr *eth = data;
u16 flags = 0;
if (fill_ethhdr(eth, data_end, &id, &flags) == DISCARD) {
return TC_ACT_OK;
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = direction;

// TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += skb->len;
aggregate_flow->end_mono_time_ns = current_time;
// it might happen that start_mono_time hasn't been set due to
// the way percpu hashmap deal with concurrent map entries
if (aggregate_flow->start_mono_time_ns == 0) {
aggregate_flow->start_mono_time_ns = current_time;
}
aggregate_flow->flags |= flags;

long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (trace_messages && ret != 0) {
// usually error -16 (-EBUSY) is printed here.
// In this case, the flow is dropped, as submitting it to the ringbuffer would cause
// a duplicated UNION of flows (two different flows with partial aggregation of the same packets),
// which can't be deduplicated.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
bpf_printk("error updating flow %d\n", ret);
}
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow = {
.packets = 1,
.bytes = skb->len,
.start_mono_time_ns = current_time,
.end_mono_time_ns = current_time,
.flags = flags,
};

// even if we know that the entry is new, another CPU might be concurrently inserting a flow
// so we need to specify BPF_ANY
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (ret != 0) {
// usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
// a repeated INTERSECTION of flows (different flows aggregating different packets),
// which can be re-aggregated at userpace.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
if (trace_messages) {
bpf_printk("error adding flow %d\n", ret);
}

new_flow.errno = -ret;
flow_record *record = (flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
if (!record) {
if (trace_messages) {
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
}
return TC_ACT_OK;
}
record->id = id;
record->metrics = new_flow;
bpf_ringbuf_submit(record, 0);
}
}
return TC_ACT_OK;
}

SEC("tc_ingress")
int ingress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb, INGRESS);
}

SEC("tc_egress")
int egress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb, EGRESS);
}

// Force emitting structs into the ELF for automatic creation of Golang struct
const flow_metrics *unused_flow_metrics __attribute__((unused));
const flow_id *unused_flow_id __attribute__((unused));
const flow_record *unused_flow_record __attribute__((unused));

char _license[] SEC("license") = "GPL";
Loading

0 comments on commit 9c103c2

Please sign in to comment.