This repository has been archived by the owner on Nov 2, 2024. It is now read-only.
spamhaus_drop analyzer, closes intelowlproject#2408 (intelowlproject#2422)

* spamhaus_drop

* spamhaus_drop

* ip matching

* migrations

* migrations

* tests

* tests

* tests

* tests

* tests

* tests

* IocFinder

* bool

* mign

* docs

* mign

* mign

* mign
g4ze authored and vaclavbartos committed Oct 13, 2024
1 parent 749d9fe commit 3410165
Showing 3 changed files with 229 additions and 1 deletion.
@@ -0,0 +1,126 @@
```python
from django.db import migrations
from django.db.models.fields.related_descriptors import (
    ForwardManyToOneDescriptor,
    ForwardOneToOneDescriptor,
    ManyToManyDescriptor,
)

plugin = {
    "python_module": {
        "health_check_schedule": None,
        "update_schedule": {
            "minute": "0",
            "hour": "0",
            "day_of_week": "*",
            "day_of_month": "*",
            "month_of_year": "*",
        },
        "module": "spamhaus_drop.SpamhausDropV4",
        "base_path": "api_app.analyzers_manager.observable_analyzers",
    },
    "name": "Spamhaus_DROP",
    "description": "[Spamhaus_DROP](https://www.spamhaus.org/blocklists/do-not-route-or-peer/) protects from activity directly originating from rogue networks, such as spam campaigns, encryption via ransomware, DNS-hijacking and exploit attempts, authentication attacks to discover working access credentials, harvesting, DDoS attacks.",
    "disabled": False,
    "soft_time_limit": 10,
    "routing_key": "default",
    "health_check_status": True,
    "type": "observable",
    "docker_based": False,
    "maximum_tlp": "AMBER",
    "observable_supported": ["ip"],
    "supported_filetypes": [],
    "run_hash": False,
    "run_hash_type": "",
    "not_supported_filetypes": [],
    "model": "analyzers_manager.AnalyzerConfig",
}

params = []

values = []


def _get_real_obj(Model, field, value):
    def _get_obj(Model, other_model, value):
        if isinstance(value, dict):
            real_vals = {}
            for key, real_val in value.items():
                real_vals[key] = _get_real_obj(other_model, key, real_val)
            value = other_model.objects.get_or_create(**real_vals)[0]
        # it is just the primary key serialized
        else:
            if isinstance(value, int):
                if Model.__name__ == "PluginConfig":
                    value = other_model.objects.get(name=plugin["name"])
                else:
                    value = other_model.objects.get(pk=value)
            else:
                value = other_model.objects.get(name=value)
        return value

    if (
        type(getattr(Model, field))
        in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
        and value
    ):
        other_model = getattr(Model, field).get_queryset().model
        value = _get_obj(Model, other_model, value)
    elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
        other_model = getattr(Model, field).rel.model
        value = [_get_obj(Model, other_model, val) for val in value]
    return value


def _create_object(Model, data):
    mtm, no_mtm = {}, {}
    for field, value in data.items():
        value = _get_real_obj(Model, field, value)
        if type(getattr(Model, field)) is ManyToManyDescriptor:
            mtm[field] = value
        else:
            no_mtm[field] = value
    try:
        o = Model.objects.get(**no_mtm)
    except Model.DoesNotExist:
        o = Model(**no_mtm)
        o.full_clean()
        o.save()
        for field, value in mtm.items():
            attribute = getattr(o, field)
            if value is not None:
                attribute.set(value)
        return False
    return True


def migrate(apps, schema_editor):
    Parameter = apps.get_model("api_app", "Parameter")
    PluginConfig = apps.get_model("api_app", "PluginConfig")
    python_path = plugin.pop("model")
    Model = apps.get_model(*python_path.split("."))
    if not Model.objects.filter(name=plugin["name"]).exists():
        exists = _create_object(Model, plugin)
        if not exists:
            for param in params:
                _create_object(Parameter, param)
            for value in values:
                _create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
    python_path = plugin.pop("model")
    Model = apps.get_model(*python_path.split("."))
    Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
    atomic = False
    dependencies = [
        ("api_app", "0062_alter_parameter_python_module"),
        (
            "analyzers_manager",
            "0109_analyzer_config_iocfinder",
        ),
    ]

    operations = [migrations.RunPython(migrate, reverse_migrate)]
```
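Once this data migration has run, the analyzer exists as a database row rather than a code-level setting. As a quick sanity check, something like the following could be run in a Django shell (`python manage.py shell`); the import path follows the `base_path` used above, but treat this as an illustrative sketch rather than part of the commit:

```python
# Hypothetical post-migration check: the migration above should have
# created one AnalyzerConfig row named "Spamhaus_DROP". The model import
# path is an assumption based on the app layout referenced in the plugin.
from api_app.analyzers_manager.models import AnalyzerConfig

config = AnalyzerConfig.objects.get(name="Spamhaus_DROP")
print(config.observable_supported)  # expected: ["ip"]
print(config.maximum_tlp)           # expected: "AMBER"
# The update_schedule crontab ("0 0 * * *") means the DROP list is
# re-downloaded daily at midnight.
```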
100 changes: 100 additions & 0 deletions api_app/analyzers_manager/observable_analyzers/spamhaus_drop.py
@@ -0,0 +1,100 @@
```python
import bisect
import ipaddress
import json
import logging
import os

import requests
from django.conf import settings

from api_app.analyzers_manager import classes
from api_app.analyzers_manager.exceptions import AnalyzerRunException
from tests.mock_utils import MockUpResponse, if_mock_connections, patch

logger = logging.getLogger(__name__)


class SpamhausDropV4(classes.ObservableAnalyzer):
    url = "https://www.spamhaus.org/drop/drop_v4.json"

    @classmethod
    def location(cls) -> str:
        db_name = "drop_v4.json"
        return f"{settings.MEDIA_ROOT}/{db_name}"

    def run(self):
        ip = ipaddress.ip_address(self.observable_name)
        database_location = self.location()
        if not os.path.exists(database_location):
            logger.info(
                f"Database does not exist in {database_location}, initialising..."
            )
            self.update()
        with open(database_location, "r") as f:
            db = json.load(f)

        # db is sorted by network address; bisect with key= needs Python 3.10+
        insertion = bisect.bisect_left(
            db, ip, key=lambda x: ipaddress.ip_network(x["cidr"]).network_address
        )
        matches = []
        # A network containing the IP starts at or before the insertion
        # point, so begin the scan one entry earlier; since DROP entries do
        # not overlap, checking that single preceding entry is enough, and
        # the forward scan still picks up networks whose base address
        # equals the IP.
        for i in range(max(insertion - 1, 0), len(db)):
            network = ipaddress.ip_network(db[i]["cidr"])
            if ip in network:
                matches.append(db[i])
            elif network.network_address > ip:
                break
        if matches:
            return {"found": True, "details": matches}

        return {"found": False}

    @classmethod
    def update(cls):
        logger.info(f"Updating database from {cls.url}")
        response = requests.get(url=cls.url)
        response.raise_for_status()
        data = cls.convert_to_json(response.text)
        database_location = cls.location()

        with open(database_location, "w", encoding="utf-8") as f:
            json.dump(data, f)
        logger.info(f"Database updated at {database_location}")

    @staticmethod
    def convert_to_json(input_string) -> list:
        # The feed is newline-delimited JSON; parse it into a list of dicts.
        lines = input_string.strip().split("\n")
        json_objects = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            try:
                json_obj = json.loads(line)
                json_objects.append(json_obj)
            except json.JSONDecodeError:
                raise AnalyzerRunException(
                    "Invalid JSON format in the response while updating the database"
                )

        return json_objects

    @classmethod
    def _monkeypatch(cls):
        mock_data = (
            '{"cidr": "1.10.16.0/20", "sblid": "SBL256894", "rir": "apnic"}\n'
            '{"cidr": "2.56.192.0/22", "sblid": "SBL459831", "rir": "ripencc"}'
        )
        patches = [
            if_mock_connections(
                patch(
                    "requests.get",
                    return_value=MockUpResponse(
                        mock_data,
                        200,
                    ),
                ),
            )
        ]
        return super()._monkeypatch(patches=patches)
```
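The core of `run()` is a binary search over CIDR entries sorted by network address. Below is a self-contained sketch of that lookup using the two mock entries from `_monkeypatch`; the `max(insertion - 1, 0)` start index mirrors the scan-start fix noted in the code above (a containing network's base address usually precedes the IP), and `bisect` with a `key=` argument requires Python 3.10+. This is an illustration, not part of the commit:

```python
import bisect
import ipaddress

# Two entries borrowed from the mock data above, already sorted by
# network address as the analyzer assumes.
db = [
    {"cidr": "1.10.16.0/20", "sblid": "SBL256894", "rir": "apnic"},
    {"cidr": "2.56.192.0/22", "sblid": "SBL459831", "rir": "ripencc"},
]

ip = ipaddress.ip_address("1.10.20.5")
insertion = bisect.bisect_left(
    db, ip, key=lambda x: ipaddress.ip_network(x["cidr"]).network_address
)
# 1.10.16.0 < 1.10.20.5 < 2.56.192.0, so insertion == 1 and the containing
# network sits just before the insertion point.
matches = []
for entry in db[max(insertion - 1, 0):]:
    network = ipaddress.ip_network(entry["cidr"])
    if ip in network:
        matches.append(entry)
    elif network.network_address > ip:
        break  # sorted order: nothing later can contain the IP

print(matches)  # [{'cidr': '1.10.16.0/20', 'sblid': 'SBL256894', 'rir': 'apnic'}]
```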
4 changes: 3 additions & 1 deletion docs/source/Usage.md
@@ -266,11 +266,13 @@ The following is the list of the available analyzers you can run out-of-the-box.
* `OrklSearch`:[Orkl](https://orkl.eu/) is the Community Driven Cyber Threat Intelligence Library.
* `Crt_sh`:[Crt_Sh](https://crt.sh/) lets you get certificates info about a domain.
* `Spamhaus_WQS`:[Spamhaus_WQS](https://docs.spamhaus.com/datasets/docs/source/70-access-methods/web-query-service/000-intro.html) The Spamhaus Web Query Service (WQS) is a method of accessing Spamhaus block lists using the HTTPS protocol.
* `Adguard`:[Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking.
* `JA4_DB`:[JA4_DB](https://ja4db.com/) lets you search for a fingerprint in the JA4 database.
* `LeakIX`:[LeakIX](https://leakix.net/) is a red-team search engine indexing mis-configurations and vulnerabilities online.
* `ApiVoid`:[ApiVoid](https://www.apivoid.com/) provides JSON APIs useful for cyber threat analysis, threat detection and
threat prevention, reducing and automating the manual work of security analysts.
* `Spamhaus_DROP`:[Spamhaus_DROP](https://www.spamhaus.org/blocklists/do-not-route-or-peer/) protects from activity directly originating from rogue networks, such as spam campaigns, encryption via ransomware, DNS-hijacking and exploit attempts, authentication attacks to discover working access credentials, harvesting, DDoS attacks.

##### Generic analyzers (email, phone number, etc.; anything really)

Some analyzers require details other than just IP, URL, Domain, etc. We classified them as `generic` Analyzers. Since the type of field is not known, there is a format for strings to be followed.

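Once deployed, the new analyzer can be requested like any other IntelOwl observable analyzer. A hedged sketch of the REST call follows; the endpoint path, payload keys, and token header are assumptions based on IntelOwl's documented API (not part of this commit), and the instance URL and API key are placeholders:

```python
# Hypothetical request: trigger a Spamhaus_DROP scan of an IP via
# IntelOwl's REST API.
import requests

response = requests.post(
    "https://your-intelowl-instance/api/analyze_observable",
    headers={"Authorization": "Token <YOUR_API_KEY>"},
    json={
        "observable_name": "1.10.20.5",
        "observable_classification": "ip",
        "analyzers_requested": ["Spamhaus_DROP"],
    },
)
response.raise_for_status()
print(response.json())  # job id and status; analyzer results arrive asynchronously
```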