Skip to content

Commit

Permalink
Polyswarm analyzer closes#1255 (#2439)
Browse files Browse the repository at this point in the history
* polyswarm

* polyswarm

* polyswarm

* polyswarm

* polyswarm

* mign

* logs

* logs

* mign

* obs

* obs

* obs

* tests

* modular
  • Loading branch information
g4ze authored Aug 8, 2024
1 parent 3d759ab commit ee206c6
Show file tree
Hide file tree
Showing 7 changed files with 504 additions and 1 deletion.
111 changes: 111 additions & 0 deletions api_app/analyzers_manager/file_analyzers/polyswarm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.
import logging

from polyswarm_api.api import PolyswarmAPI

from api_app.analyzers_manager.classes import FileAnalyzer
from api_app.analyzers_manager.exceptions import AnalyzerRunException
from tests.mock_utils import if_mock_connections, patch

logger = logging.getLogger(__name__)

import abc

from api_app.analyzers_manager.classes import BaseAnalyzerMixin


class PolyswarmBase(BaseAnalyzerMixin, metaclass=abc.ABCMeta):
# this class also acts as a super class
# for PolyswarmObs in observable analyzers
url = "https://api.polyswarm.network/v3"
_api_key: str = None
timeout: int = 60 * 15 # default as in the package settings
polyswarm_community: str = "default"

def update(self):
pass

@staticmethod
def construct_result(result):
res = {"assertions": []}
positives = 0
total = 0
for assertion in result.assertions:
if assertion.verdict:
positives += 1
total += 1
res["assertions"].append(
{
"engine": assertion.author_name,
"asserts": "Malicious" if assertion.verdict else "Benign",
}
)
res["positives"] = positives
res["total"] = total
res["PolyScore"] = result.polyscore
res["sha256"] = result.sha256
res["md5"] = result.md5
res["sha1"] = result.sha1
res["extended_type"] = result.extended_type
res["first_seen"] = result.first_seen.isoformat()
res["last_seen"] = result.last_seen.isoformat()
res["permalink"] = result.permalink
return res


class Polyswarm(FileAnalyzer, PolyswarmBase):
def run(self):
api = PolyswarmAPI(key=self._api_key, community=self.polyswarm_community)
instance = api.submit(self.filepath)
result = api.wait_for(instance, timeout=self.timeout)
if result.failed:
raise AnalyzerRunException(
f"Failed to get results from Polyswarm for {self.md5}"
)
result = self.construct_result(result)

return result

def update(self):
pass

@classmethod
def _monkeypatch(cls):
patches = [
if_mock_connections(
patch.object(
Polyswarm,
"run",
# flake8: noqa
return_value={
"assertions": [
{"engine": "Kaspersky", "asserts": "Benign"},
{"engine": "Qihoo 360", "asserts": "Benign"},
{"engine": "XVirus", "asserts": "Benign"},
{"engine": "SecureAge", "asserts": "Benign"},
{"engine": "DrWeb", "asserts": "Benign"},
{"engine": "Proton", "asserts": "Benign"},
{"engine": "Electron", "asserts": "Benign"},
{"engine": "Filseclab", "asserts": "Benign"},
{"engine": "ClamAV", "asserts": "Benign"},
{"engine": "SecondWrite", "asserts": "Benign"},
{"engine": "Ikarus", "asserts": "Benign"},
{"engine": "NanoAV", "asserts": "Benign"},
{"engine": "Alibaba", "asserts": "Benign"},
],
"positives": 0,
"total": 13,
"PolyScore": 0.33460048640798623,
"sha256": "50f4d8be8d47d26ecb04f1a24f17a39f3ea194d8cdc3b833aef2df88e1ce828b",
"md5": "76deca20806c16df50ffeda163fd50e9",
"sha1": "99ff1cd17aea94feb355e7bdb01e9f788a4971bb",
"extended_type": "GIF image data, version 89a, 821 x 500",
"first_seen": "2024-07-27T20:20:12.121980",
"last_seen": "2024-07-27T20:20:12.121980",
"permalink": "https://polyswarm.network/scan/results/file/50f4d8be8d47d26ecb04f1a24f17a39f3ea194d8cdc3b833aef2df88e1ce828b/76218824984622961",
},
),
)
]
return super()._monkeypatch(patches=patches)
196 changes: 196 additions & 0 deletions api_app/analyzers_manager/migrations/0113_analyzer_config_polyswarm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "polyswarm.Polyswarm",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "Polyswarm",
"description": "Scan a file using the [Polyswarm](https://docs.polyswarm.io/) API.",
"disabled": False,
"soft_time_limit": 900,
"routing_key": "default",
"health_check_status": True,
"type": "file",
"docker_based": False,
"maximum_tlp": "AMBER",
"observable_supported": [],
"supported_filetypes": [],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

params = [
{
"python_module": {
"module": "polyswarm.Polyswarm",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "api_key",
"type": "str",
"description": "api key for polyswarm",
"is_secret": True,
"required": False,
},
{
"python_module": {
"module": "polyswarm.Polyswarm",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "timeout",
"type": "int",
"description": "timeout for Polyswarm api, default is 900s",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "polyswarm.Polyswarm",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "polyswarm_community",
"type": "str",
"description": 'polyswarm_community for polyswarm analyzer, default is "default"',
"is_secret": False,
"required": False,
},
]

values = [
{
"parameter": {
"python_module": {
"module": "polyswarm.Polyswarm",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "timeout",
"type": "int",
"description": "timeout for Polyswarm api, default is 900s",
"is_secret": False,
"required": False,
},
"analyzer_config": "Polyswarm",
"connector_config": None,
"visualizer_config": None,
"ingestor_config": None,
"pivot_config": None,
"for_organization": False,
"value": 900,
"updated_at": "2024-07-28T18:00:00.981259Z",
"owner": None,
},
{
"parameter": {
"python_module": {
"module": "polyswarm.Polyswarm",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "polyswarm_community",
"type": "str",
"description": 'polyswarm_community for polyswarm analyzer, default is "default"',
"is_secret": False,
"required": False,
},
"analyzer_config": "Polyswarm",
"connector_config": None,
"visualizer_config": None,
"ingestor_config": None,
"pivot_config": None,
"for_organization": False,
"value": "default",
"updated_at": "2024-07-28T18:00:01.001510Z",
"owner": None,
},
]


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0062_alter_parameter_python_module"),
("analyzers_manager", "0112_analyzer_config_criminalip_scan"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
Loading

0 comments on commit ee206c6

Please sign in to comment.