Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network flows metrics #586

Merged
merged 24 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
03b1d9e
rebased code from hackathon
mariomac Jan 26, 2024
3dde04f
Move network config
mariomac Jan 26, 2024
c2af319
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac Jan 30, 2024
72e28be
make tests compile and pass
mariomac Jan 30, 2024
2c789b0
fix linters
mariomac Jan 30, 2024
72fc8b7
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac Jan 30, 2024
27d099d
Basic working
mariomac Feb 1, 2024
2ce6baa
using cilium auto-generated structs
mariomac Feb 1, 2024
980dede
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac Feb 7, 2024
9c86918
fixing integration tests
mariomac Feb 7, 2024
b29a457
K8s decoration
mariomac Feb 7, 2024
2fa6dbe
basic entities and relationships
mariomac Feb 7, 2024
9074681
fixing namespace in services routes
mariomac Feb 7, 2024
68cbe4f
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac Feb 9, 2024
79775b0
fix linting and tests
mariomac Feb 12, 2024
1dd53db
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac Feb 12, 2024
769383b
Cleaning up ported code
mariomac Feb 12, 2024
6d44a2d
fix flaky unit test
mariomac Feb 12, 2024
c62cdfa
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac Feb 12, 2024
b5f5bbf
nonworking integration tests, probably due to limits in size of otel …
mariomac Feb 14, 2024
ede3d3c
fixing asserts test setup
mariomac Feb 14, 2024
9e5f0ae
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac Feb 15, 2024
871dabb
RC to merge into main
mariomac Feb 15, 2024
802927a
updated go.mod
mariomac Feb 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ The Initial Developer of some parts of the product, which are copied from, deriv
inspired by ebpf-go (https://github.com/cilium/ebpf).
Copyright Authors of Cilium.

The Initial Developer of some parts of the product, which are copied from, derived from, or
inspired by NetObserv eBPF Agent (https://github.com/netobserv/netobserv-ebpf-agent).
Copyright Red Hat/IBM.

The Initial Developer of some parts of the product, which are copied from, derived from, or
inspired by NetObserv Flowlogs-Pipeline (https://github.com/netobserv/flowlogs-pipeline).
Copyright Red Hat/IBM.

Grafana Beyla uses third-party libraries or other resources that may be
distributed under licenses different than the Grafana Beyla software. The licenses for
these third-party libraries are listed in the attached third_party_licenses.csv file
Expand Down
83 changes: 83 additions & 0 deletions bpf/flow.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright Red Hat / IBM
// Copyright Grafana Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This implementation is a derivation of the code in
// https://github.com/netobserv/netobserv-ebpf-agent/tree/release-1.4

#ifndef __FLOW_H__
#define __FLOW_H__

#define TC_ACT_OK 0
#define TC_ACT_SHOT 2
#define IP_MAX_LEN 16

#define ETH_ALEN 6 /* Octets in one ethernet addr */

#define s6_addr in6_u.u6_addr8
#define ETH_P_IP 0x0800 /* Internet Protocol packet */
// ETH_P_IPV6 value as defined in IEEE 802: https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml
#define ETH_P_IPV6 0x86DD /* IPv6 over bluebook */
typedef __u8 u8;
typedef __u16 u16;
typedef __u32 u32;
typedef __u64 u64;

typedef struct flow_metrics_t {
u32 packets;
u64 bytes;
// start_mono_time_ts and end_mono_time_ts are the start and end times as system monotonic timestamps
// in nanoseconds, as output from bpf_ktime_get_ns() (kernel space)
// and monotime.Now() (user space)
u64 start_mono_time_ns;
u64 end_mono_time_ns;
// TCP Flags from https://www.ietf.org/rfc/rfc793.txt
u16 flags;
// The positive errno of a failed map insertion that caused a flow
// to be sent via ringbuffer.
// 0 otherwise
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
} __attribute__((packed)) flow_metrics;

// Attributes that uniquely identify a flow
// TODO: remove attributes that won't be used in Beyla (e.g. MAC, maybe protocol...)
typedef struct flow_id_t {
u16 eth_protocol;
u8 direction;
// L2 data link layer
u8 src_mac[ETH_ALEN];
u8 dst_mac[ETH_ALEN];
// L3 network layer
// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96
// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2
struct in6_addr src_ip;
struct in6_addr dst_ip;
// L4 transport layer
u16 src_port;
u16 dst_port;
u8 transport_protocol;
// OS interface index
u32 if_index;
} __attribute__((packed)) flow_id;

// Flow record is a tuple containing both flow identifier and metrics. It is used to send
// a complete flow via ring buffer when only when the accounting hashmap is full.
// Contents in this struct must match byte-by-byte with Go's pkc/flow/Record struct
typedef struct flow_record_t {
flow_id id;
flow_metrics metrics;
} __attribute__((packed)) flow_record;

#endif
287 changes: 287 additions & 0 deletions bpf/flows.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
// Copyright Red Hat / IBM
// Copyright Grafana Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This implementation is a derivation of the code in
// https://github.com/netobserv/netobserv-ebpf-agent/tree/release-1.4

#include "vmlinux.h"
#include <stdbool.h>

#include "bpf_helpers.h"
#include "bpf_endian.h"

#include "flow.h"

#define DISCARD 1
#define SUBMIT 0

// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
#define INGRESS 0
#define EGRESS 1

// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml
#define FIN_FLAG 0x01
#define SYN_FLAG 0x02
#define RST_FLAG 0x04
#define PSH_FLAG 0x08
#define ACK_FLAG 0x10
#define URG_FLAG 0x20
#define ECE_FLAG 0x40
#define CWR_FLAG 0x80
// Custom flags exported
#define SYN_ACK_FLAG 0x100
#define FIN_ACK_FLAG 0x200
#define RST_ACK_FLAG 0x400

// Common Ringbuffer as a conduit for ingress/egress flows to userspace
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24);
} direct_flows SEC(".maps");

// Key: the flow identifier. Value: the flow metrics for that identifier.
// The userspace will aggregate them into a single flow.
struct {
__uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH);
__type(key, flow_id);
__type(value, flow_metrics);
} aggregated_flows SEC(".maps");

// Constant definitions, to be overridden by the invoker
volatile const u32 sampling = 0;
volatile const u8 trace_messages = 0;

const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};

// sets the TCP header flags for connection information
static inline void set_flags(struct tcphdr *th, u16 *flags) {
//If both ACK and SYN are set, then it is server -> client communication during 3-way handshake.
if (th->ack && th->syn) {
*flags |= SYN_ACK_FLAG;
} else if (th->ack && th->fin ) {
// If both ACK and FIN are set, then it is graceful termination from server.
*flags |= FIN_ACK_FLAG;
} else if (th->ack && th->rst ) {
// If both ACK and RST are set, then it is abrupt connection termination.
*flags |= RST_ACK_FLAG;
} else if (th->fin) {
*flags |= FIN_FLAG;
} else if (th->syn) {
*flags |= SYN_FLAG;
} else if (th->rst) {
*flags |= RST_FLAG;
} else if (th->psh) {
*flags |= PSH_FLAG;
} else if (th->urg) {
*flags |= URG_FLAG;
} else if (th->ece) {
*flags |= ECE_FLAG;
} else if (th->cwr) {
*flags |= CWR_FLAG;
}
}
// sets flow fields from IPv4 header information
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) {
if ((void *)ip + sizeof(*ip) > data_end) {
return DISCARD;
}

__builtin_memcpy(id->src_ip.s6_addr, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->dst_ip.s6_addr, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->src_ip.s6_addr + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr));
__builtin_memcpy(id->dst_ip.s6_addr + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr));
id->transport_protocol = ip->protocol;
id->src_port = 0;
id->dst_port = 0;
switch (ip->protocol) {
case IPPROTO_TCP: {
struct tcphdr *tcp = (void *)ip + sizeof(*ip);
if ((void *)tcp + sizeof(*tcp) <= data_end) {
id->src_port = __bpf_ntohs(tcp->source);
id->dst_port = __bpf_ntohs(tcp->dest);
set_flags(tcp, flags);
}
} break;
case IPPROTO_UDP: {
struct udphdr *udp = (void *)ip + sizeof(*ip);
if ((void *)udp + sizeof(*udp) <= data_end) {
id->src_port = __bpf_ntohs(udp->source);
id->dst_port = __bpf_ntohs(udp->dest);
}
} break;
default:
break;
}
return SUBMIT;
}

// sets flow fields from IPv6 header information
static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id, u16 *flags) {
if ((void *)ip + sizeof(*ip) > data_end) {
return DISCARD;
}

id->src_ip = ip->saddr;
id->dst_ip = ip->daddr;
id->transport_protocol = ip->nexthdr;
id->src_port = 0;
id->dst_port = 0;
switch (ip->nexthdr) {
case IPPROTO_TCP: {
struct tcphdr *tcp = (void *)ip + sizeof(*ip);
if ((void *)tcp + sizeof(*tcp) <= data_end) {
id->src_port = __bpf_ntohs(tcp->source);
id->dst_port = __bpf_ntohs(tcp->dest);
set_flags(tcp, flags);
}
} break;
case IPPROTO_UDP: {
struct udphdr *udp = (void *)ip + sizeof(*ip);
if ((void *)udp + sizeof(*udp) <= data_end) {
id->src_port = __bpf_ntohs(udp->source);
id->dst_port = __bpf_ntohs(udp->dest);
}
} break;
default:
break;
}
return SUBMIT;
}
// sets flow fields from Ethernet header information
static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u16 *flags) {
if ((void *)eth + sizeof(*eth) > data_end) {
return DISCARD;
}
__builtin_memcpy(id->dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(id->src_mac, eth->h_source, ETH_ALEN);
id->eth_protocol = __bpf_ntohs(eth->h_proto);

if (id->eth_protocol == ETH_P_IP) {
struct iphdr *ip = (void *)eth + sizeof(*eth);
return fill_iphdr(ip, data_end, id, flags);
} else if (id->eth_protocol == ETH_P_IPV6) {
struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth);
return fill_ip6hdr(ip6, data_end, id, flags);
} else {
// TODO : Need to implement other specific ethertypes if needed
// For now other parts of flow id remain zero
__builtin_memset(&(id->src_ip), 0, sizeof(struct in6_addr));
__builtin_memset(&(id->dst_ip), 0, sizeof(struct in6_addr));
id->transport_protocol = 0;
id->src_port = 0;
id->dst_port = 0;
}
return SUBMIT;
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
return TC_ACT_OK;
}
void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;

flow_id id;
__builtin_memset(&id, 0, sizeof(id));
u64 current_time = bpf_ktime_get_ns();
struct ethhdr *eth = data;
u16 flags = 0;
if (fill_ethhdr(eth, data_end, &id, &flags) == DISCARD) {
return TC_ACT_OK;
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = direction;

// TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += skb->len;
aggregate_flow->end_mono_time_ns = current_time;
// it might happen that start_mono_time hasn't been set due to
// the way percpu hashmap deal with concurrent map entries
if (aggregate_flow->start_mono_time_ns == 0) {
aggregate_flow->start_mono_time_ns = current_time;
}
aggregate_flow->flags |= flags;

long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (trace_messages && ret != 0) {
// usually error -16 (-EBUSY) is printed here.
// In this case, the flow is dropped, as submitting it to the ringbuffer would cause
// a duplicated UNION of flows (two different flows with partial aggregation of the same packets),
// which can't be deduplicated.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
bpf_printk("error updating flow %d\n", ret);
}
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support this mode in the else statement at the moment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a ring buffer will force us to go with 5.8 as a minimum, we should check if it's acceptable. If we don't use this mode perhaps the safest is to remove it and keep our backwards compatibility as far as we can go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observation. I will merge this and in another PR I'll consider using a perf buffer or just removing it, assuming that we might miss some packets.

.packets = 1,
.bytes = skb->len,
.start_mono_time_ns = current_time,
.end_mono_time_ns = current_time,
.flags = flags,
};

// even if we know that the entry is new, another CPU might be concurrently inserting a flow
// so we need to specify BPF_ANY
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (ret != 0) {
// usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
// a repeated INTERSECTION of flows (different flows aggregating different packets),
// which can be re-aggregated at userpace.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
if (trace_messages) {
bpf_printk("error adding flow %d\n", ret);
}

new_flow.errno = -ret;
flow_record *record = (flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
if (!record) {
if (trace_messages) {
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
}
return TC_ACT_OK;
}
record->id = id;
record->metrics = new_flow;
bpf_ringbuf_submit(record, 0);
}
}
return TC_ACT_OK;
}

SEC("tc_ingress")
int ingress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb, INGRESS);
}

SEC("tc_egress")
int egress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb, EGRESS);
}

// Force emitting structs into the ELF for automatic creation of Golang struct
const flow_metrics *unused_flow_metrics __attribute__((unused));
const flow_id *unused_flow_id __attribute__((unused));
const flow_record *unused_flow_record __attribute__((unused));

char _license[] SEC("license") = "GPL";
Loading
Loading