Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fc 2531 traffic events #79

Merged
merged 3 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Generated by Django 3.2.23 on 2024-01-31 18:41

import django.db.models.manager
from django.db import migrations


class Migration(migrations.Migration):
dependencies = [
("django_devicectl", "0029_devicerefereereport"),
]

operations = [
migrations.CreateModel(
name="AuditCtlIXTrafficProcessed",
fields=[],
options={
"proxy": True,
"indexes": [],
"constraints": [],
},
bases=("django_fullctl.task",),
managers=[
("handleref", django.db.models.manager.Manager()),
],
),
migrations.CreateModel(
name="AuditCtlIXTrafficReceived",
fields=[],
options={
"proxy": True,
"indexes": [],
"constraints": [],
},
bases=("django_fullctl.task",),
managers=[
("handleref", django.db.models.manager.Manager()),
],
),
]
170 changes: 169 additions & 1 deletion src/django_devicectl/models/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
import os

import fullctl.django.tasks.qualifiers as qualifiers
import fullctl.graph.mrtg.rrd as mrtg_rrd
import fullctl.service_bridge.auditctl as auditctl
import fullctl.service_bridge.ixctl as ixctl
import fullctl.service_bridge.nautobot as nautobot
import fullctl.service_bridge.peerctl as peerctl
Expand Down Expand Up @@ -164,6 +166,10 @@ def run(self, *args, **kwargs):
*processed_port_ids,
)
else:
AuditCtlIXTrafficReceived.create_task(
org=self.org,
data=kwargs.get("update", []),
)
UpdateIxctlIxTrafficGraphs.create_task(
org=self.org,
)
Expand Down Expand Up @@ -328,7 +334,6 @@ def aggregate_ix_graphs(self):
graph_files = []

for ix_id, ports in ix_ports.items():
print(ports)
virtual_ports = models.VirtualPort.objects.filter(port__id__in=ports)
rrd_files = [
os.path.join(settings.GRAPHS_PATH, port.meta["graph"])
Expand Down Expand Up @@ -372,9 +377,17 @@ def run(self, *args, **kwargs):
self.render_from_rrd(graph_file)

def render_from_rrd(self, filepath):
self.ix_graphs = {}
for period, resolution, suffix in self.periods:
self._render_from_rrd(filepath, period, resolution, suffix)

for ix_id, graphs in self.ix_graphs.items():
AuditCtlIXTrafficProcessed.create_task(
org=self.org,
ix_id=ix_id,
graphs=graphs,
)

def _render_from_rrd(self, filepath, duration, resolution, suffix):
# get filename
filename = os.path.basename(filepath)
Expand Down Expand Up @@ -403,6 +416,7 @@ def _render_from_rrd(self, filepath, duration, resolution, suffix):
title = f"{obj.name}"
service = "ixctl"
public = True
self.ix_graphs.setdefault(obj_id, []).append(filename)

filepath = os.path.join(settings.GRAPHS_PATH, filepath)

Expand All @@ -428,3 +442,157 @@ def _render_from_rrd(self, filepath, duration, resolution, suffix):
content_type="image/png",
public=public,
)


@register
class AuditCtlIXTrafficReceived(Task):
"""
Send event to auditctl when traffic is received on one or more
virtual ports
"""

class Meta:
proxy = True

class HandleRef:
tag = "task_auditctl_event_traffic_received"

def exchanges(self):
devices = models.Device.objects.filter(instance__org=self.org)
ix_ports = {}
ports = []

for device in devices:
ports += [vp.port.id for vp in device.virtual_ports if vp.port]

members = ixctl.InternetExchangeMember().objects(
ports=ports, org=self.org.remote_id
)
for member in members:
ix_ports.setdefault(member.ix_id, []).append(member.port)

for ix_id, ports in ix_ports.items():
yield ix_id, ports

def run(self, *args, **kwargs):
traffic_data = kwargs.get("data")
base_event_id = f"v0.1/{self.org.slug}/traffic/ix/"
for ix_id, ports in self.exchanges():
ix = ixctl.InternetExchange().object(ix_id)
event_id = f"{base_event_id}{ix_id}/{ix.slug}/received"
virtual_ports = [
vp.id for vp in models.VirtualPort.objects.filter(port__id__in=ports)
]

ix_traffic_data = [
traffic for traffic in traffic_data if traffic["id"] in virtual_ports
]

if not ix_traffic_data:
auditctl.Event().create(
{
"org": self.org.slug,
"object_id": event_id,
"status": "error",
"source": "devicectl",
"error": {
"message": "No data points in traffic data (empty list)",
"components": [
{
"kind": "ix",
"id": ix_id,
"name": ix.name,
}
],
},
}
)
continue

auditctl.Event().create(
{
"org": self.org.slug,
"object_id": event_id,
"status": "ok",
"source": "devicectl",
"data": {
"components": [
{
"kind": "traffic_data",
"blob": json.dumps(ix_traffic_data),
},
{
"kind": "ix",
"id": ix_id,
"name": ix.name,
},
]
},
},
)


@register
class AuditCtlIXTrafficProcessed(Task):
"""
Send event to auditctl when traffic is processed on one or more
virtual ports
"""

event_type = "processed"

class Meta:
proxy = True

class HandleRef:
tag = "task_auditctl_event_traffic_processed"

def run(self, *args, **kwargs):
ix_id = kwargs.get("ix_id")
ix = ixctl.InternetExchange().object(ix_id)
base_event_id = f"v0.1/{self.org.slug}/traffic/ix/"
event_id = f"{base_event_id}{ix_id}/{ix.slug}/processed"
graphs = kwargs.get("graphs")

if not graphs:
auditctl.Event().create(
{
"org": self.org.slug,
"object_id": event_id,
"status": "error",
"source": "devicectl",
"error": {
"message": "No graphs updated",
"components": [
{
"kind": "ix",
"id": ix_id,
"name": ix.name,
}
],
},
}
)
return

auditctl.Event().create(
{
"org": self.org.slug,
"object_id": event_id,
"status": "ok",
"source": "devicectl",
"data": {
"components": [
{
"kind": "ix",
"id": ix_id,
"name": ix.name,
},
{
"kind": "graphs",
"blob": json.dumps(graphs),
},
]
},
},
)
Loading