diff --git a/src/subscription_manager/repofile.py b/src/subscription_manager/repofile.py index c5bbee9ba..4079ecb15 100644 --- a/src/subscription_manager/repofile.py +++ b/src/subscription_manager/repofile.py @@ -34,7 +34,7 @@ from subscription_manager import utils from subscription_manager.certdirectory import Path import configparser -from urllib.parse import parse_qs, urlparse, urlunparse, urlencode +from urllib.parse import parse_qs, urlparse, urlunparse, urlencode, unquote from rhsm.config import get_config_parser @@ -461,19 +461,26 @@ def sections(self): def fix_content(self, content): # Luckily apt ignores all Fields it does not recognize - baseurl = content["baseurl"] - url_res = re.match(r"^https?://(?P.*)$", baseurl) + parsed_url = urlparse(unquote(content["baseurl"])) + baseurl = parsed_url._replace(query="").geturl() ent_res = re.match(r"^/etc/pki/entitlement/(?P.*).pem$", content["sslclientcert"]) - if url_res and ent_res: - location = url_res.group("location") - entitlement = ent_res.group("entitlement") - baseurl = "katello://{}@{}".format(entitlement, location) + if ent_res: + netloc = ent_res.group("entitlement") + '@' + parsed_url.netloc + baseurl = parsed_url._replace(scheme="katello",netloc=netloc,query="").geturl() + + query = parse_qs(parsed_url.query) + if "rel" in query and "comp" in query: + suites = " ".join(query["rel"][0].split(",")) + components = " ".join(query["comp"][0].split(",")) + else: + suites = "default" + components = "all" apt_cont = content.copy() apt_cont["Types"] = "deb" apt_cont["URIs"] = baseurl - apt_cont["Suites"] = "default" - apt_cont["Components"] = "all" + apt_cont["Suites"] = suites + apt_cont["Components"] = components apt_cont["Trusted"] = "yes" if apt_cont["arches"] is None or apt_cont["arches"] == ["ALL"]: diff --git a/test/test_repolib.py b/test/test_repolib.py index 102cfce0a..44eea0df4 100644 --- a/test/test_repolib.py +++ b/test/test_repolib.py @@ -1041,6 +1041,109 @@ def test_fix_content_default(self, mock_file): self.assertIn(key, act_content) self.assertEqual(act_content[key], exp_params[key]) + @patch("builtins.open", new_callable=mock_open, read_data="data") + def test_fix_content_url_params(self, mock_file): + # Setup mock and expected values + assert open("/etc/apt/sources.list.d/rhsm.sources").read() == "data" + mock_file.assert_called_with("/etc/apt/sources.list.d/rhsm.sources") + exp_params = { + "arches": "none", + "Types": "deb", + "URIs": "https://example.site.org/", + "Suites": "focal", + "Components": "puppet7", + "Trusted": "yes", + "sslclientcert": "mypem.pem", + } + ar = self._helper_stub_repofile() + repo_mock = self._helper_stub_repo( + "mock", + existing_values=[ + ("baseurl", "https://example.site.org/?comp=puppet7&rel=focal"), + ("sslclientcert", exp_params["sslclientcert"]), + ], + ) + # Modify data + act_content = ar.fix_content(repo_mock) + # Test modification by comparing with expected values + for key in exp_params: + self.assertIn(key, act_content) + self.assertEqual(act_content[key], exp_params[key]) + + @patch("builtins.open", new_callable=mock_open, read_data="data") + def test_fix_content_url_params_multi(self, mock_file): + # Setup mock and expected values + assert open("/etc/apt/sources.list.d/rhsm.sources").read() == "data" + mock_file.assert_called_with("/etc/apt/sources.list.d/rhsm.sources") + exp_params = { + "arches": "none", + "Types": "deb", + "URIs": "https://example.site.org/", + "Suites": "focal jammy", + "Components": "puppet7 puppet6", + "Trusted": "yes", + "sslclientcert": "mypem.pem", + } + ar = self._helper_stub_repofile() + repo_mock = self._helper_stub_repo( + "mock", + existing_values=[ + ("baseurl", "https://example.site.org/?comp=puppet7,puppet6&rel=focal,jammy"), + ("sslclientcert", exp_params["sslclientcert"]), + ], + ) + # Modify data + act_content = ar.fix_content(repo_mock) + # Test modification by comparing with expected values + for key in exp_params: + self.assertIn(key, act_content) + self.assertEqual(act_content[key], exp_params[key]) + + @patch("builtins.open", new_callable=mock_open, read_data="data") + def test_fix_content_url_single_param(self, mock_file): + # Setup mock and expected values + assert open("/etc/apt/sources.list.d/rhsm.sources").read() == "data" + mock_file.assert_called_with("/etc/apt/sources.list.d/rhsm.sources") + exp_params = { + "arches": "none", + "Types": "deb", + "URIs": "https://example.site.org/", + "Suites": "default", + "Components": "all", + "Trusted": "yes", + "sslclientcert": "mypem.pem", + } + # Test only 'comp' param + ar = self._helper_stub_repofile() + repo_mock = self._helper_stub_repo( + "mock", + existing_values=[ + ("baseurl", "https://example.site.org/?comp=puppet7"), + ("sslclientcert", exp_params["sslclientcert"]), + ], + ) + # Modify data + act_content = ar.fix_content(repo_mock) + # Test modification by comparing with expected values + for key in exp_params: + self.assertIn(key, act_content) + self.assertEqual(act_content[key], exp_params[key]) + # Test only 'rel' param + ar = self._helper_stub_repofile() + repo_mock = self._helper_stub_repo( + "mock", + existing_values=[ + ("baseurl", "https://example.site.org/?rel=focal,jammy"), + ("sslclientcert", exp_params["sslclientcert"]), + ], + ) + # Modify data + act_content = ar.fix_content(repo_mock) + # Test modification by comparing with expected values + for key in exp_params: + self.assertIn(key, act_content) + self.assertEqual(act_content[key], exp_params[key]) + @patch("builtins.open", new_callable=mock_open, read_data="data") def test_fix_content_arches(self, mock_file): # Setup mock and expected values