Skip to content

Commit

Permalink
first draft to implement new IAVA support being added to Data API
Browse files Browse the repository at this point in the history
  • Loading branch information
ryran committed Nov 27, 2016
1 parent 1d059dd commit 298e775
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 101 deletions.
308 changes: 233 additions & 75 deletions rhsda.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def _get_terminal_width(self):
return w

def __validate_data_type(self, dT):
dataTypes = ['cvrf', 'cve', 'oval']
dataTypes = ['cvrf', 'cve', 'oval', 'iava']
if dT not in dataTypes:
raise ValueError("Invalid data type ('{0}') requested; should be one of: {1}".format(dT, ", ".join(dataTypes)))

Expand Down Expand Up @@ -243,13 +243,13 @@ def find_cvrfs(self, params=None, outFormat='json',
page=None, per_page=None):
"""Find CVRF documents by recent or attributes.
Provides an index to recent CVRF documents with a summary of their contents,
when no parameters are passed. Returns a convenience object as the response with
minimal attributes.
Provides an index to recent CVRF documents when no parameters are passed.
Each list item is a convenience object with minimal attributes.
Use parameters to narrow down results.
With *outFormat* of "json", returns JSON object.
With *outFormat* of "xml", returns unformatted XML as string.
If *params* dict is passed, additional parameters are ignored.
If *params* dict is passed, additional parameters are ignored.
"""
if not params:
params = {
Expand All @@ -270,12 +270,13 @@ def find_cves(self, params=None, outFormat='json',
page=None, per_page=None):
"""Find CVEs by recent or attributes.
Provides an index to recent CVEs when no parameters are passed. Returns a
convenience object as response with minimal attributes.
Provides an index to recent CVEs when no parameters are passed.
Each list item is a convenience object with minimal attributes.
Use parameters to narrow down results.
With *outFormat* of "json", returns JSON object.
With *outFormat* of "xml", returns unformatted XML as string.
If *params* dict is passed, additional parameters are ignored.
If *params* dict is passed, additional parameters are ignored.
"""
if not params:
params = {
Expand All @@ -298,13 +299,13 @@ def find_ovals(self, params=None, outFormat='json',
page=None, per_page=None):
"""Find OVAL definitions by recent or attributes.
Provides an index to recent OVAL definitions with a summary of their contents,
when no parameters are passed. Returns a convenience object as the response with
minimal attributes.
Provides an index to recent OVAL definitions when no parameters are passed.
Each list item is a convenience object with minimal attributes.
Use parameters to narrow down results.
With *outFormat* of "json", returns JSON object.
With *outFormat* of "xml", returns unformatted XML as string.
If *params* dict is passed, additional parameters are ignored.
If *params* dict is passed, additional parameters are ignored.
"""
if not params:
params = {
Expand All @@ -318,6 +319,28 @@ def find_ovals(self, params=None, outFormat='json',
}
return self._find('oval', params, outFormat)

def find_iavas(self, params=None, outFormat='json',
number=None, severity=None,
page=None, per_page=None):
"""Find IAVA notices by recent or attributes.
Provides an index to recent IAVA notices when no parameters are passed.
Each list item is a convenience object with minimal attributes.
Use parameters to narrow down results.
With *outFormat* of "json", returns JSON object.
With *outFormat* of "xml", returns unformatted XML as string.
If *params* dict is passed, additional parameters are ignored.
"""
if not params:
params = {
'number': number,
'severity': severity,
'page': page,
'per_page': per_page,
}
return self._find('iava', params, outFormat)

def get_cvrf(self, rhsa, outFormat='json'):
"""Retrieve CVRF details for an RHSA."""
return self._retrieve('cvrf', rhsa, outFormat)
Expand All @@ -334,6 +357,10 @@ def get_oval(self, rhsa, outFormat='json'):
"""Retrieve OVAL details for an RHSA."""
return self._retrieve('oval', rhsa, outFormat)

def get_iava(self, iava, outFormat='json'):
"""Retrieve notice details for an IAVA."""
return self._retrieve('iava', iava, outFormat)

def __stripjoin(self, input, oneLineEach=False):
"""Strip whitespace from input or input list."""
text = ""
Expand Down Expand Up @@ -366,7 +393,7 @@ def _parse_cve_to_plaintext(self, cve):
This is designed with only one argument in order to allow being used as a worker
with multiprocessing.Pool.map_async().
Various printing operations in this method are conditional upon (or are tweaked
by) the values in the self.cfg namespace as set in parent meth self.mget_cves().
"""
Expand Down Expand Up @@ -524,6 +551,57 @@ def _parse_cve_to_plaintext(self, cve):
out.append("")
return True, "\n".join(out)

def _parse_iava_to_plaintext(self, iava):
"""Generate a plaintext representation of an IAVA.
This is designed with only one argument in order to allow being used as a worker
with multiprocessing.Pool.map_async().
Various printing operations in this method are conditional upon (or are tweaked
by) the values in the self.cfg namespace as set in parent meth self.mget_iavas().
"""
# Output array:
out = []
try:
# Store json
J = self.get_iava(iava)
except requests.exceptions.HTTPError as e:
# IAVA not in RH IAVA DB
logger.info(e)
if self.cfg.onlyCount or self.cfg.outFormat in ['list', 'json', 'jsonpretty']:
return False, ""
else:
out.append("{0}\n Not present in Red Hat IAVA database".format(iava))
out.append("")
return False, "\n".join(out)
# If json output requested
if self.cfg.outFormat.startswith('json'):
return True, J
# If CVE list output requested
if self.cfg.outFormat == 'list':
cves = J['cvelist']
if cves:
return True, cves
else:
return None, []
# IAVA ID
out.append(iava)
# SEVERITY
out.append(" SEVERITY: {0}".format(J['severity']))
# TITLE
out.append(" TITLE: {0}".format(self.__stripjoin(J['title'])))
# CVELIST
if J['cvelist']:
out.append(" CVES:")
for cve in J['cvelist']:
out.append(" {0}".format(cve))
# Return no output if only counting
if self.cfg.onlyCount:
return True, ""
# Add one final newline to the end
out.append("")
return True, "\n".join(out)

def _set_cve_plaintext_fields(self, desiredFields):
logger.debug("Requested fields string: '{0}'".format(desiredFields))
if not desiredFields:
Expand Down Expand Up @@ -591,7 +669,7 @@ def mget_cves(self, cves, numThreads=0, onlyCount=False, outFormat='plaintext',
urls=False, fields='ALL', wrapWidth=70, product=None, timeout=300):
"""Use multi-threading to lookup a list of CVEs and return text output.
*cves*: A list of CVE ids or a str obj from which to regex CVE ids
*cves*: A list of CVE ids or a str/file obj from which to regex CVE ids
*numThreads*: Number of concurrent worker threads; 0 == CPUs*2
*onlyCount*: Whether to exit after simply logging number of valid/invalid CVEs
*outFormat*: Control output format ("plaintext", "json", or "jsonpretty")
Expand Down Expand Up @@ -713,6 +791,87 @@ def mget_cves(self, cves, numThreads=0, onlyCount=False, outFormat='plaintext',
elif outFormat == 'jsonpretty':
return jprint(cveOutput)

def mget_iavas(self, iavas, numThreads=0, onlyCount=False, outFormat='plaintext',
wrapWidth=70, timeout=300):
"""Use multi-threading to lookup a list of IAVAs and return text output.
*iavas*: A list of IAVA ids
*numThreads*: Number of concurrent worker threads; 0 == CPUs*2
*onlyCount*: Whether to exit after simply logging number of valid/invalid CVEs
*outFormat*: Control output format ("list", "plaintext", "json", or "jsonpretty")
*wrapWidth*: Width for long fields; 1 auto-detects based on terminal size
*timeout*: Total ammount of time to wait for all CVEs to be retrieved
ON *OUTFORMAT*:
Setting to "list" returns list object containing ONLY CVE ids.
Setting to "plaintext" returns str object containing formatted output.
Setting to "json" returns list object (i.e., original JSON)
Setting to "jsonpretty" returns str object containing prettified JSON
"""
if outFormat not in ['list', 'plaintext', 'json', 'jsonpretty']:
raise ValueError("Invalid outFormat ('{0}') requested; should be one of: 'list', 'plaintext', 'json', 'jsonpretty'".format(outFormat))
if not isinstance(iavas, list):
raise ValueError("Invalid 'iavas=' argument input; must be list obj")
# Configure threads
if not numThreads:
numThreads = numThreadsDefault
# Lower threads for small work-loads
if numThreads > len(iavas):
numThreads = len(iavas)
logger.info("Using {0} worker threads".format(numThreads))
# Set cfg directives for our worker
self.cfg.onlyCount = onlyCount
self.cfg.outFormat = outFormat
self._set_cve_plaintext_width(wrapWidth)
# Disable sigint before starting process pool
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = multiprocessing.Pool(processes=numThreads)
# Re-enable receipt of sigint
signal.signal(signal.SIGINT, original_sigint_handler)
# Allow cancelling with Ctrl-c
try:
p = pool.map_async(self._parse_iava_to_plaintext, iavas)
# Need to specify timeout; see: http://stackoverflow.com/a/35134329
results = p.get(timeout=timeout)
except KeyboardInterrupt:
logger.error("Received KeyboardInterrupt; terminating worker threads")
pool.terminate()
raise
else:
pool.close()
pool.join()
successValues, iavaOutput = zip(*results)
n_total = len(iavas)
n_hidden = successValues.count(None)
n_valid = successValues.count(True)
n_invalid = successValues.count(False)
logger.log(25, "Valid Red Hat IAVA results retrieved: {0} of {1}".format(n_valid + n_hidden, n_total))
if n_invalid:
logger.log(25, "Invalid IAVA queries: {0} of {1}".format(n_invalid, n_total))
if outFormat == 'list':
cves = []
for i in iavaOutput:
cves.extend(i)
logger.log(25, "Number of CVEs mapped from retrieved IAVAs: {0}".format(len(cves)))
if onlyCount:
return
if outFormat == 'list':
return cves
if outFormat == 'plaintext':
# Remove all blank entries (created when spotlight-product hides a CVE)
iavaOutput = list(iavaOutput)
while 1:
try:
iavaOutput.remove("")
except ValueError:
break
return "\n".join(iavaOutput)
elif outFormat == 'json':
return iavaOutput
elif outFormat == 'jsonpretty':
return jprint(iavaOutput)

def cve_search_query(self, params, outFormat='list'):
"""Perform a CVE search query.
Expand Down Expand Up @@ -745,67 +904,66 @@ def _err_print_support_urls(self, msg=None):
print("\nFor help, open an issue at http://github.com/ryran/rhsecapi\n"
"Or post a comment at https://access.redhat.com/discussions/2713931", file=sys.stderr)

def _iavm_query(self, url):
"""Get IAVA json from IAVM Mapper App."""
logger.info("Getting {0}".format(url))
try:
r = requests.get(url, auth=())
except requests.exceptions.ConnectionError as e:
self._err_print_support_urls(e)
raise
except requests.exceptions.RequestException as e:
self._err_print_support_urls(e)
raise
except requests.exceptions.HTTPError as e:
self._err_print_support_urls(e)
raise
r.raise_for_status()
baseurl = r.url.split("/")[-1]
if not baseurl:
baseurl = r.url.split("/")[-2]
logger.debug("Return '.../{0}': Status {1}, Content-Type {2}".format(baseurl, r.status_code, r.headers['Content-Type'].split(";")[0]))
if 'application/json' in r.headers['Content-Type']:
result = r.json()
elif '<title>Login - Red Hat Customer Portal</title>' in r.content:
logger.error("Login error")
print("\nIAVA→CVE mapping data is not provided by the public RH Security Data API.\n"
"Instead, this uses the IAVM Mapper App (access.redhat.com/labs/iavmmapper).\n\n"
"Access to this data requires RH Customer Portal credentials be provided.\n"
"Create a ~/.netrc with the following contents:\n\n"
"machine access.redhat.com\n"
" login YOUR-CUSTOMER-PORTAL-LOGIN\n"
" password YOUR_PASSWORD_HERE",
file=sys.stderr)
self._err_print_support_urls()
return []
return result


def get_iava(self, iavaId):
"""Validate IAVA number and return json."""
# Get main IAVA master index
url = 'https://access.redhat.com/labs/iavmmapper/api/iava/'
result = self._iavm_query(url)
if not result:
# If no result, we're not logged in & error has already been logged
return []
if iavaId in result:
logger.debug("IAVM Mapper app main index contains '{0}'".format(iavaId))
else:
logger.error("IAVM Mapper app main index doesn't contain '{0}'".format(iavaId))
self._err_print_support_urls()
return []
# Get specific IAVA now
url += '{0}'.format(iavaId)
try:
result = self._iavm_query(url)
except requests.exceptions.HTTPError as e:
logger.info(e)
logger.error("IAVM Mapper app doesn't have entry for '{0}'".format(iavaId))
self._err_print_support_urls()
return []
logger.log(25, "{0} CVEs found with search".format(len(result['IAVM']['CVEs']['CVENumber'])))
return result
# def _iavm_query(self, url):
# """Get IAVA json from IAVM Mapper App."""
# logger.info("Getting {0}".format(url))
# try:
# r = requests.get(url, auth=())
# except requests.exceptions.ConnectionError as e:
# self._err_print_support_urls(e)
# raise
# except requests.exceptions.RequestException as e:
# self._err_print_support_urls(e)
# raise
# except requests.exceptions.HTTPError as e:
# self._err_print_support_urls(e)
# raise
# r.raise_for_status()
# baseurl = r.url.split("/")[-1]
# if not baseurl:
# baseurl = r.url.split("/")[-2]
# logger.debug("Return '.../{0}': Status {1}, Content-Type {2}".format(baseurl, r.status_code, r.headers['Content-Type'].split(";")[0]))
# if 'application/json' in r.headers['Content-Type']:
# result = r.json()
# elif '<title>Login - Red Hat Customer Portal</title>' in r.content:
# logger.error("Login error")
# print("\nIAVA→CVE mapping data is not provided by the public RH Security Data API.\n"
# "Instead, this uses the IAVM Mapper App (access.redhat.com/labs/iavmmapper).\n\n"
# "Access to this data requires RH Customer Portal credentials be provided.\n"
# "Create a ~/.netrc with the following contents:\n\n"
# "machine access.redhat.com\n"
# " login YOUR-CUSTOMER-PORTAL-LOGIN\n"
# " password YOUR_PASSWORD_HERE",
# file=sys.stderr)
# self._err_print_support_urls()
# return []
# return result

# def get_iava(self, iavaId):
# """Validate IAVA number and return json."""
# # Get main IAVA master index
# url = 'https://access.redhat.com/labs/iavmmapper/api/iava/'
# result = self._iavm_query(url)
# if not result:
# # If no result, we're not logged in & error has already been logged
# return []
# if iavaId in result:
# logger.debug("IAVM Mapper app main index contains '{0}'".format(iavaId))
# else:
# logger.error("IAVM Mapper app main index doesn't contain '{0}'".format(iavaId))
# self._err_print_support_urls()
# return []
# # Get specific IAVA now
# url += '{0}'.format(iavaId)
# try:
# result = self._iavm_query(url)
# except requests.exceptions.HTTPError as e:
# logger.info(e)
# logger.error("IAVM Mapper app doesn't have entry for '{0}'".format(iavaId))
# self._err_print_support_urls()
# return []
# logger.log(25, "{0} CVEs found with search".format(len(result['IAVM']['CVEs']['CVENumber'])))
# return result


if __name__ == "__main__":
Expand Down
Loading

2 comments on commit 298e775

@ryran
Copy link
Collaborator Author

@ryran ryran commented on 298e775 Nov 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#51

@ryran
Copy link
Collaborator Author

@ryran ryran commented on 298e775 Nov 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this with half of my attention while watching Guardians of the Galaxy with family so.... yeah it needs work.

Please sign in to comment.