Skip to content

Commit

Permalink
Add: version ranges
Browse files Browse the repository at this point in the history
Now it is possible to set a range for versions within a package is vulnerable
  • Loading branch information
Kraemii committed Jul 27, 2023
1 parent fba0a4d commit b49750b
Show file tree
Hide file tree
Showing 6 changed files with 343 additions and 41 deletions.
94 changes: 76 additions & 18 deletions notus/scanner/loader/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@

from ..errors import AdvisoriesLoadingError
from ..models.packages import package_class_by_type
from ..models.packages.package import PackageAdvisories, PackageType
from ..models.packages.package import (
Package,
PackageAdvisories,
PackageRange,
PackageType,
)
from .gpg_sha_verifier import VerificationResult
from .loader import AdvisoriesLoader

Expand Down Expand Up @@ -75,6 +80,32 @@ def __load_data(self, operating_system: str) -> Optional[Dict]:
"while decoding JSON data."
) from None

@staticmethod
def _get_package_from_package_dict(
package_class: Package,
package_dict: Dict[str, str],
operating_system: str,
) -> Optional[Package]:
full_name = package_dict.get("full_name")
if full_name:
package = package_class.from_full_name(full_name)
else:
package = package_class.from_name_and_full_version(
package_dict.get("name"),
package_dict.get("full_version"),
)
if not package:
logger.warning(
(
"could not parse fixed package information from"
" %s in %s: version range must contain exactly"
" 2 entries"
),
package_dict,
operating_system,
)
return package

def load_package_advisories(
self, operating_system: str
) -> Optional[PackageAdvisories]:
Expand Down Expand Up @@ -114,25 +145,52 @@ def load_package_advisories(
advisory = oid

for package_dict in fixed_packages:
full_name = package_dict.get("full_name")
if full_name:
package = package_class.from_full_name(full_name)
else:
package = package_class.from_name_and_full_version(
package_dict.get("name"),
package_dict.get("full_version"),
version_range = package_dict.get("range")
if version_range:
if len(version_range) != 2:
logger.warning(
(
"could not parse fixed package information from"
" %s in %s: version range must contain exactly"
" 2 entries"
),
package_dict,
operating_system,
)
continue
package_dict1 = version_range[0]
package_dict2 = version_range[1]

package1 = self._get_package_from_package_dict(
package_class, package_dict1, operating_system
)
if not package1:
continue
package2 = self._get_package_from_package_dict(
package_class, package_dict2, operating_system
)
if not package:
logger.warning(
"Could not parse fixed package information from %s "
"in %s",
package_dict,
operating_system,
if not package2:
continue
package_range = PackageRange(
name=package1.name,
verifier1=package_dict1.get("specifier"),
verifier2=package_dict2.get("specifier"),
package1=package1,
package2=package2,
)
continue
package_advisories.add_range_advisory_for_package(
package_range, advisory
)
else:
package = self._get_package_from_package_dict(
package_class, package_dict, operating_system
)
if not package:
logger.warning("No Package")

package_advisories.add_advisory_for_package(
package, advisory, package_dict.get("specifier")
)
continue
package_advisories.add_advisory_for_package(
package, advisory, package_dict.get("specifier")
)

return package_advisories
66 changes: 57 additions & 9 deletions notus/scanner/models/packages/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,15 @@ def from_name_and_full_version(name: str, full_version: str):
raise NotImplementedError()


@dataclass
class PackageRange:
name: str
verifier1: str
verifier2: str
package1: Package
package2: Package


@dataclass(frozen=True, unsafe_hash=True)
class PackageAdvisory:
"""Connects a package with an advisory"""
Expand All @@ -173,9 +182,9 @@ class PackageAdvisories:
default_factory=dict
)

is_comparable = (
lambda a, b: a.compare(b) != PackageComparison.NOT_COMPARABLE
)
@staticmethod
def is_comparable(package1: Package, package2: Package):
return package1.compare(package2) != PackageComparison.NOT_COMPARABLE

comparison_map = {
">=": lambda a, b: a > b
Expand All @@ -196,9 +205,9 @@ class PackageAdvisories:
}

def get_package_advisories_for_package(
self, package: Package
self, package_name: str
) -> Dict[str, Set[PackageAdvisory]]:
return self.advisories.get(package.name) or dict()
return self.advisories.get(package_name) or dict()

def add_advisory_for_package(
self,
Expand All @@ -208,10 +217,10 @@ def add_advisory_for_package(
) -> None:
if verifier not in self.comparison_map:
verifier = ">="
advisories = self.get_package_advisories_for_package(package)
is_vulnerable = lambda other: self.comparison_map[verifier](
package, other
)
advisories = self.get_package_advisories_for_package(package.name)

def is_vulnerable(other: Package) -> Optional[bool]:
return self.comparison_map[verifier](package, other)

if not advisory in advisories:
advisories[advisory] = set()
Expand All @@ -221,5 +230,44 @@ def add_advisory_for_package(
)
self.advisories[package.name] = advisories

def add_range_advisory_for_package(
self,
package_range: PackageRange,
advisory: str,
) -> None:
advisories = self.get_package_advisories_for_package(package_range.name)
if package_range.verifier1 in self.comparison_map:
package_range.verifier1 = "<"
if package_range.verifier2 in self.comparison_map:
package_range.verifier2 = ">="

def is_vulnerable(other: Package) -> Optional[bool]:
return self.comparison_map[package_range.verifier1](
package_range.package1, other
) and self.comparison_map[package_range.verifier2](
package_range.package2, other
)

if not advisory in advisories:
advisories[advisory] = set()

advisories[advisory].add(
PackageAdvisory(
package_range.package1,
advisory,
package_range.verifier1,
is_vulnerable,
)
)
advisories[advisory].add(
PackageAdvisory(
package_range.package2,
advisory,
package_range.verifier2,
is_vulnerable,
)
)
self.advisories[package_range.name] = advisories

def __len__(self) -> int:
return len(self.advisories)
22 changes: 13 additions & 9 deletions notus/scanner/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,8 @@ def _check_package(
package_advisory.package,
)
is_vulnerable = package_advisory.is_vulnerable(package)
if is_vulnerable is None:
if not is_vulnerable:
continue
elif not is_vulnerable:
return

vul.add(package, package_advisory)

Expand All @@ -123,7 +121,9 @@ def _start_scan(

for package in installed_packages:
package_advisory_oids = (
package_advisories.get_package_advisories_for_package(package)
package_advisories.get_package_advisories_for_package(
package.name
)
)
for oid, package_advisory_list in package_advisory_oids.items():
vul = self._check_package(package, package_advisory_list)
Expand Down Expand Up @@ -171,9 +171,11 @@ def run_scan(
if not package_advisories:
# Probably a wrong or not supported OS-release
logger.error(
"Unable to start scan for %s: No advisories for OS-release %s"
" found. Check if the OS-release is correct and the"
" corresponding advisories are given.",
(
"Unable to start scan for %s: No advisories for OS-release"
" %s found. Check if the OS-release is correct and the"
" corresponding advisories are given."
),
message.host_ip,
message.os_release,
)
Expand All @@ -189,8 +191,10 @@ def run_scan(
package_class = package_class_by_type(package_type)
if not package_class:
logger.error(
"Unable to start scan for %s: No package implementation for "
"OS-release %s found. Check if the OS-release is correct.",
(
"Unable to start scan for %s: No package implementation for"
" OS-release %s found. Check if the OS-release is correct."
),
message.host_ip,
message.os_release,
)
Expand Down
Loading

0 comments on commit b49750b

Please sign in to comment.