From 6016b97fe90774552bc4d99fc0e7c398ec69561f Mon Sep 17 00:00:00 2001 From: Nilay Gupta <102874321+g4ze@users.noreply.github.com> Date: Wed, 3 Jul 2024 12:36:38 +0530 Subject: [PATCH] Adguard dns analyzer, closes #1361 (#2363) * adguard * adguard * bad query * tests * adguard works now :p * adguard * docs+mign * ci * ci * ci * tests * ci * ci * playbook * ci try * ci try * mign * mign --------- Co-authored-by: g4ze --- .../0101_analyzer_config_adguard.py | 117 ++++++++++++++++++ .../dns/dns_malicious_detectors/adguard.py | 92 ++++++++++++++ .../observable_analyzers/dns/dns_responses.py | 6 +- ...0049_add_adguard_to_free_to_use_and_dns.py | 42 +++++++ docs/source/Usage.md | 2 +- 5 files changed, 257 insertions(+), 2 deletions(-) create mode 100644 api_app/analyzers_manager/migrations/0101_analyzer_config_adguard.py create mode 100644 api_app/analyzers_manager/observable_analyzers/dns/dns_malicious_detectors/adguard.py create mode 100644 api_app/playbooks_manager/migrations/0049_add_adguard_to_free_to_use_and_dns.py diff --git a/api_app/analyzers_manager/migrations/0101_analyzer_config_adguard.py b/api_app/analyzers_manager/migrations/0101_analyzer_config_adguard.py new file mode 100644 index 0000000000..6cb74ce4a2 --- /dev/null +++ b/api_app/analyzers_manager/migrations/0101_analyzer_config_adguard.py @@ -0,0 +1,117 @@ +from django.db import migrations +from django.db.models.fields.related_descriptors import ( + ForwardManyToOneDescriptor, + ForwardOneToOneDescriptor, + ManyToManyDescriptor, +) + +plugin = { + "python_module": { + "health_check_schedule": None, + "update_schedule": None, + "module": "dns.dns_malicious_detectors.adguard.AdGuard", + "base_path": "api_app.analyzers_manager.observable_analyzers", + }, + "name": "AdGuard", + "description": "[Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking.", + "disabled": False, + "soft_time_limit": 30, + "routing_key": "default", + "health_check_status": True, + "type": "observable", + "docker_based": False, + "maximum_tlp": "AMBER", + "observable_supported": ["url", "domain"], + "supported_filetypes": [], + "run_hash": False, + "run_hash_type": "", + "not_supported_filetypes": [], + "model": "analyzers_manager.AnalyzerConfig", +} + +params = [] + +values = [] + + +def _get_real_obj(Model, field, value): + def _get_obj(Model, other_model, value): + if isinstance(value, dict): + real_vals = {} + for key, real_val in value.items(): + real_vals[key] = _get_real_obj(other_model, key, real_val) + value = other_model.objects.get_or_create(**real_vals)[0] + # it is just the primary key serialized + else: + if isinstance(value, int): + if Model.__name__ == "PluginConfig": + value = other_model.objects.get(name=plugin["name"]) + else: + value = other_model.objects.get(pk=value) + else: + value = other_model.objects.get(name=value) + return value + + if ( + type(getattr(Model, field)) + in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor] + and value + ): + other_model = getattr(Model, field).get_queryset().model + value = _get_obj(Model, other_model, value) + elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: + other_model = getattr(Model, field).rel.model + value = [_get_obj(Model, other_model, val) for val in value] + return value + + +def _create_object(Model, data): + mtm, no_mtm = {}, {} + for field, value in data.items(): + value = _get_real_obj(Model, field, value) + if type(getattr(Model, field)) is ManyToManyDescriptor: + mtm[field] = value + else: + no_mtm[field] = value + try: + o = Model.objects.get(**no_mtm) + except Model.DoesNotExist: + o = Model(**no_mtm) + o.full_clean() + o.save() + for field, value in mtm.items(): + attribute = getattr(o, field) + if value is not None: + attribute.set(value) + return False + return True + + +def migrate(apps, schema_editor): + Parameter = apps.get_model("api_app", "Parameter") + PluginConfig = apps.get_model("api_app", "PluginConfig") + python_path = plugin.pop("model") + Model = apps.get_model(*python_path.split(".")) + if not Model.objects.filter(name=plugin["name"]).exists(): + exists = _create_object(Model, plugin) + if not exists: + for param in params: + _create_object(Parameter, param) + for value in values: + _create_object(PluginConfig, value) + + +def reverse_migrate(apps, schema_editor): + python_path = plugin.pop("model") + Model = apps.get_model(*python_path.split(".")) + Model.objects.get(name=plugin["name"]).delete() + + +class Migration(migrations.Migration): + atomic = False + dependencies = [ + ("api_app", "0062_alter_parameter_python_module"), + ("analyzers_manager", "0100_analyzer_config_downloadfilefromuri"), + ] + + operations = [migrations.RunPython(migrate, reverse_migrate)] diff --git a/api_app/analyzers_manager/observable_analyzers/dns/dns_malicious_detectors/adguard.py b/api_app/analyzers_manager/observable_analyzers/dns/dns_malicious_detectors/adguard.py new file mode 100644 index 0000000000..81c1676f69 --- /dev/null +++ b/api_app/analyzers_manager/observable_analyzers/dns/dns_malicious_detectors/adguard.py @@ -0,0 +1,92 @@ +import base64 +import logging +from typing import List +from urllib.parse import urlparse + +import dns.message +import requests +from dns.rrset import RRset + +from api_app.analyzers_manager import classes + +from ..dns_responses import malicious_detector_response + +logger = logging.getLogger(__name__) + + +class AdGuard(classes.ObservableAnalyzer): + """Check if a domain is malicious by AdGuard public resolver.""" + + url = "https://dns.adguard-dns.com/dns-query" + + def update(self) -> bool: + pass + + # We make DOH(DNS over http) query out of the observable + # Mainly done using the wire format of the query + # ref: https://datatracker.ietf.org/doc/html/rfc8484 + @staticmethod + def encode_query(observable: str) -> str: + logger.info(f"Encoding query for {observable}") + query = dns.message.make_query(observable, "A") + wire_query = query.to_wire() + encoded_query = ( + base64.urlsafe_b64encode(wire_query).rstrip(b"=").decode("ascii") + ) + logger.info(f"Encoded query: {encoded_query}") + return encoded_query + + def filter_query(self, encoded_query: str) -> List[RRset]: + logger.info( + f"Sending filtered request to AdGuard DNS API for query: {encoded_query}" + ) + r_filtered = requests.get( + url=f"{self.url}?dns={encoded_query}", + headers={"accept": "application/dns-message"}, + ) + logger.info(f"Received r_filtered from AdGuard DNS API: {r_filtered.content}") + r_filtered.raise_for_status() + return dns.message.from_wire(r_filtered.content).answer + + @staticmethod + def check_a(observable: str, a_filtered: List[RRset]) -> dict: + # adguard follows 2 patterns for malicious domains, + # it either redirects the request to ad-block.dns.adguard.com + # or it sinkholes the request (to 0.0.0.0). + # If the response contains neither of these, + # we can safely say the domain is not malicious + for ans in a_filtered: + if str(ans.name) == "ad-block.dns.adguard.com.": + return malicious_detector_response( + observable=observable, malicious=True + ) + + if any(str(data) == "0.0.0.0" for data in ans): # nosec B104 + return malicious_detector_response( + observable=observable, malicious=True + ) + + return malicious_detector_response(observable=observable, malicious=False) + + def run(self): + logger.info(f"Running AdGuard DNS analyzer for {self.observable_name}") + observable = self.observable_name + # for URLs we are checking the relative domain + if self.observable_classification == self.ObservableTypes.URL: + logger.info(f"Extracting domain from URL {observable}") + observable = urlparse(self.observable_name).hostname + encoded_query = self.encode_query(observable) + a_filtered = self.filter_query(encoded_query) + + if not a_filtered: + # dont need to check unfiltered if filtered is empty + # as filter responds even if the domain is not malicious + # and recognised by adguard + logger.info(f"Filtered response is empty for {self.observable_name}") + return malicious_detector_response( + observable=observable, + malicious=False, + note="No response from AdGuard DNS API", + ) + + return self.check_a(observable, a_filtered) diff --git a/api_app/analyzers_manager/observable_analyzers/dns/dns_responses.py b/api_app/analyzers_manager/observable_analyzers/dns/dns_responses.py index a8034756af..c606fe826e 100644 --- a/api_app/analyzers_manager/observable_analyzers/dns/dns_responses.py +++ b/api_app/analyzers_manager/observable_analyzers/dns/dns_responses.py @@ -5,7 +5,7 @@ def malicious_detector_response( - observable: str, malicious: bool, timeout: bool = False + observable: str, malicious: bool, timeout: bool = False, note: str = None ) -> dict: """Standard response for malicious detector analyzers @@ -15,6 +15,8 @@ def malicious_detector_response( :type malicious: bool :param timeout: set if the DNS query timed-out :type timeout bool + :param note: additional note to add to the report, default to None + :type note: str, optional :return: :rtype: dict """ @@ -23,6 +25,8 @@ def malicious_detector_response( if timeout: report["timeout"] = True + if note: + report["note"] = note return report diff --git a/api_app/playbooks_manager/migrations/0049_add_adguard_to_free_to_use_and_dns.py b/api_app/playbooks_manager/migrations/0049_add_adguard_to_free_to_use_and_dns.py new file mode 100644 index 0000000000..e7cf6c470e --- /dev/null +++ b/api_app/playbooks_manager/migrations/0049_add_adguard_to_free_to_use_and_dns.py @@ -0,0 +1,42 @@ +# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl +# See the file 'LICENSE' for copying permission. + + +from django.db import migrations + + +def migrate(apps, schema_editor): + playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig") + AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig") + pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS") + pc2 = playbook_config.objects.get(name="Dns") + pc.analyzers.add(AnalyzerConfig.objects.get(name="AdGuard").id) + pc2.analyzers.add(AnalyzerConfig.objects.get(name="AdGuard").id) + pc.full_clean() + pc.save() + pc2.full_clean() + pc2.save() + + +def reverse_migrate(apps, schema_editor): + playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig") + AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig") + pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS") + pc2 = playbook_config.objects.get(name="Dns") + pc.analyzers.remove(AnalyzerConfig.objects.get(name="AdGuard").id) + pc2.analyzers.remove(AnalyzerConfig.objects.get(name="AdGuard").id) + pc.full_clean() + pc.save() + pc2.full_clean() + pc2.save() + + +class Migration(migrations.Migration): + dependencies = [ + ("playbooks_manager", "0048_playbook_config_download_file"), + ("analyzers_manager", "0101_analyzer_config_adguard"), + ] + + operations = [ + migrations.RunPython(migrate, reverse_migrate), + ] diff --git a/docs/source/Usage.md b/docs/source/Usage.md index a7114c975e..399abc86c3 100644 --- a/docs/source/Usage.md +++ b/docs/source/Usage.md @@ -264,7 +264,7 @@ The following is the list of the available analyzers you can run out-of-the-box. * `OrklSearch`:[Orkl](https://orkl.eu/) is the Community Driven Cyber Threat Intelligence Library. * `Crt_sh`:[Crt_Sh](https://crt.sh/) lets you get certificates info about a domain. * `Spamhaus_WQS`:[Spamhaus_WQS](https://docs.spamhaus.com/datasets/docs/source/70-access-methods/web-query-service/000-intro.html) : The Spamhaus Web Query Service (WQS) is a method of accessing Spamhaus block lists using the HTTPS protocol. - +* `Adguard`: [Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking. ##### Generic analyzers (email, phone number, etc.; anything really) Some analyzers require details other than just IP, URL, Domain, etc. We classified them as `generic` Analyzers. Since the type of field is not known, there is a format for strings to be followed.