Skip to content
This repository has been archived by the owner on Nov 2, 2024. It is now read-only.

Commit

Permalink
Iocextract analyzer#1228 (intelowlproject#2426)
Browse files Browse the repository at this point in the history
* iocextract

* iocextract

* iocextract

* ioc

* iocextract

* logs

* mign
  • Loading branch information
g4ze authored and Michalsus committed Oct 11, 2024
1 parent 9e363f3 commit 3bc2621
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 0 deletions.
70 changes: 70 additions & 0 deletions api_app/analyzers_manager/file_analyzers/iocextract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import logging

import iocextract as i

from api_app.analyzers_manager.classes import FileAnalyzer

logger = logging.getLogger(__name__)


class IocExtract(FileAnalyzer):
    """File analyzer that extracts IOCs from a file's text with ``iocextract``.

    When ``extract_iocs`` is true, a single ``iocextract.extract_iocs`` pass is
    run and the per-type ``extract_*`` flags are ignored. Otherwise, each
    enabled ``extract_*`` flag contributes one key to the result.
    """

    # Options forwarded to the iocextract extraction functions.
    refang: bool = False
    defang: bool = False
    strip: bool = False
    # Per-type switches; only consulted when ``extract_iocs`` is False.
    extract_urls: bool = False
    extract_ips: bool = False
    extract_emails: bool = False
    extract_hashes: bool = False
    extract_yara_rules: bool = False
    extract_telephone_nums: bool = False
    # Run the combined extraction instead of the per-type ones.
    extract_iocs: bool = True

    def update(self):
        """No-op: this analyzer performs no update work."""
        pass

    def run(self):
        """Extract IOCs from the analyzed file.

        Returns:
            dict: ``{"all_iocs": [...]}`` when ``extract_iocs`` is true;
            otherwise one list per enabled extraction type, keyed by
            ``"urls"``, ``"ips"``, ``"emails"``, ``"hashes"``,
            ``"yara_rules"`` and ``"telephone_nums"``.
        """
        logger.info(
            "Running IocExtract on %s with md5: %s", self.filename, self.md5
        )
        # The sample is not guaranteed to be valid UTF-8. Replace undecodable
        # bytes instead of raising UnicodeDecodeError, so the decodable part
        # of the text can still be scanned.
        text_data = self.read_file_bytes().decode("utf-8", errors="replace")

        if self.extract_iocs:
            return {
                "all_iocs": list(
                    i.extract_iocs(
                        text_data, refang=self.refang, strip=self.strip
                    )
                )
            }

        # key -> (enabled flag, lazy extractor). The lambdas defer the work so
        # that only the enabled extractors are actually run.
        extraction_methods = {
            "urls": (
                self.extract_urls,
                lambda: i.extract_urls(
                    text_data,
                    refang=self.refang,
                    strip=self.strip,
                    defang=self.defang,
                ),
            ),
            "ips": (
                self.extract_ips,
                lambda: i.extract_ips(text_data, refang=self.refang),
            ),
            "emails": (
                self.extract_emails,
                lambda: i.extract_emails(text_data, refang=self.refang),
            ),
            "hashes": (
                self.extract_hashes,
                lambda: i.extract_hashes(text_data),
            ),
            "yara_rules": (
                self.extract_yara_rules,
                lambda: i.extract_yara_rules(text_data),
            ),
            "telephone_nums": (
                self.extract_telephone_nums,
                lambda: i.extract_telephone_nums(text_data),
            ),
        }
        return {
            key: list(method())
            for key, (flag, method) in extraction_methods.items()
            if flag
        }
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "IocExtract",
"description": '[IocExtract](https://github.com/InQuest/iocextract) package is a library and command line interface (CLI) for extracting URLs, IP addresses, MD5/SHA hashes, email addresses, and YARA rules from text corpora. It allows for you to extract encoded and "defanged" IOCs and optionally decode or refang them.',
"disabled": False,
"soft_time_limit": 60,
"routing_key": "default",
"health_check_status": True,
"type": "file",
"docker_based": False,
"maximum_tlp": "RED",
"observable_supported": [],
"supported_filetypes": ["text/plain"],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

params = [
{
"python_module": {
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "extract_urls",
"type": "bool",
"description": "Extract URLs!",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "extract_ips",
"type": "bool",
"description": "Extract IP addresses!\r\n\r\nIncludes both IPv4 and IPv6 addresses.",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "extract_emails",
"type": "bool",
"description": "Extract email addresses!",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "extract_hashes",
"type": "bool",
"description": "Extract MD5/SHA hashes!",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "extract_yara_rules",
"type": "bool",
"description": "Extract YARA rules!",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "extract_telephone_nums",
"type": "bool",
"description": "Extract telephone numbers!",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "refang",
"type": "bool",
"description": "Refang output",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "strip",
"type": "bool",
"description": "Strip possible garbage from the end of URLs",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "defang",
"type": "bool",
"description": "Extract non-defanged IOCs",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "iocextract.IocExtract",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "extract_iocs",
"type": "bool",
"description": "Extract all IOCs!",
"is_secret": False,
"required": False,
},
]

values = []


def _get_real_obj(Model, field, value):
    """Turn a serialized relation value into the related model object(s).

    Args:
        Model: model class that owns ``field``.
        field: name of the attribute on ``Model``.
        value: serialized value for that attribute.

    Returns:
        A related instance for forward FK / one-to-one fields, a list of
        related instances for many-to-many fields, or ``value`` unchanged
        when the field is not one of those relations or ``value`` is falsy.
    """

    def _resolve(owner, related_model, raw):
        # A dict is a nested serialized object: resolve each of its fields,
        # then fetch or create the matching row.
        if isinstance(raw, dict):
            lookup = {
                name: _get_real_obj(related_model, name, nested)
                for name, nested in raw.items()
            }
            return related_model.objects.get_or_create(**lookup)[0]
        # Anything that is not an int is looked up by name.
        if not isinstance(raw, int):
            return related_model.objects.get(name=raw)
        # An int is a serialized primary key. When the owner is PluginConfig
        # it is replaced by a lookup of this migration's plugin by name.
        if owner.__name__ == "PluginConfig":
            return related_model.objects.get(name=plugin["name"])
        return related_model.objects.get(pk=raw)

    descriptor = getattr(Model, field)
    descriptor_type = type(descriptor)
    single_relation = (ForwardManyToOneDescriptor, ForwardOneToOneDescriptor)

    if descriptor_type in single_relation and value:
        return _resolve(Model, descriptor.get_queryset().model, value)
    if descriptor_type is ManyToManyDescriptor and value:
        target_model = descriptor.rel.model
        return [_resolve(Model, target_model, item) for item in value]
    return value


def _create_object(Model, data):
    """Ensure a ``Model`` row matching ``data`` exists.

    Args:
        Model: model class to look up or instantiate.
        data: mapping of field name to serialized value.

    Returns:
        ``True`` if a row matching the non many-to-many fields already
        existed, ``False`` if a new row had to be created.
    """
    many_to_many, plain_fields = {}, {}
    for name, raw in data.items():
        resolved = _get_real_obj(Model, name, raw)
        is_m2m = type(getattr(Model, name)) is ManyToManyDescriptor
        target = many_to_many if is_m2m else plain_fields
        target[name] = resolved

    try:
        Model.objects.get(**plain_fields)
    except Model.DoesNotExist:
        instance = Model(**plain_fields)
        instance.full_clean()
        instance.save()
        # Many-to-many values are set only after the row has been saved.
        for name, related in many_to_many.items():
            manager = getattr(instance, name)
            if related is not None:
                manager.set(related)
        return False
    return True


def migrate(apps, schema_editor):
    """Create the plugin, then its parameters and values, if it is missing.

    Nothing is created when a row named ``plugin["name"]`` already exists.

    Args:
        apps: app registry providing the models via ``get_model``.
        schema_editor: unused.
    """
    Parameter = apps.get_model("api_app", "Parameter")
    PluginConfig = apps.get_model("api_app", "PluginConfig")
    # Work on a shallow copy: popping "model" from the module-level dict
    # would make any later migrate/reverse_migrate call in the same process
    # fail with KeyError.
    plugin_data = dict(plugin)
    python_path = plugin_data.pop("model")
    Model = apps.get_model(*python_path.split("."))
    if Model.objects.filter(name=plugin_data["name"]).exists():
        return
    already_present = _create_object(Model, plugin_data)
    if already_present:
        return
    for param in params:
        _create_object(Parameter, param)
    for value in values:
        _create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
    """Delete the plugin row created by the forward migration.

    Args:
        apps: app registry providing the models via ``get_model``.
        schema_editor: unused.
    """
    # Read "model" without popping it, so the module-level dict stays intact
    # and this function can run more than once in the same process.
    python_path = plugin["model"]
    Model = apps.get_model(*python_path.split("."))
    Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
    """Data migration wiring ``migrate`` / ``reverse_migrate`` into Django."""

    # Run outside of a single wrapping transaction.
    atomic = False

    # Migrations that must be applied before this one.
    dependencies = [
        ("api_app", "0062_alter_parameter_python_module"),
        ("analyzers_manager", "0107_analyzer_config_apivoid"),
    ]

    operations = [
        migrations.RunPython(code=migrate, reverse_code=reverse_migrate),
    ]
1 change: 1 addition & 0 deletions docs/source/Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ The following is the list of the available analyzers you can run out-of-the-box.
- `Zippy_scan` : [Zippy](https://github.com/thinkst/zippy): Fast method to classify text as AI or human-generated; takes in `lzma`,`zlib`,`brotli` as input based engines; `ensemble` being default.
- `Blint`: [Blint](https://github.com/owasp-dep-scan/blint) is a Binary Linter that checks the security properties and capabilities of your executables. Supported binary formats: - Android (apk, aab) - ELF (GNU, musl) - PE (exe, dll) - Mach-O (x64, arm64)
- `MalprobScan` : [Malprob](https://malprob.io/) is a leading malware detection and identification service, powered by cutting-edge AI technology.
- `IocExtract`: [IocExtract](https://github.com/InQuest/iocextract) package is a library and command line interface (CLI) for extracting URLs, IP addresses, MD5/SHA hashes, email addresses, and YARA rules from text corpora. It allows you to extract encoded and "defanged" IOCs and optionally decode or refang them.

##### Observable analyzers (ip, domain, url, hash)

Expand Down
1 change: 1 addition & 0 deletions requirements/project-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ blint==2.1.5
hfinger==0.2.2
permhash==0.1.4
ail_typo_squatting==2.7.4
iocextract==1.16.1
# this is required because XLMMacroDeobfuscator does not pin the following packages
pyxlsb2==0.0.8
xlrd2==1.3.4
Expand Down

0 comments on commit 3bc2621

Please sign in to comment.