-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Network flows metrics #586
Merged
Merged
Changes from all commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
03b1d9e
rebased code from hackathon
mariomac 3dde04f
Move network config
mariomac c2af319
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac 72e28be
make tests compile and pass
mariomac 2c789b0
fix linters
mariomac 72fc8b7
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac 27d099d
Basic working
mariomac 2ce6baa
using cilium auto-generated structs
mariomac 980dede
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac 9c86918
fixing integration tests
mariomac b29a457
K8s decoration
mariomac 2fa6dbe
basic entities and relationships
mariomac 9074681
fixing namespace in services routes
mariomac 68cbe4f
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac 79775b0
fix linting and tests
mariomac 1dd53db
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac 769383b
Cleaning up ported code
mariomac 6d44a2d
fix flaky unit test
mariomac c62cdfa
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac b5f5bbf
nonworking integration tests, probably due to limits in size of otel …
mariomac ede3d3c
fixing asserts test setup
mariomac 9e5f0ae
Merge branch 'main' of github.com:grafana/ebpf-autoinstrument into as…
mariomac 871dabb
RC to merge into main
mariomac 802927a
updated go.mod
mariomac File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
// Copyright Red Hat / IBM | ||
// Copyright Grafana Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// This implementation is a derivation of the code in | ||
// https://github.com/netobserv/netobserv-ebpf-agent/tree/release-1.4 | ||
|
||
#ifndef __FLOW_H__ | ||
#define __FLOW_H__ | ||
|
||
#define TC_ACT_OK 0 | ||
#define TC_ACT_SHOT 2 | ||
#define IP_MAX_LEN 16 | ||
|
||
#define ETH_ALEN 6 /* Octets in one ethernet addr */ | ||
|
||
#define s6_addr in6_u.u6_addr8 | ||
#define ETH_P_IP 0x0800 /* Internet Protocol packet */ | ||
// ETH_P_IPV6 value as defined in IEEE 802: https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml | ||
#define ETH_P_IPV6 0x86DD /* IPv6 over bluebook */ | ||
typedef __u8 u8; | ||
typedef __u16 u16; | ||
typedef __u32 u32; | ||
typedef __u64 u64; | ||
|
||
typedef struct flow_metrics_t { | ||
u32 packets; | ||
u64 bytes; | ||
// start_mono_time_ts and end_mono_time_ts are the start and end times as system monotonic timestamps | ||
// in nanoseconds, as output from bpf_ktime_get_ns() (kernel space) | ||
// and monotime.Now() (user space) | ||
u64 start_mono_time_ns; | ||
u64 end_mono_time_ns; | ||
// TCP Flags from https://www.ietf.org/rfc/rfc793.txt | ||
u16 flags; | ||
// The positive errno of a failed map insertion that caused a flow | ||
// to be sent via ringbuffer. | ||
// 0 otherwise | ||
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md | ||
u8 errno; | ||
} __attribute__((packed)) flow_metrics; | ||
|
||
// Attributes that uniquely identify a flow | ||
// TODO: remove attributes that won't be used in Beyla (e.g. MAC, maybe protocol...) | ||
typedef struct flow_id_t { | ||
u16 eth_protocol; | ||
u8 direction; | ||
// L2 data link layer | ||
u8 src_mac[ETH_ALEN]; | ||
u8 dst_mac[ETH_ALEN]; | ||
// L3 network layer | ||
// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96 | ||
// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2 | ||
struct in6_addr src_ip; | ||
struct in6_addr dst_ip; | ||
// L4 transport layer | ||
u16 src_port; | ||
u16 dst_port; | ||
u8 transport_protocol; | ||
// OS interface index | ||
u32 if_index; | ||
} __attribute__((packed)) flow_id; | ||
|
||
// Flow record is a tuple containing both flow identifier and metrics. It is used to send | ||
// a complete flow via ring buffer when only when the accounting hashmap is full. | ||
// Contents in this struct must match byte-by-byte with Go's pkc/flow/Record struct | ||
typedef struct flow_record_t { | ||
flow_id id; | ||
flow_metrics metrics; | ||
} __attribute__((packed)) flow_record; | ||
|
||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,287 @@ | ||
// Copyright Red Hat / IBM | ||
// Copyright Grafana Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// This implementation is a derivation of the code in | ||
// https://github.com/netobserv/netobserv-ebpf-agent/tree/release-1.4 | ||
|
||
#include "vmlinux.h" | ||
#include <stdbool.h> | ||
|
||
#include "bpf_helpers.h" | ||
#include "bpf_endian.h" | ||
|
||
#include "flow.h" | ||
|
||
#define DISCARD 1 | ||
#define SUBMIT 0 | ||
|
||
// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml | ||
#define INGRESS 0 | ||
#define EGRESS 1 | ||
|
||
// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml | ||
#define FIN_FLAG 0x01 | ||
#define SYN_FLAG 0x02 | ||
#define RST_FLAG 0x04 | ||
#define PSH_FLAG 0x08 | ||
#define ACK_FLAG 0x10 | ||
#define URG_FLAG 0x20 | ||
#define ECE_FLAG 0x40 | ||
#define CWR_FLAG 0x80 | ||
// Custom flags exported | ||
#define SYN_ACK_FLAG 0x100 | ||
#define FIN_ACK_FLAG 0x200 | ||
#define RST_ACK_FLAG 0x400 | ||
|
||
// Common Ringbuffer as a conduit for ingress/egress flows to userspace | ||
struct { | ||
__uint(type, BPF_MAP_TYPE_RINGBUF); | ||
__uint(max_entries, 1 << 24); | ||
} direct_flows SEC(".maps"); | ||
|
||
// Key: the flow identifier. Value: the flow metrics for that identifier. | ||
// The userspace will aggregate them into a single flow. | ||
struct { | ||
__uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH); | ||
__type(key, flow_id); | ||
__type(value, flow_metrics); | ||
} aggregated_flows SEC(".maps"); | ||
|
||
// Constant definitions, to be overridden by the invoker | ||
volatile const u32 sampling = 0; | ||
volatile const u8 trace_messages = 0; | ||
|
||
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; | ||
|
||
// sets the TCP header flags for connection information | ||
static inline void set_flags(struct tcphdr *th, u16 *flags) { | ||
//If both ACK and SYN are set, then it is server -> client communication during 3-way handshake. | ||
if (th->ack && th->syn) { | ||
*flags |= SYN_ACK_FLAG; | ||
} else if (th->ack && th->fin ) { | ||
// If both ACK and FIN are set, then it is graceful termination from server. | ||
*flags |= FIN_ACK_FLAG; | ||
} else if (th->ack && th->rst ) { | ||
// If both ACK and RST are set, then it is abrupt connection termination. | ||
*flags |= RST_ACK_FLAG; | ||
} else if (th->fin) { | ||
*flags |= FIN_FLAG; | ||
} else if (th->syn) { | ||
*flags |= SYN_FLAG; | ||
} else if (th->rst) { | ||
*flags |= RST_FLAG; | ||
} else if (th->psh) { | ||
*flags |= PSH_FLAG; | ||
} else if (th->urg) { | ||
*flags |= URG_FLAG; | ||
} else if (th->ece) { | ||
*flags |= ECE_FLAG; | ||
} else if (th->cwr) { | ||
*flags |= CWR_FLAG; | ||
} | ||
} | ||
// sets flow fields from IPv4 header information | ||
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) { | ||
if ((void *)ip + sizeof(*ip) > data_end) { | ||
return DISCARD; | ||
} | ||
|
||
__builtin_memcpy(id->src_ip.s6_addr, ip4in6, sizeof(ip4in6)); | ||
__builtin_memcpy(id->dst_ip.s6_addr, ip4in6, sizeof(ip4in6)); | ||
__builtin_memcpy(id->src_ip.s6_addr + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr)); | ||
__builtin_memcpy(id->dst_ip.s6_addr + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr)); | ||
id->transport_protocol = ip->protocol; | ||
id->src_port = 0; | ||
id->dst_port = 0; | ||
switch (ip->protocol) { | ||
case IPPROTO_TCP: { | ||
struct tcphdr *tcp = (void *)ip + sizeof(*ip); | ||
if ((void *)tcp + sizeof(*tcp) <= data_end) { | ||
id->src_port = __bpf_ntohs(tcp->source); | ||
id->dst_port = __bpf_ntohs(tcp->dest); | ||
set_flags(tcp, flags); | ||
} | ||
} break; | ||
case IPPROTO_UDP: { | ||
struct udphdr *udp = (void *)ip + sizeof(*ip); | ||
if ((void *)udp + sizeof(*udp) <= data_end) { | ||
id->src_port = __bpf_ntohs(udp->source); | ||
id->dst_port = __bpf_ntohs(udp->dest); | ||
} | ||
} break; | ||
default: | ||
break; | ||
} | ||
return SUBMIT; | ||
} | ||
|
||
// sets flow fields from IPv6 header information | ||
static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id, u16 *flags) { | ||
if ((void *)ip + sizeof(*ip) > data_end) { | ||
return DISCARD; | ||
} | ||
|
||
id->src_ip = ip->saddr; | ||
id->dst_ip = ip->daddr; | ||
id->transport_protocol = ip->nexthdr; | ||
id->src_port = 0; | ||
id->dst_port = 0; | ||
switch (ip->nexthdr) { | ||
case IPPROTO_TCP: { | ||
struct tcphdr *tcp = (void *)ip + sizeof(*ip); | ||
if ((void *)tcp + sizeof(*tcp) <= data_end) { | ||
id->src_port = __bpf_ntohs(tcp->source); | ||
id->dst_port = __bpf_ntohs(tcp->dest); | ||
set_flags(tcp, flags); | ||
} | ||
} break; | ||
case IPPROTO_UDP: { | ||
struct udphdr *udp = (void *)ip + sizeof(*ip); | ||
if ((void *)udp + sizeof(*udp) <= data_end) { | ||
id->src_port = __bpf_ntohs(udp->source); | ||
id->dst_port = __bpf_ntohs(udp->dest); | ||
} | ||
} break; | ||
default: | ||
break; | ||
} | ||
return SUBMIT; | ||
} | ||
// sets flow fields from Ethernet header information | ||
static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u16 *flags) { | ||
if ((void *)eth + sizeof(*eth) > data_end) { | ||
return DISCARD; | ||
} | ||
__builtin_memcpy(id->dst_mac, eth->h_dest, ETH_ALEN); | ||
__builtin_memcpy(id->src_mac, eth->h_source, ETH_ALEN); | ||
id->eth_protocol = __bpf_ntohs(eth->h_proto); | ||
|
||
if (id->eth_protocol == ETH_P_IP) { | ||
struct iphdr *ip = (void *)eth + sizeof(*eth); | ||
return fill_iphdr(ip, data_end, id, flags); | ||
} else if (id->eth_protocol == ETH_P_IPV6) { | ||
struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth); | ||
return fill_ip6hdr(ip6, data_end, id, flags); | ||
} else { | ||
// TODO : Need to implement other specific ethertypes if needed | ||
// For now other parts of flow id remain zero | ||
__builtin_memset(&(id->src_ip), 0, sizeof(struct in6_addr)); | ||
__builtin_memset(&(id->dst_ip), 0, sizeof(struct in6_addr)); | ||
id->transport_protocol = 0; | ||
id->src_port = 0; | ||
id->dst_port = 0; | ||
} | ||
return SUBMIT; | ||
} | ||
|
||
static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { | ||
// If sampling is defined, will only parse 1 out of "sampling" flows | ||
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) { | ||
return TC_ACT_OK; | ||
} | ||
void *data_end = (void *)(long)skb->data_end; | ||
void *data = (void *)(long)skb->data; | ||
|
||
flow_id id; | ||
__builtin_memset(&id, 0, sizeof(id)); | ||
u64 current_time = bpf_ktime_get_ns(); | ||
struct ethhdr *eth = data; | ||
u16 flags = 0; | ||
if (fill_ethhdr(eth, data_end, &id, &flags) == DISCARD) { | ||
return TC_ACT_OK; | ||
} | ||
|
||
//Set extra fields | ||
id.if_index = skb->ifindex; | ||
id.direction = direction; | ||
|
||
// TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide | ||
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/ | ||
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id); | ||
if (aggregate_flow != NULL) { | ||
aggregate_flow->packets += 1; | ||
aggregate_flow->bytes += skb->len; | ||
aggregate_flow->end_mono_time_ns = current_time; | ||
// it might happen that start_mono_time hasn't been set due to | ||
// the way percpu hashmap deal with concurrent map entries | ||
if (aggregate_flow->start_mono_time_ns == 0) { | ||
aggregate_flow->start_mono_time_ns = current_time; | ||
} | ||
aggregate_flow->flags |= flags; | ||
|
||
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY); | ||
if (trace_messages && ret != 0) { | ||
// usually error -16 (-EBUSY) is printed here. | ||
// In this case, the flow is dropped, as submitting it to the ringbuffer would cause | ||
// a duplicated UNION of flows (two different flows with partial aggregation of the same packets), | ||
// which can't be deduplicated. | ||
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md | ||
bpf_printk("error updating flow %d\n", ret); | ||
} | ||
} else { | ||
// Key does not exist in the map, and will need to create a new entry. | ||
flow_metrics new_flow = { | ||
.packets = 1, | ||
.bytes = skb->len, | ||
.start_mono_time_ns = current_time, | ||
.end_mono_time_ns = current_time, | ||
.flags = flags, | ||
}; | ||
|
||
// even if we know that the entry is new, another CPU might be concurrently inserting a flow | ||
// so we need to specify BPF_ANY | ||
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY); | ||
if (ret != 0) { | ||
// usually error -16 (-EBUSY) or -7 (E2BIG) is printed here. | ||
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have | ||
// a repeated INTERSECTION of flows (different flows aggregating different packets), | ||
// which can be re-aggregated at userpace. | ||
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md | ||
if (trace_messages) { | ||
bpf_printk("error adding flow %d\n", ret); | ||
} | ||
|
||
new_flow.errno = -ret; | ||
flow_record *record = (flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0); | ||
if (!record) { | ||
if (trace_messages) { | ||
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow"); | ||
} | ||
return TC_ACT_OK; | ||
} | ||
record->id = id; | ||
record->metrics = new_flow; | ||
bpf_ringbuf_submit(record, 0); | ||
} | ||
} | ||
return TC_ACT_OK; | ||
} | ||
|
||
SEC("tc_ingress") | ||
int ingress_flow_parse(struct __sk_buff *skb) { | ||
return flow_monitor(skb, INGRESS); | ||
} | ||
|
||
SEC("tc_egress") | ||
int egress_flow_parse(struct __sk_buff *skb) { | ||
return flow_monitor(skb, EGRESS); | ||
} | ||
|
||
// Force emitting structs into the ELF for automatic creation of Golang struct | ||
const flow_metrics *unused_flow_metrics __attribute__((unused)); | ||
const flow_id *unused_flow_id __attribute__((unused)); | ||
const flow_record *unused_flow_record __attribute__((unused)); | ||
|
||
char _license[] SEC("license") = "GPL"; |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we support this mode in the else statement at the moment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a ring buffer will force us to go with 5.8 as a minimum, we should check if it's acceptable. If we don't use this mode perhaps the safest is to remove it and keep our backwards compatibility as far as we can go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good observation. I will merge this and in another PR I'll consider using a perf buffer or just removing it, assuming that we might miss some packets.