Skip to content
This repository has been archived by the owner on Nov 2, 2024. It is now read-only.

Commit

Permalink
Adguard dns analyzer, closes intelowlproject#1361 (intelowlproject#2363)
Browse files Browse the repository at this point in the history
* adguard

* adguard

* bad query

* tests

* adguard works now :p

* adguard

* docs+mign

* ci

* ci

* ci

* tests

* ci

* ci

* playbook

* ci try

* ci try

* mign

* mign

---------

Co-authored-by: g4ze <bhaiyajionline@gmail.com>
  • Loading branch information
2 people authored and vaclavbartos committed Oct 13, 2024
1 parent 1666085 commit 6016b97
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 2 deletions.
117 changes: 117 additions & 0 deletions api_app/analyzers_manager/migrations/0101_analyzer_config_adguard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "dns.dns_malicious_detectors.adguard.AdGuard",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "AdGuard",
"description": "[Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking.",
"disabled": False,
"soft_time_limit": 30,
"routing_key": "default",
"health_check_status": True,
"type": "observable",
"docker_based": False,
"maximum_tlp": "AMBER",
"observable_supported": ["url", "domain"],
"supported_filetypes": [],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

params = []

values = []


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0062_alter_parameter_python_module"),
("analyzers_manager", "0100_analyzer_config_downloadfilefromuri"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import base64
import logging
from typing import List
from urllib.parse import urlparse

import dns.message
import requests
from dns.rrset import RRset

from api_app.analyzers_manager import classes

from ..dns_responses import malicious_detector_response

logger = logging.getLogger(__name__)


class AdGuard(classes.ObservableAnalyzer):
"""Check if a domain is malicious by AdGuard public resolver."""

url = "https://dns.adguard-dns.com/dns-query"

def update(self) -> bool:
pass

# We make DOH(DNS over http) query out of the observable
# Mainly done using the wire format of the query
# ref: https://datatracker.ietf.org/doc/html/rfc8484
@staticmethod
def encode_query(observable: str) -> str:
logger.info(f"Encoding query for {observable}")
query = dns.message.make_query(observable, "A")
wire_query = query.to_wire()
encoded_query = (
base64.urlsafe_b64encode(wire_query).rstrip(b"=").decode("ascii")
)
logger.info(f"Encoded query: {encoded_query}")
return encoded_query

def filter_query(self, encoded_query: str) -> List[RRset]:
logger.info(
f"Sending filtered request to AdGuard DNS API for query: {encoded_query}"
)
r_filtered = requests.get(
url=f"{self.url}?dns={encoded_query}",
headers={"accept": "application/dns-message"},
)
logger.info(f"Received r_filtered from AdGuard DNS API: {r_filtered.content}")
r_filtered.raise_for_status()
return dns.message.from_wire(r_filtered.content).answer

@staticmethod
def check_a(observable: str, a_filtered: List[RRset]) -> dict:
# adguard follows 2 patterns for malicious domains,
# it either redirects the request to ad-block.dns.adguard.com
# or it sinkholes the request (to 0.0.0.0).
# If the response contains neither of these,
# we can safely say the domain is not malicious
for ans in a_filtered:
if str(ans.name) == "ad-block.dns.adguard.com.":
return malicious_detector_response(
observable=observable, malicious=True
)

if any(str(data) == "0.0.0.0" for data in ans): # nosec B104
return malicious_detector_response(
observable=observable, malicious=True
)

return malicious_detector_response(observable=observable, malicious=False)

def run(self):
logger.info(f"Running AdGuard DNS analyzer for {self.observable_name}")
observable = self.observable_name
# for URLs we are checking the relative domain
if self.observable_classification == self.ObservableTypes.URL:
logger.info(f"Extracting domain from URL {observable}")
observable = urlparse(self.observable_name).hostname
encoded_query = self.encode_query(observable)
a_filtered = self.filter_query(encoded_query)

if not a_filtered:
# dont need to check unfiltered if filtered is empty
# as filter responds even if the domain is not malicious
# and recognised by adguard
logger.info(f"Filtered response is empty for {self.observable_name}")
return malicious_detector_response(
observable=observable,
malicious=False,
note="No response from AdGuard DNS API",
)

return self.check_a(observable, a_filtered)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


def malicious_detector_response(
observable: str, malicious: bool, timeout: bool = False
observable: str, malicious: bool, timeout: bool = False, note: str = None
) -> dict:
"""Standard response for malicious detector analyzers
Expand All @@ -15,6 +15,8 @@ def malicious_detector_response(
:type malicious: bool
:param timeout: set if the DNS query timed-out
:type timeout bool
:param note: additional note to add to the report, default to None
:type note: str, optional
:return:
:rtype: dict
"""
Expand All @@ -23,6 +25,8 @@ def malicious_detector_response(

if timeout:
report["timeout"] = True
if note:
report["note"] = note

return report

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.


from django.db import migrations


def migrate(apps, schema_editor):
playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")
pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS")
pc2 = playbook_config.objects.get(name="Dns")
pc.analyzers.add(AnalyzerConfig.objects.get(name="AdGuard").id)
pc2.analyzers.add(AnalyzerConfig.objects.get(name="AdGuard").id)
pc.full_clean()
pc.save()
pc2.full_clean()
pc2.save()


def reverse_migrate(apps, schema_editor):
playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")
pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS")
pc2 = playbook_config.objects.get(name="Dns")
pc.analyzers.remove(AnalyzerConfig.objects.get(name="AdGuard").id)
pc2.analyzers.remove(AnalyzerConfig.objects.get(name="AdGuard").id)
pc.full_clean()
pc.save()
pc2.full_clean()
pc2.save()


class Migration(migrations.Migration):
dependencies = [
("playbooks_manager", "0048_playbook_config_download_file"),
("analyzers_manager", "0101_analyzer_config_adguard"),
]

operations = [
migrations.RunPython(migrate, reverse_migrate),
]
2 changes: 1 addition & 1 deletion docs/source/Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ The following is the list of the available analyzers you can run out-of-the-box.
* `OrklSearch`:[Orkl](https://orkl.eu/) is the Community Driven Cyber Threat Intelligence Library.
* `Crt_sh`:[Crt_Sh](https://crt.sh/) lets you get certificates info about a domain.
* `Spamhaus_WQS`:[Spamhaus_WQS](https://docs.spamhaus.com/datasets/docs/source/70-access-methods/web-query-service/000-intro.html) : The Spamhaus Web Query Service (WQS) is a method of accessing Spamhaus block lists using the HTTPS protocol.

* `Adguard`: [Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking.
##### Generic analyzers (email, phone number, etc.; anything really)

Some analyzers require details other than just IP, URL, Domain, etc. We classified them as `generic` Analyzers. Since the type of field is not known, there is a format for strings to be followed.
Expand Down

0 comments on commit 6016b97

Please sign in to comment.