Skip to content

Commit

Permalink
feat: containerd support
Browse files Browse the repository at this point in the history
  • Loading branch information
davidgiga1993 committed Mar 19, 2024
1 parent de1c7db commit dd99d41
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 17 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,13 @@ name: host
interface: eth0
networks: # List of networks to which to group the traffic by. The "name" will be used as label.
- name: "local"
network: "192.168.1.0/24"
cidrs: [ "192.168.1.0/24" ]
# All traffic not matching any network will be labeled with "other"
# Optional: If set to "k8s" the networks will be automatically filled
# based on the kubernetes containers running on this node.
# This will group the traffic by the k8s namespace.
mode: k8s
```

## DNS server statistics `Bind`
Expand Down
3 changes: 2 additions & 1 deletion pollect/core/Factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def __init__(self, base_name: str):
def create(self, class_name: str, *init_args):
class_obj = self._get_class_obj(class_name)
if class_obj is None:
raise AttributeError(f'Class {class_name} not found in module {self._base_module} - missing import?')
raise AttributeError(f'Class {class_name} not found in module {self._base_module} - missing import?'
f'Try running with --debug')
return class_obj(*init_args)

def _get_modules(self):
Expand Down
32 changes: 25 additions & 7 deletions pollect/sources/EbpfNetworkTrafficSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from pollect.core.ValueSet import ValueSet, Value
from pollect.sources.Source import Source
from pollect.sources.helper.NetworkStats import NetworkMetricsCounter
from pollect.sources.helper.NetworkStats import NetworkMetricsCounter, ContainerNetworkUtils


class EbpfNetworkTrafficSource(Source):
Expand All @@ -23,12 +23,12 @@ def __init__(self, config):
self._running = False

self._interface = config['interface']
self._catch_all = NetworkMetricsCounter('other', '0.0.0.0/0')
self._k8s_mode = config['mode'] == 'k8s'
self._catch_all = NetworkMetricsCounter('other', ['0.0.0.0/0'])
self._networks: List[NetworkMetricsCounter] = []
for network in config.get('networks', []):
name = network['name']
net = network['network']
self._networks.append(NetworkMetricsCounter(name, net))
self._networks.append(NetworkMetricsCounter(name, network['cidrs']))

def setup(self, global_conf):
device = self._interface
Expand All @@ -38,10 +38,13 @@ def setup(self, global_conf):
if device_stats is None:
raise ValueError('Warning: Device ' + device + ' not found')

if device_stats.mtu >= self.MAX_MTU:
if device_stats.mtu > self.MAX_MTU:
self.log.warning(f'Device {device} MTU is too large: '
f'{device_stats.mtu}, must be <= {self.MAX_MTU}')

if self._k8s_mode:
self._update_networks()

self._running = True
threading.Thread(target=self._poll).start()

Expand All @@ -60,9 +63,14 @@ def _probe(self) -> Optional[ValueSet] or List[ValueSet]:
metrics = self._catch_all.get_per_second(current_time)
values.add(Value(label_values=[self._catch_all.name, 'to'], value=metrics.bytes_to_network))

if self._k8s_mode:
self._update_networks()
return values

def _poll(self):
"""
Polls the eBPF data
"""
src_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'bpf', 'core.c')

device = self._interface
Expand All @@ -73,8 +81,8 @@ def _poll(self):

def runs_on_every_ethernet_frame(_, data, size):
ip_and_bytes = b["events"].event(data)

for network in self._networks:
networks = self._networks
for network in networks:
if network.contains(ip_and_bytes.srcIp):
network.bytes_from_network += ip_and_bytes.bytes
return
Expand All @@ -90,3 +98,13 @@ def runs_on_every_ethernet_frame(_, data, size):
b.ring_buffer_poll()

b.remove_xdp(device, 0)

def _update_networks(self):
"""
Updates the networks list based on the containers running on this node
"""
namespaces = ContainerNetworkUtils.get_namespace_ips()
networks = []
for namespace, ips in namespaces.items():
networks.append(NetworkMetricsCounter(namespace, list(ips)))
self._networks = networks
85 changes: 80 additions & 5 deletions pollect/sources/helper/NetworkStats.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import datetime
import ipaddress
import json
import re
import subprocess
from typing import Dict, List


class NetworkMetrics:
Expand All @@ -15,23 +19,36 @@ def divide(self, delta):
self.bytes_from_network = self.bytes_from_network / delta


class Subnet:
def __init__(self, subnet: str):
subnet: ipaddress.IPv4Network = ipaddress.ip_network(subnet)
self._netw: int = int(subnet.network_address)
self._mask: int = int(subnet.netmask)

def contains(self, ip: int):
return (ip & self._mask) == self._netw


class NetworkMetricsCounter:
"""
Holds the count of bytes per IP subnet
"""

def __init__(self, name: str, subnet: str):
def __init__(self, name: str, subnets: List[str]):
self.name = name
self._subnet: ipaddress.IPv4Network = ipaddress.ip_network(subnet)
self._netw: int = int(self._subnet.network_address)
self._mask: int = int(self._subnet.netmask)
self._subnets: List[Subnet] = []
for subnet in subnets:
self._subnets.append(Subnet(subnet))

self.bytes_to_network: int = 0
self.bytes_from_network: int = 0
self.last_reset: datetime.datetime = datetime.datetime.now()

def contains(self, ip: int):
return (ip & self._mask) == self._netw
for net in self._subnets:
if net.contains(ip):
return True
return False

def get_per_second(self, now: datetime.datetime) -> NetworkMetrics:
delta = (now - self.last_reset).total_seconds()
Expand All @@ -43,3 +60,61 @@ def get_per_second(self, now: datetime.datetime) -> NetworkMetrics:
self.bytes_to_network = 0
self.bytes_from_network = 0
return metrics


class ContainerNetworkUtils:

@staticmethod
def get_namespace_ips():
"""
Returns a list of all k8s container IP addresses on the current node grouped by namespace.
This requires containerd as runtime.
"""
namespace_ips_map = {}

nic_ns = ContainerNetworkUtils.get_container_ips()
lines = subprocess.check_output(['ctr', '-n', 'k8s.io', 'containers', 'list', '-q']) \
.decode('utf-8').splitlines()

for container_id in lines:
# Find the network namespace of the container
json_str = subprocess.check_output(['ctr', '-n', 'k8s.io', 'container', 'info', container_id]) \
.decode('utf-8')
data = json.loads(json_str)
k8s_namespace = data.get('Labels', {}).get('io.kubernetes.pod.namespace', '')
network_namespace = None
for linux_ns in data.get('Spec', {}).get('linux', {}).get('namespaces', []):
if linux_ns.get('type') == 'network':
# Get last argument of path
network_namespace = linux_ns['path'].split('/')[-1]
break
if network_namespace is None:
# No network namespace found
continue

container_ip = nic_ns.get(network_namespace)
if container_ip is None:
# No IP found, maybe the container was created just in this moment?
continue
if k8s_namespace not in namespace_ips_map:
namespace_ips_map[k8s_namespace] = set()
namespace_ips_map[k8s_namespace].add(container_ip)
return namespace_ips_map

@staticmethod
def get_container_ips() -> Dict[str, str]:
"""
Returns a list of all k8s container IP addresses on the current node.
:return: Dict of networks in the CIDR notation mapped to their network namespace
"""
networks = {}
lines = subprocess.check_output(['ip', 'netns', 'list']).decode('utf-8').splitlines()
for line in lines:
namespace_name = line.split(' ', 2)[0]
ips = subprocess.check_output(['ip', 'netns', 'exec', namespace_name, 'ip', 'addr', 'show', 'eth0']) \
.decode('utf-8')

matches = re.findall(r'inet (.+?) ', ips)
if len(matches) > 0:
networks[namespace_name] = matches[0]
return networks
Loading

0 comments on commit dd99d41

Please sign in to comment.