New parallelized IOC report. #6

Merged 4 commits on Sep 28, 2022
161 changes: 156 additions & 5 deletions inquestlabs.py
@@ -25,6 +25,7 @@
inquestlabs [options] yara widere <regex> [(--big-endian|--little-endian)]
inquestlabs [options] lookup ip <ioc>
inquestlabs [options] lookup domain <ioc>
inquestlabs [options] report <ioc>
inquestlabs [options] stats
inquestlabs [options] setup <apikey>
inquestlabs [options] trystero list-days
@@ -44,6 +45,7 @@
--little-endian Toggle little endian.
--offset=<offset> Specify an offset other than 0 for the trigger.
--proxy=<proxy> Intermediate proxy
--timeout=<timeout>  Maximum time in seconds to wait for the IOC report.
--verbose=<level> Verbosity level, outputs to stderr [default: 0].
--version Show version.
"""
@@ -68,6 +70,8 @@
pass

# standard libraries.
import multiprocessing
import ipaddress
import hashlib
import random
import time
@@ -76,17 +80,27 @@
import os
import re

-__version__ = 1.0
+__version__ = 1.1

-VALID_CAT = ["ext", "hash", "ioc"]
-VALID_EXT = ["code", "context", "metadata", "ocr"]
-VALID_HASH = ["md5", "sha1", "sha256", "sha512"]
-VALID_IOC = ["domain", "email", "filename", "filepath", "ip", "registry", "url", "xmpid"]
+VALID_CAT = ["ext", "hash", "ioc"]
+VALID_EXT = ["code", "context", "metadata", "ocr"]
+VALID_HASH = ["md5", "sha1", "sha256", "sha512"]
+VALID_IOC = ["domain", "email", "filename", "filepath", "ip", "registry", "url", "xmpid"]
+VALID_DOMAIN = re.compile("[a-zA-Z0-9-_]+\.[a-zA-Z0-9-_]+")

# verbosity levels.
INFO = 1
DEBUG = 2

########################################################################################################################
def worker_proxy (labs, endpoint, arguments, response):
"""
Proxy function for the multiprocessing wrapper used by inquestlabs_api.report().
"""

response[endpoint] = getattr(labs, endpoint)(*arguments)
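
# a minimal sketch of how this proxy gets exercised (mirroring what report() does further
# down), assuming "labs" is an instantiated inquestlabs_api object:
#
#     mngr = multiprocessing.Manager()
#     resp = mngr.dict()
#     job  = multiprocessing.Process(target=worker_proxy, args=(labs, "repdb_search", ["example.com"], resp))
#     job.start()
#     job.join()
#     # resp["repdb_search"] now holds that endpoint's API response.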


########################################################################################################################
class inquestlabs_exception(Exception):
pass
@@ -814,6 +828,59 @@ def iocdb_sources (self):

return self.API("/iocdb/sources")

########################################################################################################################
def is_ipv4 (self, s):
# we prefer to use the ipaddress module here (stdlib on Python 3, third-party backport on Python 2), but fall back to a regex solution.
try:
import ipaddress
except:
if re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", s):
return True
else:
return False

# python 2/3 compat
try:
s = unicode(s)
except:
pass

# is instance of IPv4 address?
try:
return isinstance(ipaddress.ip_address(s), ipaddress.IPv4Address)
except:
return False


########################################################################################################################
def is_ipv6 (self, s):
# best effort pull in third-party module.
try:
import ipaddress
except:
return None

# python 2/3 compat
try:
s = unicode(s)
except:
pass

# is instance of IPv6 address?
try:
return isinstance(ipaddress.ip_address(s), ipaddress.IPv6Address)
except:
return False


####################################################################################################################
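# note that VALID_DOMAIN is a loose check: any dotted token at the start of the string will match.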
def is_domain (self, s):
return VALID_DOMAIN.match(s)

####################################################################################################################
def is_ip (self, s):
return self.is_ipv4(s) or self.is_ipv6(s)

####################################################################################################################
def lookup (self, kind, ioc):
"""
@@ -920,6 +987,86 @@ def repdb_sources (self):

return self.API("/repdb/sources")

####################################################################################################################
def report (self, ioc, timeout=None):
"""
Leverage multiprocessing to produce a single report for the supplied IP/domain indicator which includes data
from: lookup, DFIdb, REPdb, and IOCdb.

:type ioc: str
:param ioc: Indicator to lookup (IP, domain, URL)
:type timeout: int
:param timeout: Maximum number of seconds allowed for producing the IOC report (default=60).

:rtype: dict
:return: API response.
"""

# default timeout; coerce any string value (e.g. from the command line) to an integer.
if timeout is None:
    timeout = 60
else:
    timeout = int(timeout)

# parallelization.
jobs = []
mngr = multiprocessing.Manager()
resp = mngr.dict()

# what kind of IOC are we dealing with.
if self.is_ip(ioc):
kind = "ip"
elif self.is_domain(ioc):
kind = "domain"
elif ioc.startswith("http"):
kind = "url"
else:
raise inquestlabs_exception("could not determine indicator type for %s" % ioc)

# only IPs and domains get lookups.
if kind in ["ip", "domain"]:
job = multiprocessing.Process(target=worker_proxy, args=(self, "lookup", [kind, ioc], resp))
jobs.append(job)
job.start()

# all IOCs get compared against DFIdb, REPdb, and IOCdb
job = multiprocessing.Process(target=worker_proxy, args=(self, "dfi_search", ["ioc", kind, ioc], resp))
jobs.append(job)
job.start()

job = multiprocessing.Process(target=worker_proxy, args=(self, "repdb_search", [ioc], resp))
jobs.append(job)
job.start()

job = multiprocessing.Process(target=worker_proxy, args=(self, "iocdb_search", [ioc], resp))
jobs.append(job)
job.start()

# wait for jobs to complete.
self.__VERBOSE("waiting up to %d seconds for %d jobs to complete" % (timeout, len(jobs)))

# wait for jobs to complete, up to timeout
start = time.time()

while time.time() - start <= timeout:
if not any(job.is_alive() for job in jobs):
# all the processes are done, break now.
break

# this prevents CPU hogging.
time.sleep(1)

else:
self.__VERBOSE("timeout reached, killing jobs...")
for job in jobs:
job.terminate()
job.join()

elapsed = time.time() - start
self.__VERBOSE("completed all jobs in %d seconds" % elapsed)

# return the combined response.
return dict(resp)
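
# a minimal usage sketch, assuming "labs" is an already constructed inquestlabs_api
# instance with a working API key:
#
#     combined = labs.report("8.8.8.8", timeout=30)
#
# the returned dictionary is keyed by the endpoint that produced each piece of data,
# i.e. some subset of "lookup", "dfi_search", "repdb_search" and "iocdb_search".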


####################################################################################################################
def stats (self):
"""
@@ -1277,6 +1424,10 @@ def main ():
else:
raise inquestlabs_exception("'lookup' supports 'ip' and 'domain'.")

### IP/DOMAIN/URL REPORT ###########################################################################################
elif args['report']:
print(json.dumps(labs.report(args['<ioc>'], args['--timeout'])))
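# hypothetical example invocation of the new subcommand (indicator value illustrative):
#     inquestlabs --timeout=30 report 8.8.8.8
# --timeout falls back to a 60 second default when omitted.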

### MISCELLANEOUS ##################################################################################################
elif args['stats']:
print(json.dumps(labs.stats()))