From 3410165e2ca930e0c6a0dc83dcace57a18669bde Mon Sep 17 00:00:00 2001
From: Nilay Gupta <102874321+g4ze@users.noreply.github.com>
Date: Mon, 22 Jul 2024 14:53:38 +0530
Subject: [PATCH] spamhaus_drop analyzer, closes #2408 (#2422)

* spamhaus_drop

* spamhaus_drop

* ip matching

* migratiuons

* migrations

* tests

* tests

* tests

* tests

* tests

* tests

* IocFinder

* bool

* mign

* docs

* mign

* mign

* mign
---
 .../0110_analyzer_config_spamhaus_drop.py     | 126 ++++++++++++++++++
 .../observable_analyzers/spamhaus_drop.py     | 100 ++++++++++++++
 docs/source/Usage.md                          |   4 +-
 3 files changed, 229 insertions(+), 1 deletion(-)
 create mode 100644 api_app/analyzers_manager/migrations/0110_analyzer_config_spamhaus_drop.py
 create mode 100644 api_app/analyzers_manager/observable_analyzers/spamhaus_drop.py

diff --git a/api_app/analyzers_manager/migrations/0110_analyzer_config_spamhaus_drop.py b/api_app/analyzers_manager/migrations/0110_analyzer_config_spamhaus_drop.py
new file mode 100644
index 0000000000..e43cf88d62
--- /dev/null
+++ b/api_app/analyzers_manager/migrations/0110_analyzer_config_spamhaus_drop.py
@@ -0,0 +1,126 @@
+from django.db import migrations
+from django.db.models.fields.related_descriptors import (
+    ForwardManyToOneDescriptor,
+    ForwardOneToOneDescriptor,
+    ManyToManyDescriptor,
+)
+
+plugin = {
+    "python_module": {
+        "health_check_schedule": None,
+        "update_schedule": {
+            "minute": "0",
+            "hour": "0",
+            "day_of_week": "*",
+            "day_of_month": "*",
+            "month_of_year": "*",
+        },
+        "module": "spamhaus_drop.SpamhausDropV4",
+        "base_path": "api_app.analyzers_manager.observable_analyzers",
+    },
+    "name": "Spamhaus_DROP",
+    "description": "[Spamhaus_DROP](https://www.spamhaus.org/blocklists/do-not-route-or-peer/) protects from activity directly originating from rogue networks, such as spam campaigns, encryption via ransomware, DNS-hijacking and exploit attempts, authentication attacks to discover working access credentials, harvesting, DDoS attacks.",
+    "disabled": False,
+    "soft_time_limit": 10,
+    "routing_key": "default",
+    "health_check_status": True,
+    "type": "observable",
+    "docker_based": False,
+    "maximum_tlp": "AMBER",
+    "observable_supported": ["ip"],
+    "supported_filetypes": [],
+    "run_hash": False,
+    "run_hash_type": "",
+    "not_supported_filetypes": [],
+    "model": "analyzers_manager.AnalyzerConfig",
+}
+
+params = []
+
+values = []
+
+
+def _get_real_obj(Model, field, value):
+    def _get_obj(Model, other_model, value):
+        if isinstance(value, dict):
+            real_vals = {}
+            for key, real_val in value.items():
+                real_vals[key] = _get_real_obj(other_model, key, real_val)
+            value = other_model.objects.get_or_create(**real_vals)[0]
+        # it is just the primary key serialized
+        else:
+            if isinstance(value, int):
+                if Model.__name__ == "PluginConfig":
+                    value = other_model.objects.get(name=plugin["name"])
+                else:
+                    value = other_model.objects.get(pk=value)
+            else:
+                value = other_model.objects.get(name=value)
+        return value
+
+    if (
+        type(getattr(Model, field))
+        in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
+        and value
+    ):
+        other_model = getattr(Model, field).get_queryset().model
+        value = _get_obj(Model, other_model, value)
+    elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
+        other_model = getattr(Model, field).rel.model
+        value = [_get_obj(Model, other_model, val) for val in value]
+    return value
+
+
+def _create_object(Model, data):
+    mtm, no_mtm = {}, {}
+    for field, value in data.items():
+        value = _get_real_obj(Model, field, value)
+        if type(getattr(Model, field)) is ManyToManyDescriptor:
+            mtm[field] = value
+        else:
+            no_mtm[field] = value
+    try:
+        o = Model.objects.get(**no_mtm)
+    except Model.DoesNotExist:
+        o = Model(**no_mtm)
+        o.full_clean()
+        o.save()
+        for field, value in mtm.items():
+            attribute = getattr(o, field)
+            if value is not None:
+                attribute.set(value)
+        return False
+    return True
+
+
+def migrate(apps, schema_editor):
+    Parameter = apps.get_model("api_app", "Parameter")
+    PluginConfig = apps.get_model("api_app", "PluginConfig")
+    python_path = plugin.pop("model")
+    Model = apps.get_model(*python_path.split("."))
+    if not Model.objects.filter(name=plugin["name"]).exists():
+        exists = _create_object(Model, plugin)
+        if not exists:
+            for param in params:
+                _create_object(Parameter, param)
+            for value in values:
+                _create_object(PluginConfig, value)
+
+
+def reverse_migrate(apps, schema_editor):
+    python_path = plugin.pop("model")
+    Model = apps.get_model(*python_path.split("."))
+    Model.objects.get(name=plugin["name"]).delete()
+
+
+class Migration(migrations.Migration):
+    atomic = False
+    dependencies = [
+        ("api_app", "0062_alter_parameter_python_module"),
+        (
+            "analyzers_manager",
+            "0109_analyzer_config_iocfinder",
+        ),
+    ]
+
+    operations = [migrations.RunPython(migrate, reverse_migrate)]
diff --git a/api_app/analyzers_manager/observable_analyzers/spamhaus_drop.py b/api_app/analyzers_manager/observable_analyzers/spamhaus_drop.py
new file mode 100644
index 0000000000..2f81bf31bf
--- /dev/null
+++ b/api_app/analyzers_manager/observable_analyzers/spamhaus_drop.py
@@ -0,0 +1,100 @@
+import bisect
+import ipaddress
+import json
+import logging
+import os
+
+import requests
+from django.conf import settings
+
+from api_app.analyzers_manager import classes
+from api_app.analyzers_manager.exceptions import AnalyzerRunException
+from tests.mock_utils import MockUpResponse, if_mock_connections, patch
+
+logger = logging.getLogger(__name__)
+
+
+class SpamhausDropV4(classes.ObservableAnalyzer):
+    url = "https://www.spamhaus.org/drop/drop_v4.json"
+
+    @classmethod
+    def location(cls) -> str:
+        db_name = "drop_v4.json"
+        return f"{settings.MEDIA_ROOT}/{db_name}"
+
+    def run(self):
+        ip = ipaddress.ip_address(self.observable_name)
+        database_location = self.location()
+        if not os.path.exists(database_location):
+            logger.info(
+                f"Database does not exist in {database_location}, initialising..."
+            )
+            self.update()
+        with open(database_location, "r") as f:
+            db = json.load(f)
+
+        insertion = bisect.bisect_left(
+            db, ip, key=lambda x: ipaddress.ip_network(x["cidr"]).network_address
+        )
+        matches = []
+        # Check entries at and after the insertion point
+        # there may be one or more subnets containing the IP
+        for i in range(insertion, len(db)):
+            network = ipaddress.ip_network(db[i]["cidr"])
+            if ip in network:
+                matches.append(db[i])
+            elif network.network_address > ip:
+                break
+        if matches:
+            return {"found": True, "details": matches}
+
+        return {"found": False}
+
+    @classmethod
+    def update(cls):
+        logger.info(f"Updating database from {cls.url}")
+        response = requests.get(url=cls.url)
+        response.raise_for_status()
+        data = cls.convert_to_json(response.text)
+        database_location = cls.location()
+
+        with open(database_location, "w", encoding="utf-8") as f:
+            json.dump(data, f)
+        logger.info(f"Database updated at {database_location}")
+
+    @staticmethod
+    def convert_to_json(input_string) -> dict:
+        lines = input_string.strip().split("\n")
+        json_objects = []
+        for line in lines:
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                json_obj = json.loads(line)
+                json_objects.append(json_obj)
+            except json.JSONDecodeError:
+                raise AnalyzerRunException(
+                    "Invalid JSON format in the response while updating the database"
+                )
+
+        return json_objects
+
+    @classmethod
+    def _monkeypatch(cls):
+        mock_data = (
+            '{"cidr": "1.10.16.0/20", "sblid": "SBL256894", "rir": "apnic"}\n'
+            '{"cidr": "2.56.192.0/22", "sblid": "SBL459831", "rir": "ripencc"}'
+        )
+        patches = [
+            if_mock_connections(
+                patch(
+                    "requests.get",
+                    return_value=MockUpResponse(
+                        mock_data,
+                        200,
+                    ),
+                ),
+            )
+        ]
+        return super()._monkeypatch(patches=patches)
diff --git a/docs/source/Usage.md b/docs/source/Usage.md
index 587e2c6939..0885d33902 100644
--- a/docs/source/Usage.md
+++ b/docs/source/Usage.md
@@ -266,11 +266,13 @@ The following is the list of the available analyzers you can run
 out-of-the-box.
 * `OrklSearch`:[Orkl](https://orkl.eu/) is the Community Driven Cyber Threat Intelligence Library.
 * `Crt_sh`:[Crt_Sh](https://crt.sh/) lets you get certificates info about a domain.
 * `Spamhaus_WQS`:[Spamhaus_WQS](https://docs.spamhaus.com/datasets/docs/source/70-access-methods/web-query-service/000-intro.html) The Spamhaus Web Query Service (WQS) is a method of accessing Spamhaus block lists using the HTTPS protocol.
-* `Adguard`: [Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking.* `Adguard`: [Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking.
+* `Adguard`:[Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking.
 * `JA4_DB`:[JA4_DB](https://ja4db.com/) lets you search a fingerprint in JA4 databse.
 * `LeakIX`:[LeakIX](https://leakix.net/) is a red-team search engine indexing mis-configurations and vulnerabilities online.
 * `ApiVoid`:[ApiVoid](https://www.apivoid.com/) provides JSON APIs useful for cyber threat analysis, threat detection and threat prevention, reducing and automating the manual work of security analysts.
+* `Spamhaus_DROP`:[Spamhaus_DROP](https://www.spamhaus.org/blocklists/do-not-route-or-peer/) protects from activity directly originating from rogue networks, such as spam campaigns, encryption via ransomware, DNS-hijacking and exploit attempts, authentication attacks to discover working access credentials, harvesting, DDoS attacks.
+
 
 ##### Generic analyzers (email, phone number, etc.; anything really)
 Some analyzers require details other than just IP, URL, Domain, etc. We classified them as `generic` Analyzers. Since the type of field is not known, there is a format for strings to be followed.