Skip to content

Commit

Permalink
Add unit tests for leftover packages actors and move them to common repo
Browse files Browse the repository at this point in the history
Actors:
    * check_leftover_packages
    * remove_leftover_packages
    * report_leftover_packages
Changes:
    * Move actors code from el7toel8 to common
    * Refactor some actors
    * Put their processes into library
    * Create unit tests for actors

Jira: OAMG-1254
  • Loading branch information
tomasfratrik committed Aug 15, 2024
1 parent 16fb443 commit f2bd7b4
Show file tree
Hide file tree
Showing 12 changed files with 459 additions and 164 deletions.
21 changes: 21 additions & 0 deletions repos/system_upgrade/common/actors/checkleftoverpackages/actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from leapp.actors import Actor
from leapp.libraries.actor import checkleftoverpackages
from leapp.models import InstalledUnsignedRPM, LeftoverPackages, TransactionCompleted
from leapp.tags import IPUWorkflowTag, RPMUpgradePhaseTag


class CheckLeftoverPackages(Actor):
"""
Check if there are any left over packages from older RHEL version present after upgrade.
Actor produces message containing these packages.
Message is empty if there are no packages from older system left.
"""

name = 'check_leftover_packages'
consumes = (TransactionCompleted, InstalledUnsignedRPM)
produces = (LeftoverPackages,)
tags = (RPMUpgradePhaseTag, IPUWorkflowTag)

def process(self):
checkleftoverpackages.process()
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import re

from leapp.libraries.common.config.version import get_source_major_version
from leapp.libraries.common.rpms import get_installed_rpms, get_leapp_dep_packages, get_leapp_packages
from leapp.libraries.stdlib import api
from leapp.models import InstalledUnsignedRPM, LeftoverPackages, RPM


def process():
LEAPP_PACKAGES = get_leapp_packages(major_version=[get_source_major_version()])
LEAPP_DEP_PACKAGES = get_leapp_dep_packages(major_version=[get_source_major_version()])
installed_rpms = get_installed_rpms()

if not installed_rpms:
return

leftover_pkgs_to_remove = LeftoverPackages()
unsigned = [pkg.name for pkg in next(api.consume(InstalledUnsignedRPM), InstalledUnsignedRPM()).items]

for rpm in installed_rpms:
rpm = rpm.strip()
if not rpm:
continue
try:
name, version, release, epoch, packager, arch, pgpsig = rpm.split('|')
except ValueError:
api.current_logger().warning('Could not parse rpm: {}'.format(rpm))
continue

version_pattern = r'el(\d+)'
match = re.search(version_pattern, release)

if match:
major_version = match.group(1)
PKGS_NOT_TO_BE_DELETED = set(LEAPP_PACKAGES + LEAPP_DEP_PACKAGES + unsigned)
if int(major_version) <= int(get_source_major_version()) and name not in PKGS_NOT_TO_BE_DELETED:
leftover_pkgs_to_remove.items.append(RPM(
name=name,
version=version,
epoch=epoch,
packager=packager,
arch=arch,
release=release,
pgpsig=pgpsig
))

api.produce(leftover_pkgs_to_remove)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pytest

from leapp.libraries.actor import checkleftoverpackages
from leapp.libraries.common.testutils import CurrentActorMocked, produce_mocked
from leapp.libraries.stdlib import api
from leapp.models import InstalledUnsignedRPM, LeftoverPackages, RPM


@pytest.mark.parametrize(
('source_major_version', 'rpm_name', 'release', 'expected_to_be_removed'),
(
(7, 'sed', '7.el7', True),
(8, 'sed', '8.el7', True),
(7, 'gnutls', '8.el8_9.1', False),
(7, 'unsigned', '1.el7', False),
(7, 'leapp', '1.el7', False),
(8, 'leapp-upgrade-el8toel9', '1.el8', False),
(8, 'leapp-upgrade-el8toel9-deps', '1.el8', False),
)
)
def test_package_to_be_removed(monkeypatch, source_major_version, rpm_name, release, expected_to_be_removed):
rpm_details = {
'version': '0.1',
'epoch': '0',
'packager': 'packager',
'arch': 'noarch',
'pgpsig': 'sig'
}

def get_installed_rpms_mocked():
return ['{name}|{version}|{release}|{epoch}|{packager}|{arch}|{pgpsig}'.format(
name=rpm_name,
version=rpm_details['version'],
release=release,
epoch=rpm_details['epoch'],


packager=rpm_details['packager'],
arch=rpm_details['arch'],
pgpsig=rpm_details['pgpsig']
)]

UnsignedRPM = RPM(name='unsigned', version='0.1', release=release, epoch='0',
packager='packager', arch='noarch', pgpsig='OTHER_SIG')

monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(msgs=[InstalledUnsignedRPM(items=[UnsignedRPM])]))
monkeypatch.setattr(checkleftoverpackages, 'get_installed_rpms', get_installed_rpms_mocked)
monkeypatch.setattr(checkleftoverpackages, 'get_source_major_version', lambda: str(source_major_version))
monkeypatch.setattr(api, 'produce', produce_mocked())

checkleftoverpackages.process()

expected_output = LeftoverPackages()
if expected_to_be_removed:
expected_output.items.append(RPM(name=rpm_name, release=release, **rpm_details))

assert api.produce.called == 1
assert len(api.produce.model_instances) == 1
assert api.produce.model_instances[0] == expected_output
22 changes: 22 additions & 0 deletions repos/system_upgrade/common/actors/removeleftoverpackages/actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from leapp.actors import Actor
from leapp.libraries.actor import removeleftoverpackages
from leapp.models import LeftoverPackages, RemovedPackages
from leapp.tags import IPUWorkflowTag, RPMUpgradePhaseTag


class RemoveLeftoverPackages(Actor):
"""
Remove packages left on the system after the upgrade to higher major version of RHEL.
Removal of packages is necessary in order to keep the machine in supported state.
Produce a message that is consumed by the `reportleftoverpackages` actor,
which reports on the packages that have been removed.
"""

name = 'remove_leftover_packages'
consumes = (LeftoverPackages, )
produces = (RemovedPackages, )
tags = (RPMUpgradePhaseTag, IPUWorkflowTag, )

def process(self):
removeleftoverpackages.process()
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from leapp.libraries.common.rhsm import skip_rhsm
from leapp.libraries.common.rpms import get_installed_rpms
from leapp.libraries.stdlib import api, CalledProcessError, run
from leapp.models import LeftoverPackages, RemovedPackages, RPM


def _get_leftover_packages():
"""
Consume and return LefoverPackages, if there are none, return None
"""
leftover_packages = next(api.consume(LeftoverPackages), LeftoverPackages())
if not leftover_packages.items:
api.current_logger().info('No leftover packages, skipping...')
return None
return leftover_packages


def _get_removed_packages(installed_rpms):
"""
Create RemovedPackages message with the list of removed packages
"""
removed_packages = []
removed = list(set(installed_rpms) - set(get_installed_rpms()))

for pkg in removed:
try:
name, version, release, epoch, packager, arch, pgpsig = pkg.split('|')
except ValueError:
api.current_logger().warning('Could not parse rpm: {}'.format(pkg))
continue
removed_packages.append(RPM(
name=name,
version=version,
epoch=epoch,
packager=packager,
arch=arch,
release=release,
pgpsig=pgpsig
))
return RemovedPackages(items=removed_packages)


def process():
leftover_packages = _get_leftover_packages()
if leftover_packages is None:
return

installed_rpms = get_installed_rpms()

leftover_pkgs_to_remove = [
'{name}-{version}-{release}'.format(
name=pkg.name,
version=pkg.version,
release=pkg.release
)
for pkg in leftover_packages.items
]

cmd = ['dnf', 'remove', '-y', '--noautoremove'] + leftover_pkgs_to_remove
if skip_rhsm():
# ensure we don't use suscription-manager when it should be skipped
cmd += ['--disableplugin', 'subscription-manager']
try:
run(cmd)
except (CalledProcessError, OSError):
error = 'Failed to remove packages: {}'.format(', '.join(leftover_pkgs_to_remove))
api.current_logger().error(error)
return

api.produce(_get_removed_packages(installed_rpms))
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import pytest

from leapp.libraries.actor import removeleftoverpackages
from leapp.libraries.common.testutils import CurrentActorMocked, logger_mocked, produce_mocked
from leapp.libraries.stdlib import api, CalledProcessError
from leapp.models import LeftoverPackages, RemovedPackages, RPM


def test_get_leftover_packages(monkeypatch):
rpm = RPM(name='rpm', version='1.0', release='1.el7', epoch='0', packager='foo', arch='noarch', pgpsig='SIG')
monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(msgs=[LeftoverPackages(items=[rpm])]))
monkeypatch.setattr(api, 'current_logger', logger_mocked())

assert removeleftoverpackages._get_leftover_packages() == LeftoverPackages(items=[rpm])


def test_no_leftover_packages(monkeypatch):
monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(msgs=[LeftoverPackages()]))
monkeypatch.setattr(api, 'current_logger', logger_mocked())

packages = removeleftoverpackages._get_leftover_packages()
assert packages is None
assert api.current_logger.infomsg == ['No leftover packages, skipping...']


def test_remove_leftover_packages_error(monkeypatch):
def get_leftover_pkgs():
return LeftoverPackages(items=[RPM(name='rpm', version='1.0', release='1.el7', epoch='0',
packager='packager', arch='noarch', pgpsig='SIG')])

def mocked_run(cmd):
raise CalledProcessError(command=cmd,
message='mocked error',
result={'stdout': 'out', 'stderr': 'err', 'exit_code': 1, 'signal': 0})

monkeypatch.setattr(api, 'current_logger', logger_mocked())
monkeypatch.setattr(api, 'produce', produce_mocked())
monkeypatch.setattr(removeleftoverpackages, '_get_leftover_packages', get_leftover_pkgs)
monkeypatch.setattr(removeleftoverpackages, 'get_installed_rpms', lambda: [])
monkeypatch.setattr(removeleftoverpackages, 'skip_rhsm', lambda: False)
monkeypatch.setattr(removeleftoverpackages, 'run', mocked_run)

removeleftoverpackages.process()

assert api.produce.called == 0
assert api.current_logger.errmsg == ['Failed to remove packages: rpm-1.0-1.el7']


@pytest.mark.parametrize(
('installed_rpms'),
(
([]),
(['rpm1']),
(['rpm1', 'rpm2']),
)
)
def test_get_removed_packages(monkeypatch, installed_rpms):
rpm_details = {
'version': '1.0',
'release': '1.el7',
'epoch': '0',
'packager': 'packager',
'arch': 'noarch',
'pgpsig': 'SIG'
}
rpm_details_composed = '|'.join([rpm_details[key] for key in ['version', 'release', 'epoch',
'packager', 'arch', 'pgpsig']])
mocked_installed_rpms = ['{}|{}'.format(rpm, rpm_details_composed) for rpm in installed_rpms]

monkeypatch.setattr(removeleftoverpackages, 'get_installed_rpms', lambda: [])

removed_packages = removeleftoverpackages._get_removed_packages(mocked_installed_rpms)
removed_packages.items = sorted(removed_packages.items, key=lambda x: x.name)
expected_output = [RPM(name=rpm, **rpm_details) for rpm in installed_rpms]
expected_output = sorted(expected_output, key=lambda x: x.name)

assert removed_packages == RemovedPackages(items=expected_output)


@pytest.mark.parametrize(
('removed_packages', 'skip_rhsm'),
(
([], True),
([], False),
(['rpm1'], True),
(['rpm1', 'rpm2'], True),
(['rpm1', 'rpm2'], False),
)
)
def test_process(monkeypatch, removed_packages, skip_rhsm):
pkgs = [RPM(name=pkg, version='1.0', release='1.el7', epoch='0',
packager='packager', arch='noarch', pgpsig='SIG')
for pkg in removed_packages]

removed_pkgs = RemovedPackages(items=pkgs)

def mocked_run(cmd):
pkgs_joined = ['-'.join([pkg.name, pkg.version, pkg.release]) for pkg in pkgs]
expected_cmd = ['dnf', 'remove', '-y', '--noautoremove']
expected_cmd += pkgs_joined

if skip_rhsm:
expected_cmd += ['--disableplugin', 'subscription-manager']

assert cmd == expected_cmd

monkeypatch.setattr(api, 'produce', produce_mocked())
monkeypatch.setattr(removeleftoverpackages, 'run', mocked_run)
monkeypatch.setattr(removeleftoverpackages, 'skip_rhsm', lambda: skip_rhsm)
monkeypatch.setattr(removeleftoverpackages, 'get_installed_rpms', lambda: [])
monkeypatch.setattr(removeleftoverpackages, '_get_leftover_packages', lambda: LeftoverPackages(items=pkgs))
monkeypatch.setattr(removeleftoverpackages, '_get_removed_packages', lambda _: removed_pkgs)

removeleftoverpackages.process()

assert api.produce.called == 1
assert api.produce.model_instances == [removed_pkgs]
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from leapp.actors import Actor
from leapp.libraries.actor import reportleftoverpackages
from leapp.models import LeftoverPackages, RemovedPackages
from leapp.reporting import Report
from leapp.tags import IPUWorkflowTag, RPMUpgradePhaseTag


class ReportLeftoverPackages(Actor):
"""
Collect messages about leftover RHEL packages from older major versions and generate a report.
Depending on execution of previous actors,
generated report contains information that there are still old RHEL packages
present on the system, which makes it unsupported or lists packages that have been removed.
"""

name = 'report_leftover_packages'
consumes = (LeftoverPackages, RemovedPackages)
produces = (Report,)
tags = (RPMUpgradePhaseTag, IPUWorkflowTag)

def process(self):
reportleftoverpackages.process()
Loading

0 comments on commit f2bd7b4

Please sign in to comment.