Skip to content

Commit

Permalink
Add Job filters (#247)
Browse files Browse the repository at this point in the history
* Add job filters

* fix docstrings.

* update docstring

* Get jobs filters working with new job manager

* Refactor out FilterJobObjects into new method

* Update YAPF

* remove missed conflict markers

* Docstrings and renaming
  • Loading branch information
aarontp authored Dec 10, 2018
1 parent 05abb4f commit 4a1afaf
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 15 deletions.
11 changes: 8 additions & 3 deletions turbinia/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,16 @@ class TurbiniaServer(object):
task_manager (TaskManager): An object to manage turbinia tasks.
"""

def __init__(self):
"""Initialize Turbinia Server."""
def __init__(self, jobs_blacklist=None, jobs_whitelist=None):
"""Initializes Turbinia Server.
Args:
jobs_blacklist (Optional[list[str]]): Jobs we will exclude from running
jobs_whitelist (Optional[list[str]]): The only Jobs we will include to run
"""
# Load the configuration before the task manager is created.
config.LoadConfig()
self.task_manager = task_manager.get_task_manager()
# NOTE(review): this argument-less setup() call looks like the removed
# (pre-change) line left over from the diff view this text was copied
# from -- confirm before treating both setup() calls as live code.
self.task_manager.setup()
# Hand the job filters over to the task manager's setup.
self.task_manager.setup(jobs_blacklist, jobs_whitelist)

def start(self):
"""Start Turbinia Server."""
Expand Down
53 changes: 53 additions & 0 deletions turbinia/jobs/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,65 @@

from __future__ import unicode_literals

from turbinia import TurbiniaException


class JobsManager(object):
"""The jobs manager."""

_job_classes = {}

@classmethod
def FilterJobNames(cls, job_names, jobs_blacklist=None, jobs_whitelist=None):
"""Filters a list of job names against white/black lists.
jobs_whitelist and jobs_blacklist must not be specified at the same time.
Args:
job_names (list[str]): The names of the job_names to filter.
jobs_blacklist (Optional[list[str]]): Job names to exclude.
jobs_whitelist (Optional[list[str]]): Job names to include.
Returns:
list[str]: Job names
Raises:
TurbiniaException if both jobs_blacklist and jobs_whitelist are specified.
"""
jobs_blacklist = jobs_blacklist if jobs_blacklist else []
jobs_blacklist = [job.lower() for job in jobs_blacklist]
jobs_whitelist = jobs_whitelist if jobs_whitelist else []
jobs_whitelist = [job.lower() for job in jobs_whitelist]

if jobs_whitelist and jobs_blacklist:
raise TurbiniaException(
'jobs_whitelist and jobs_blacklist cannot be specified at the same '
'time.')
elif jobs_blacklist:
return [job for job in job_names if job.lower() not in jobs_blacklist]
elif jobs_whitelist:
return [job for job in job_names if job.lower() in jobs_whitelist]
else:
return job_names

@classmethod
def FilterJobObjects(cls, jobs, jobs_blacklist=None, jobs_whitelist=None):
"""Filters a list of job objects against white/black lists.
jobs_whitelist and jobs_blacklist must not be specified at the same time.
Args:
jobs (list[TurbiniaJob]): The jobs to filter.
jobs_blacklist (Optional[list[str]]): Job names to exclude.
jobs_whitelist (Optional[list[str]]): Job names to include.
Returns:
list[TurbiniaJob]: Job objects
"""
job_names = [job.name.lower() for job in jobs]
job_names = cls.FilterJobNames(job_names, jobs_blacklist, jobs_whitelist)
return [job for job in jobs if job.name.lower() in job_names]

@classmethod
def DeregisterJob(cls, job_class):
"""Deregisters a job class.
Expand Down
50 changes: 50 additions & 0 deletions turbinia/jobs/manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import unittest

from turbinia import TurbiniaException
from turbinia.jobs import interface
from turbinia.jobs import manager

Expand Down Expand Up @@ -94,6 +95,55 @@ def testGetJobInstances(self):
for job in jobs:
self.assertIsInstance(job, interface.TurbiniaJob)

def testFilterJobNamesEmptyLists(self):
  """Tests FilterJobNames() when both filter lists are empty."""
  expected_names = ['testjob1', 'testjob2']
  filtered_names = manager.JobsManager.FilterJobNames(
      expected_names, jobs_whitelist=[], jobs_blacklist=[])
  self.assertListEqual(expected_names, filtered_names)

def testFilterJobNamesBlackList(self):
  """Tests that FilterJobNames() drops a blacklisted job name."""
  all_names = ['testjob1', 'testjob2']
  blacklisted_name, remaining_names = all_names[0], all_names[1:]
  filtered_names = manager.JobsManager.FilterJobNames(
      all_names, jobs_blacklist=[blacklisted_name], jobs_whitelist=[])
  self.assertListEqual(remaining_names, filtered_names)

def testFilterJobObjectsBlackList(self):
  """Tests that FilterJobObjects() drops the job named in jobs_blacklist."""
  blacklisted_job = TestJob1()
  remaining_job = TestJob2()
  filtered_jobs = manager.JobsManager.FilterJobObjects(
      [blacklisted_job, remaining_job],
      jobs_blacklist=[blacklisted_job.name], jobs_whitelist=[])
  self.assertListEqual([remaining_job], filtered_jobs)

def testFilterJobNamesWhiteList(self):
  """Tests that FilterJobNames() keeps only a whitelisted job name."""
  all_names = ['testjob1', 'testjob2']
  whitelisted_names = all_names[:1]
  filtered_names = manager.JobsManager.FilterJobNames(
      all_names, jobs_blacklist=[], jobs_whitelist=list(whitelisted_names))
  self.assertListEqual(whitelisted_names, filtered_names)

def testFilterJobObjectsWhiteList(self):
  """Tests that FilterJobObjects() keeps only the job in jobs_whitelist."""
  excluded_job = TestJob1()
  whitelisted_job = TestJob2()
  filtered_jobs = manager.JobsManager.FilterJobObjects(
      [excluded_job, whitelisted_job], jobs_blacklist=[],
      jobs_whitelist=[whitelisted_job.name])
  self.assertListEqual([whitelisted_job], filtered_jobs)

def testFilterJobNamesException(self):
  """Tests that FilterJobNames() rejects combined black and white lists."""
  names = ['testjob1', 'testjob2']
  with self.assertRaises(TurbiniaException):
    manager.JobsManager.FilterJobNames(
        names, jobs_blacklist=['a'], jobs_whitelist=['b'])

def testFilterJobNamesMixedCase(self):
  """Tests that FilterJobNames() matches names case-insensitively."""
  all_names = ['testjob1', 'testjob2']
  upper_cased_whitelist = ['TESTJOB1']
  filtered_names = manager.JobsManager.FilterJobNames(
      all_names, jobs_blacklist=[], jobs_whitelist=upper_cased_whitelist)
  self.assertListEqual(all_names[:1], filtered_names)


# Run the unit tests only when this module is executed as a script.
if __name__ == '__main__':
unittest.main()
29 changes: 26 additions & 3 deletions turbinia/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,24 @@ def _backend_setup(self, *args, **kwargs):
"""
raise NotImplementedError

def setup(self, *args, **kwargs):
"""Does setup of Task manager and its dependencies."""
def setup(self, jobs_blacklist=None, jobs_whitelist=None, *args, **kwargs):
  """Sets up the Task manager backend and instantiates the Jobs to run.

  Args:
    jobs_blacklist (list): Job names that will be excluded from running.
    jobs_whitelist (list): The only Job names that will be allowed to run.
    *args: Positional arguments passed through to _backend_setup().
    **kwargs: Keyword arguments passed through to _backend_setup().
  """
  self._backend_setup(*args, **kwargs)
  # TODO(aarontp): Consider instantiating a job per evidence object
  manager = jobs_manager.JobsManager
  selected_names = manager.GetJobNames()
  filtering_requested = bool(jobs_blacklist or jobs_whitelist)
  if filtering_requested:
    message = 'Filtering Jobs with whitelist {0!s} and blacklist {1!s}'.format(
        jobs_whitelist, jobs_blacklist)
    log.info(message)
    selected_names = manager.FilterJobNames(
        selected_names, jobs_blacklist, jobs_whitelist)
  self.jobs = manager.GetJobInstances(selected_names)
  log.debug('Registered job list: {0:s}'.format(str(selected_names)))

def add_evidence(self, evidence_):
"""Adds new evidence and creates tasks to process it.
Expand All @@ -125,11 +137,22 @@ def add_evidence(self, evidence_):
log.info('Adding new evidence: {0:s}'.format(str(evidence_)))
self.evidence.append(evidence_)
job_count = 0
jobs_whitelist = evidence_.config.get('jobs_whitelist', [])
jobs_blacklist = evidence_.config.get('jobs_blacklist', [])
if jobs_blacklist or jobs_whitelist:
log.info(
'Filtering Jobs with whitelist {0!s} and blacklist {1!s}'.format(
jobs_whitelist, jobs_blacklist))
jobs_list = jobs_manager.JobsManager.FilterJobObjects(
self.jobs, jobs_blacklist, jobs_whitelist)
else:
jobs_list = self.jobs

# TODO(aarontp): Add some kind of loop detection in here so that jobs can
# register for Evidence(), or other evidence types that may be a super
# class of the output of the job itself. Short term we could potentially
# have a run time check for this upon Job instantiation to prevent it.
for job in self.jobs:
for job in jobs_list:
# Doing a strict type check here for now until we can get the above
# comment figured out.
# pylint: disable=unidiomatic-typecheck
Expand Down
43 changes: 34 additions & 9 deletions turbinia/turbiniactl.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@
logger.setup()


def csv_list(string):
  """Helper method for having CSV argparse types.

  Surrounding whitespace is stripped from every entry, and empty entries (from
  an empty string or stray commas) are dropped. Otherwise an argument such as
  '' or 'a, b,' would yield names like '' or ' b' that can never match a Job.

  Args:
    string(str): Comma separated string to parse.

  Returns:
    list[str]: The parsed, non-empty strings in their original order.
  """
  entries = (entry.strip() for entry in string.split(','))
  return [entry for entry in entries if entry]


def main():
"""Main function for turbiniactl"""
# TODO(aarontp): Allow for single run mode when
Expand Down Expand Up @@ -89,12 +101,17 @@ def main():
'text based evidence files with (in extended grep regex format). '
'This filtered output will be in addition to the complete output')
parser.add_argument(
'-j', '--jobs_whitelist',
help='A whitelist for Jobs that we will allow to run (note that it '
'will not force them to run).')
'-j', '--jobs_whitelist', default=[], type=csv_list,
help='A whitelist for Jobs that will be allowed to run (in CSV format, '
'no spaces). This will not force them to run if they are not configured '
'to. This is applied both at server start time and when the client makes '
'a processing request. When applied at server start time the change is '
'persistent while the server is running. When applied by the client, it '
'will only affect that processing request.')
parser.add_argument(
'-J', '--jobs_blacklist',
help='A blacklist for Jobs we will not allow to run')
'-J', '--jobs_blacklist', default=[], type=csv_list,
help='A blacklist for Jobs we will not allow to run. See '
'--jobs_whitelist help for details on format and when it is applied.')
parser.add_argument(
'-p', '--poll_interval', default=60, type=int,
help='Number of seconds to wait between polling for task state info')
Expand Down Expand Up @@ -189,7 +206,10 @@ def main():
'-n', '--name', help='Descriptive name of the evidence', required=False)

# List Jobs
subparsers.add_parser('listjobs', help='List all available jobs')
subparsers.add_parser(
'listjobs',
help='List all available Jobs. These Job names can be used by '
'--jobs_whitelist and --jobs_blacklist')

# PSQ Worker
parser_psqworker = subparsers.add_parser('psqworker', help='Run PSQ worker')
Expand Down Expand Up @@ -238,8 +258,8 @@ def main():

if args.jobs_whitelist and args.jobs_blacklist:
log.error(
'A Job filter whitelist and blacklist cannot be specified '
'at the same time')
'A Job filter whitelist and blacklist cannot be specified at the same '
'time')
sys.exit(1)

filter_patterns = None
Expand Down Expand Up @@ -320,7 +340,8 @@ def main():
worker = TurbiniaCeleryWorker()
worker.start()
elif args.command == 'server':
server = TurbiniaServer()
server = TurbiniaServer(
jobs_blacklist=args.jobs_blacklist, jobs_whitelist=args.jobs_whitelist)
server.start()
elif args.command == 'status':
region = config.TURBINIA_REGION
Expand Down Expand Up @@ -401,6 +422,10 @@ def main():
request.evidence.append(evidence_)
if filter_patterns:
request.recipe['filter_patterns'] = filter_patterns
if args.jobs_blacklist:
request.recipe['jobs_blacklist'] = args.jobs_blacklist
if args.jobs_whitelist:
request.recipe['jobs_whitelist'] = args.jobs_whitelist
if args.dump_json:
print(request.to_json().encode('utf-8'))
else:
Expand Down

0 comments on commit 4a1afaf

Please sign in to comment.