|  | 
|  | 1 | +"""tests/test_dmv.py: Unit test for xcp/dmv.py""" | 
|  | 2 | +import unittest | 
|  | 3 | +from unittest import mock | 
|  | 4 | +import types | 
|  | 5 | +import json | 
|  | 6 | +import errno | 
|  | 7 | +from xcp import dmv | 
|  | 8 | + | 
|  | 9 | + | 
|  | 10 | +class TestDMV(unittest.TestCase): | 
|  | 11 | +    @mock.patch("os.listdir") | 
|  | 12 | +    def test_get_all_kabi_dirs(self, m_listdir): | 
|  | 13 | +        m_listdir.return_value = ["4.19.0", "5.10.0"] | 
|  | 14 | +        dirs = dmv.get_all_kabi_dirs() | 
|  | 15 | +        self.assertIn(("4.19.0", "/lib/modules/4.19.0/updates", "/lib/modules/4.19.0/dmv"), dirs) | 
|  | 16 | +        self.assertIn(("5.10.0", "/lib/modules/5.10.0/updates", "/lib/modules/5.10.0/dmv"), dirs) | 
|  | 17 | + | 
|  | 18 | +    def test_note_offset(self): | 
|  | 19 | +        self.assertEqual(dmv.note_offset(1), 3) | 
|  | 20 | +        self.assertEqual(dmv.note_offset(4), 0) | 
|  | 21 | +        self.assertEqual(dmv.note_offset(5), 3) | 
|  | 22 | + | 
|  | 23 | +    def test_id_matches(self): | 
|  | 24 | +        self.assertTrue(dmv.id_matches("*", "1234")) | 
|  | 25 | +        self.assertTrue(dmv.id_matches("1234", "*")) | 
|  | 26 | +        self.assertTrue(dmv.id_matches("1234", "1234")) | 
|  | 27 | +        self.assertFalse(dmv.id_matches("1234", "5678")) | 
|  | 28 | + | 
|  | 29 | +    def test_pci_matches_true(self): | 
|  | 30 | +        present = {"vendor": "14e4", "device": "163c", "subvendor": "*", "subdevice": "*"} | 
|  | 31 | +        driver_pci_ids = { | 
|  | 32 | +            "abc.ko": [ | 
|  | 33 | +                {"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"} | 
|  | 34 | +            ] | 
|  | 35 | +        } | 
|  | 36 | +        self.assertTrue(dmv.pci_matches(present, driver_pci_ids)) | 
|  | 37 | + | 
|  | 38 | +    def test_pci_matches_false(self): | 
|  | 39 | +        present = {"vendor": "abcd", "device": "9999", "subvendor": "*", "subdevice": "*"} | 
|  | 40 | +        driver_pci_ids = { | 
|  | 41 | +            "abc.ko": [ | 
|  | 42 | +                {"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"} | 
|  | 43 | +            ] | 
|  | 44 | +        } | 
|  | 45 | +        self.assertFalse(dmv.pci_matches(present, driver_pci_ids)) | 
|  | 46 | + | 
|  | 47 | +    @mock.patch("re.compile") | 
|  | 48 | +    def test_hardware_present_true(self, m_compile): | 
|  | 49 | +        m = mock.Mock() | 
|  | 50 | +        m.finditer.return_value = [ | 
|  | 51 | +            mock.Mock(groupdict=lambda: {"vendor": "14e4", "device": "163c", "subvendor": "*", "subdevice": "*"}) | 
|  | 52 | +        ] | 
|  | 53 | +        m_compile.return_value = m | 
|  | 54 | +        pci_ids = { | 
|  | 55 | +            "abc.ko": [ | 
|  | 56 | +                {"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"} | 
|  | 57 | +            ] | 
|  | 58 | +        } | 
|  | 59 | +        self.assertTrue(dmv.hardware_present("dummy", pci_ids)) | 
|  | 60 | + | 
|  | 61 | +    @mock.patch("re.compile") | 
|  | 62 | +    def test_hardware_present_false_01(self, m_compile): | 
|  | 63 | +        self.assertFalse(dmv.hardware_present("dummy", None)) | 
|  | 64 | + | 
|  | 65 | +    @mock.patch("re.compile") | 
|  | 66 | +    def test_hardware_present_false_02(self, m_compile): | 
|  | 67 | +        m = mock.Mock() | 
|  | 68 | +        m.finditer.return_value = [ | 
|  | 69 | +            mock.Mock(groupdict=lambda: {"vendor": "abcd", "device": "9999", "subvendor": "*", "subdevice": "*"}) | 
|  | 70 | +        ] | 
|  | 71 | +        m_compile.return_value = m | 
|  | 72 | +        pci_ids = { | 
|  | 73 | +            "abc.ko": [ | 
|  | 74 | +                {"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"} | 
|  | 75 | +            ] | 
|  | 76 | +        } | 
|  | 77 | +        self.assertFalse(dmv.hardware_present("dummy", pci_ids)) | 
|  | 78 | + | 
|  | 79 | +    @mock.patch("os.path.isfile") | 
|  | 80 | +    @mock.patch("builtins.open", new_callable=mock.mock_open) | 
|  | 81 | +    @mock.patch("struct.calcsize") | 
|  | 82 | +    @mock.patch("struct.unpack") | 
|  | 83 | +    def test_get_active_variant(self, m_unpack, m_calcsize, m_open, m_isfile): | 
|  | 84 | +        m_isfile.return_value = True | 
|  | 85 | +        m_calcsize.return_value = 12 | 
|  | 86 | +        m_unpack.return_value = (9, 3, 1) | 
|  | 87 | +        fake_file = mock.Mock() | 
|  | 88 | +        fake_file.read.side_effect = [ | 
|  | 89 | +            b"x"*12,  # header | 
|  | 90 | +            b"",      # offset | 
|  | 91 | +            b"XenServer", b"", b"v1\x00", b"",  # vendor, offset, content, offset | 
|  | 92 | +        ] | 
|  | 93 | +        m_open.return_value.__enter__.return_value = fake_file | 
|  | 94 | +        result = dmv.get_active_variant(["foo.ko"]) | 
|  | 95 | +        self.assertEqual(result, "v1") | 
|  | 96 | + | 
|  | 97 | +        m_isfile.return_value = False | 
|  | 98 | +        result = dmv.get_active_variant(["foo.ko"]) | 
|  | 99 | +        self.assertEqual(result, None) | 
|  | 100 | + | 
|  | 101 | +    @mock.patch("os.path.isfile") | 
|  | 102 | +    def test_get_loaded_modules(self, m_isfile): | 
|  | 103 | +        m_isfile.side_effect = lambda path: "foo" in path | 
|  | 104 | +        result = dmv.get_loaded_modules(["foo.ko", "bar.ko"]) | 
|  | 105 | +        self.assertEqual(result, ["foo.ko"]) | 
|  | 106 | + | 
|  | 107 | +    @mock.patch("os.path.islink") | 
|  | 108 | +    @mock.patch("os.path.realpath") | 
|  | 109 | +    @mock.patch("os.path.dirname") | 
|  | 110 | +    @mock.patch("builtins.open", new_callable=mock.mock_open, read_data='{"variant": "v1"}') | 
|  | 111 | +    @mock.patch("json.load") | 
|  | 112 | +    def test_variant_selected(self, m_json_load, m_open, m_dirname, m_realpath, m_islink): | 
|  | 113 | +        m_islink.return_value = True | 
|  | 114 | +        m_realpath.return_value = "/some/dir" | 
|  | 115 | +        m_dirname.return_value = "/some/dir" | 
|  | 116 | +        m_json_load.return_value = {"variant": "v1"} | 
|  | 117 | +        d = dmv.DriverMultiVersion("/updates", None) | 
|  | 118 | +        result = d.variant_selected(["foo.ko"]) | 
|  | 119 | +        self.assertEqual(result, "v1") | 
|  | 120 | + | 
|  | 121 | +        m_islink.return_value = False | 
|  | 122 | +        d = dmv.DriverMultiVersion("/updates", None) | 
|  | 123 | +        result = d.variant_selected(["foo.ko"]) | 
|  | 124 | +        self.assertEqual(result, None) | 
|  | 125 | + | 
|  | 126 | +    @mock.patch("xcp.dmv.open_with_codec_handling") | 
|  | 127 | +    @mock.patch("xcp.dmv.hardware_present") | 
|  | 128 | +    def test_parse_dmv_info01(self, m_hw_present, m_open_codec): | 
|  | 129 | +        m_hw_present.return_value = True | 
|  | 130 | +        info_json = { | 
|  | 131 | +            "category": "net", | 
|  | 132 | +            "name": "foo", | 
|  | 133 | +            "description": "desc", | 
|  | 134 | +            "variant": "v1", | 
|  | 135 | +            "version": "1.0", | 
|  | 136 | +            "priority": 1, | 
|  | 137 | +            "status": "ok", | 
|  | 138 | +            "pci_ids": { | 
|  | 139 | +                "foo.ko": [ | 
|  | 140 | +                    {"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"} | 
|  | 141 | +                ] | 
|  | 142 | +            } | 
|  | 143 | +        } | 
|  | 144 | +        m_open_codec.return_value.__enter__.return_value = mock.Mock( | 
|  | 145 | +            spec=["read"], read=lambda: json.dumps(info_json) | 
|  | 146 | +        ) | 
|  | 147 | +        with mock.patch("json.load", return_value=info_json): | 
|  | 148 | +            lspci_out = types.SimpleNamespace(stdout="dummy") | 
|  | 149 | +            d = dmv.DriverMultiVersion("", lspci_out) | 
|  | 150 | +            json_data, json_formatted = d.parse_dmv_info("dummy") | 
|  | 151 | +            self.assertEqual(json_data["name"], "foo") | 
|  | 152 | +            self.assertEqual(json_formatted["type"], "net") | 
|  | 153 | +            self.assertTrue(json_formatted["variants"]["v1"]["hardware_present"]) | 
|  | 154 | + | 
|  | 155 | +    @mock.patch("xcp.dmv.open_with_codec_handling") | 
|  | 156 | +    @mock.patch("xcp.dmv.hardware_present") | 
|  | 157 | +    @mock.patch("xcp.dmv.get_active_variant") | 
|  | 158 | +    def test_parse_dmv_info02(self, m_active_variant, m_hw_present, m_open_codec): | 
|  | 159 | +        m_active_variant.return_value = "foo" | 
|  | 160 | +        m_hw_present.return_value = True | 
|  | 161 | +        info_json = { | 
|  | 162 | +            "category": "net", | 
|  | 163 | +            "name": "foo", | 
|  | 164 | +            "description": "desc", | 
|  | 165 | +            "variant": "v1", | 
|  | 166 | +            "version": "1.0", | 
|  | 167 | +            "priority": 1, | 
|  | 168 | +            "status": "ok", | 
|  | 169 | +            "pci_ids": { | 
|  | 170 | +                "foo.ko": [ | 
|  | 171 | +                    {"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"} | 
|  | 172 | +                ] | 
|  | 173 | +            } | 
|  | 174 | +        } | 
|  | 175 | +        m_open_codec.return_value.__enter__.return_value = mock.Mock( | 
|  | 176 | +            spec=["read"], read=lambda: json.dumps(info_json) | 
|  | 177 | +        ) | 
|  | 178 | +        with mock.patch("json.load", return_value=info_json): | 
|  | 179 | +            lspci_out = types.SimpleNamespace(stdout="dummy") | 
|  | 180 | +            d = dmv.DriverMultiVersion("", lspci_out, runtime=True) | 
|  | 181 | +            json_data, json_formatted = d.parse_dmv_info("dummy") | 
|  | 182 | +            self.assertEqual(json_data["name"], "foo") | 
|  | 183 | +            self.assertEqual(json_formatted["type"], "net") | 
|  | 184 | +            self.assertTrue(json_formatted["variants"]["v1"]["hardware_present"]) | 
|  | 185 | +            self.assertEqual(json_formatted["active"], "foo") | 
|  | 186 | + | 
|  | 187 | +    def test_merge_jsondata01(self): | 
|  | 188 | +        oldone = { | 
|  | 189 | +            "type": "net", | 
|  | 190 | +            "friendly_name": "foo", | 
|  | 191 | +            "description": "desc", | 
|  | 192 | +            "info": "foo", | 
|  | 193 | +            "variants": {"v1": {"version": "1.0"}}, | 
|  | 194 | +            "selected": "v1", | 
|  | 195 | +            "active": "v1", | 
|  | 196 | +            "loaded modules": ["foo.ko"] | 
|  | 197 | +        } | 
|  | 198 | +        newone = { | 
|  | 199 | +            "type": "net", | 
|  | 200 | +            "friendly_name": "foo", | 
|  | 201 | +            "description": "desc", | 
|  | 202 | +            "info": "foo", | 
|  | 203 | +            "variants": {"v2": {"version": "2.0"}}, | 
|  | 204 | +            "selected": None, | 
|  | 205 | +            "active": None, | 
|  | 206 | +            "loaded modules": ["bar.ko"] | 
|  | 207 | +        } | 
|  | 208 | +        mgr = dmv.DriverMultiVersionManager(runtime=True) | 
|  | 209 | +        mgr.merge_jsondata(oldone, newone) | 
|  | 210 | +        merged = mgr.dmv_list["drivers"]["foo"] | 
|  | 211 | +        self.assertIn("v1", merged["variants"]) | 
|  | 212 | +        self.assertIn("v2", merged["variants"]) | 
|  | 213 | +        self.assertEqual(merged["selected"], "v1") | 
|  | 214 | +        self.assertEqual(merged["active"], "v1") | 
|  | 215 | +        self.assertEqual(merged["loaded modules"], ["foo.ko", "bar.ko"]) | 
|  | 216 | + | 
|  | 217 | +    def test_merge_jsondata02(self): | 
|  | 218 | +        oldobj = { | 
|  | 219 | +            "type": "storage", | 
|  | 220 | +            "friendly_name": "foo", | 
|  | 221 | +            "description": "desc", | 
|  | 222 | +            "info": "foo", | 
|  | 223 | +            "variants": {"v1": {"version": "1.0"}}, | 
|  | 224 | +            "selected": None, | 
|  | 225 | +            "active": None, | 
|  | 226 | +            "loaded modules": ["foo.ko"] | 
|  | 227 | +        } | 
|  | 228 | +        newobj = { | 
|  | 229 | +            "type": "storage", | 
|  | 230 | +            "friendly_name": "foo", | 
|  | 231 | +            "description": "desc", | 
|  | 232 | +            "info": "foo", | 
|  | 233 | +            "variants": {"v2": {"version": "2.0"}}, | 
|  | 234 | +            "selected": "v2", | 
|  | 235 | +            "active": "v2", | 
|  | 236 | +            "loaded modules": ["bar.ko"] | 
|  | 237 | +        } | 
|  | 238 | +        mgr = dmv.DriverMultiVersionManager(runtime=True) | 
|  | 239 | +        mgr.merge_jsondata(oldobj, newobj) | 
|  | 240 | +        merged = mgr.dmv_list["drivers"]["foo"] | 
|  | 241 | +        self.assertIn("v1", merged["variants"]) | 
|  | 242 | +        self.assertIn("v2", merged["variants"]) | 
|  | 243 | +        self.assertEqual(merged["selected"], "v2") | 
|  | 244 | +        self.assertEqual(merged["active"], "v2") | 
|  | 245 | +        self.assertEqual(merged["loaded modules"], ["foo.ko", "bar.ko"]) | 
|  | 246 | + | 
|  | 247 | +    def test_process_dmv_data(self): | 
|  | 248 | +        mgr = dmv.DriverMultiVersionManager() | 
|  | 249 | +        json_data = {"name": "foo"} | 
|  | 250 | +        json_formatted = {"type": "net"} | 
|  | 251 | +        mgr.process_dmv_data(json_data, json_formatted) | 
|  | 252 | +        self.assertEqual(mgr.dmv_list["drivers"]["foo"], json_formatted) | 
|  | 253 | + | 
|  | 254 | +    @mock.patch("xcp.dmv.subprocess.run") | 
|  | 255 | +    @mock.patch("xcp.dmv.glob.glob") | 
|  | 256 | +    @mock.patch("xcp.dmv.os.makedirs") | 
|  | 257 | +    @mock.patch("xcp.dmv.os.symlink") | 
|  | 258 | +    @mock.patch("xcp.dmv.os.rename") | 
|  | 259 | +    @mock.patch("xcp.dmv.os.path.join", side_effect=lambda *args: "/".join(args)) | 
|  | 260 | +    @mock.patch("xcp.dmv.get_all_kabi_dirs") | 
|  | 261 | +    def test_create_dmv_symlink(self, m_get_dirs, m_join, m_rename, m_symlink, m_makedirs, m_glob, m_run): | 
|  | 262 | +        m_get_dirs.return_value = [("5.10.0", "/lib/modules/5.10.0/updates", "/lib/modules/5.10.0/dmv")] | 
|  | 263 | +        m_glob.return_value = ["/lib/modules/5.10.0/dmv/foo/1.0/bar.ko"] | 
|  | 264 | +        m_symlink.side_effect = None | 
|  | 265 | +        m_rename.side_effect = None | 
|  | 266 | +        m_run.side_effect = None | 
|  | 267 | + | 
|  | 268 | +        mgr = dmv.DriverMultiVersionManager() | 
|  | 269 | +        result = mgr.create_dmv_symlink("foo", "1.0") | 
|  | 270 | +        self.assertTrue(result) | 
|  | 271 | +        m_makedirs.assert_called_with("/lib/modules/5.10.0/updates", exist_ok=True) | 
|  | 272 | +        m_symlink.assert_called() | 
|  | 273 | +        m_rename.assert_called() | 
|  | 274 | + | 
|  | 275 | +    def test_get_set_error(self): | 
|  | 276 | +        mgr = dmv.DriverMultiVersionManager() | 
|  | 277 | +        mgr.set_dmv_error(errno.ENOENT) | 
|  | 278 | +        err = mgr.get_dmv_error() | 
|  | 279 | +        self.assertEqual(err["exit_code"], errno.ENOENT) | 
|  | 280 | +        self.assertIn("No such file", err["message"]) | 
0 commit comments