diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 6b6f7665e82931..3dbe100ad0932c 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -428,6 +428,7 @@ jobs: scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_SC_3_6.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"' scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_DA_1_7.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --bool-arg allow_sdk_dac:true"' scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1 --enable-key 000102030405060708090a0b0c0d0e0f" --script "src/python_testing/TC_TestEventTrigger.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --bool-arg allow_sdk_dac:true"' + scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_ACE_1_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"' - name: Uploading core files uses: actions/upload-artifact@v3 if: ${{ failure() && !env.ACT }} diff --git a/src/python_testing/TC_ACE_1_3.py b/src/python_testing/TC_ACE_1_3.py new file mode 100644 index 00000000000000..dd3639d7c0b476 --- /dev/null +++ b/src/python_testing/TC_ACE_1_3.py @@ -0,0 +1,333 @@ +# +# Copyright (c) 2022 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging + +import chip.clusters as Clusters +from chip.interaction_model import InteractionModelError, Status +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from mobly import asserts + + +def acl_subject(cat: int) -> int: + return 0xFFFFFFFD00000000 | cat + + +class TC_ACE_1_3(MatterBaseTest): + + async def write_acl(self, acl): + # This returns an attribute status + result = await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl(acl))]) + asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed") + print(result) + + async def read_descriptor_expect_success(self, th): + cluster = Clusters.Objects.Descriptor + attribute = Clusters.Descriptor.Attributes.DeviceTypeList + await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute) + + async def read_descriptor_expect_unsupported_access(self, th): + cluster = Clusters.Objects.Descriptor + attribute = Clusters.Descriptor.Attributes.DeviceTypeList + await self.read_single_attribute_expect_error(dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess) + + @async_test_body + async def test_TC_ACE_1_3(self): + cat1_id = 0x11110000 + cat2_id = 0x22220000 + + cat1v1 = cat1_id | 0x0001 + cat1v2 = cat1_id | 0x0002 + cat1v3 = cat1_id | 0x0003 + cat2v1 = cat2_id | 0x0001 + cat2v2 = cat2_id | 0x0002 + cat2v3 = cat2_id | 0x0003 + logging.info('cat1v1 0x%x', cat1v1) + + self.print_step(1, "Commissioning, already done") + TH0 = self.default_controller + fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0] + + TH0_nodeid = self.matter_test_config.controller_node_id + TH1_nodeid = self.matter_test_config.controller_node_id + 1 + TH2_nodeid = self.matter_test_config.controller_node_id + 2 + TH3_nodeid = self.matter_test_config.controller_node_id + 3 + + TH1 = fabric_admin.NewController(nodeId=TH1_nodeid, + paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path), + catTags=[cat1v3]) + TH2 = fabric_admin.NewController(nodeId=TH2_nodeid, + paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path), + catTags=[cat1v2, cat2v1]) + TH3 = fabric_admin.NewController(nodeId=TH3_nodeid, + paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path), + catTags=[cat1v1, cat2v2]) + + self.print_step(2, "TH0 writes ACL all view on PIXIT.ACE.TESTENDPOINT") + TH0_admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH0_nodeid], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0, cluster=0x001f)]) + all_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + acl = [TH0_admin_acl, all_view] + await self.write_acl(acl) + + self.print_step(3, "TH1 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH1) + + self.print_step(4, "TH2 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH2) + + self.print_step(5, "TH3 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH3) + + self.print_step(6, "TH0 writes ACL TH1 view on EP0") + th1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH1_nodeid], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + acl = [TH0_admin_acl, th1_view] + await self.write_acl(acl) + self.print_step(7, "TH1 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH1) + + self.print_step(8, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH2) + + self.print_step(9, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH3) + + self.print_step(10, "TH0 writes ACL TH2 view on EP0") + th2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH2_nodeid], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, th2_view] + await self.write_acl(acl) + self.print_step(11, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH1) + + self.print_step(12, "TH2 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH2) + + self.print_step(13, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH3) + + self.print_step(14, "TH0 writes ACL TH3 view on EP0") + th3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH3_nodeid], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, th3_view] + await self.write_acl(acl) + self.print_step(15, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH1) + + self.print_step(16, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH2) + + self.print_step(17, "TH3 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH3) + + self.print_step(18, "TH0 writes ACL TH1 TH2 view on EP0") + th12_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH1_nodeid, TH2_nodeid], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, th12_view] + await self.write_acl(acl) + self.print_step(19, "TH1 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH1) + + self.print_step(20, "TH2 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH2) + + self.print_step(21, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH3) + + self.print_step(22, "TH0 writes ACL TH1 TH3 view on EP0") + th13_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH1_nodeid, TH3_nodeid], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, th13_view] + await self.write_acl(acl) + self.print_step(23, "TH1 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH1) + + self.print_step(24, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH2) + + self.print_step(25, "TH3 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH3) + + self.print_step(26, "TH0 writes ACL TH2 TH3 view on EP0") + th23_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH2_nodeid, TH3_nodeid], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, th23_view] + await self.write_acl(acl) + self.print_step(27, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH1) + + self.print_step(28, "TH2 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH2) + + self.print_step(29, "TH3 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH3) + + self.print_step(30, "TH0 writes ACL TH1 TH2 TH3 view on EP0") + th123_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH1_nodeid, TH2_nodeid, TH3_nodeid], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, th123_view] + await self.write_acl(acl) + self.print_step(31, "TH1 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH1) + + self.print_step(32, "TH2 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH2) + + self.print_step(33, "TH3 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH3) + + self.print_step(34, "TH0 writes ACL cat1v1 view on EP0") + cat1v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[acl_subject(cat1v1)], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + acl = [TH0_admin_acl, cat1v1_view] + await self.write_acl(acl) + + self.print_step(35, "TH1 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH1) + + self.print_step(36, "TH2 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH2) + + self.print_step(37, "TH3 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH3) + + self.print_step(38, "TH0 writes ACL cat1v2 view on EP0") + cat1v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[acl_subject(cat1v2)], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, cat1v2_view] + await self.write_acl(acl) + + self.print_step(39, "TH1 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH1) + + self.print_step(40, "TH2 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH2) + + self.print_step(41, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH3) + + self.print_step(42, "TH0 writes ACL cat1v3 view on EP0") + cat1v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[acl_subject(cat1v3)], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, cat1v3_view] + await self.write_acl(acl) + + self.print_step(43, "TH1 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH1) + + self.print_step(44, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH2) + + self.print_step(45, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH3) + + self.print_step(46, "TH0 writes ACL cat2v1 view on EP0") + cat2v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[acl_subject(cat2v1)], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, cat2v1_view] + await self.write_acl(acl) + + self.print_step(47, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH1) + + self.print_step(48, "TH2 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH2) + + self.print_step(49, "TH3 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH3) + + self.print_step(50, "TH0 writes ACL cat2v2 view on EP0") + cat2v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[acl_subject(cat2v2)], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, cat2v2_view] + await self.write_acl(acl) + + self.print_step(51, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH1) + + self.print_step(52, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH2) + + self.print_step(53, "TH3 reads EP0 descriptor - expect SUCCESS") + await self.read_descriptor_expect_success(TH3) + + self.print_step(54, "TH0 writes ACL cat2v3 view on EP0") + cat2v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[acl_subject(cat2v3)], + targets=[Clusters.AccessControl.Structs.Target(endpoint=0)]) + + acl = [TH0_admin_acl, cat2v3_view] + await self.write_acl(acl) + + self.print_step(55, "TH1 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH1) + + self.print_step(56, "TH2 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH2) + + self.print_step(57, "TH3 reads EP0 descriptor - expect UNSUPPORTED_ACCESS") + await self.read_descriptor_expect_unsupported_access(TH3) + + self.print_step(58, "TH0 writes ACL back to default") + + acl = [TH0_admin_acl] + await self.write_acl(acl) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 3b3555a9a8a5d0..89e52736e86556 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -30,16 +30,22 @@ from dataclasses import dataclass, field from typing import List, Optional, Tuple +# isort: off + +from chip import ChipDeviceCtrl # Needed before chip.FabricAdmin +import chip.FabricAdmin # Needed before chip.CertificateAuthority import chip.CertificateAuthority + +# isort: on + import chip.clusters as Clusters -import chip.FabricAdmin import chip.logging import chip.native -from chip import ChipDeviceCtrl from chip.ChipStack import * +from chip.interaction_model import InteractionModelError, Status from chip.storage import PersistentStorage from chip.utils import CommissioningBuildingBlocks -from mobly import base_test, logger, signals, utils +from mobly import asserts, base_test, logger, signals, utils from mobly.config_parser import ENV_MOBLY_LOGPATH, TestRunConfig from mobly.test_runner import TestRunner @@ -55,7 +61,7 @@ _DEFAULT_STORAGE_PATH = "admin_storage.json" _DEFAULT_LOG_PATH = "/tmp/matter_testing/logs" _DEFAULT_CONTROLLER_NODE_ID = 112233 -_DEFAULT_DUT_NODE_ID = 111 +_DEFAULT_DUT_NODE_ID = 0x12344321 _DEFAULT_TRUST_ROOT_INDEX = 1 # Mobly cannot deal with user config passing of ctypes objects, @@ -265,6 +271,37 @@ async def read_single_attribute(self, dev_ctrl: ChipDeviceCtrl, node_id: int, en data = result[endpoint] return list(data.values())[0][attribute] + async def read_single_attribute_check_success(self, cluster: object, attribute: object, dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0) -> object: + if dev_ctrl is None: + dev_ctrl = self.default_controller + if node_id is None: + node_id = self.dut_node_id + + result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)]) + attr_ret = result[endpoint][cluster][attribute] + err_msg = "Error reading {}:{}".format(str(cluster), str(attribute)) + asserts.assert_true(attr_ret is not None, err_msg) + asserts.assert_false(isinstance(attr_ret, Clusters.Attribute.ValueDecodeFailure), err_msg) + return attr_ret + + async def read_single_attribute_expect_error(self, cluster: object, attribute: object, error: Status, dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0) -> object: + if dev_ctrl is None: + dev_ctrl = self.default_controller + if node_id is None: + node_id = self.dut_node_id + + result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)]) + attr_ret = result[endpoint][cluster][attribute] + err_msg = "Did not see expected error when reading {}:{}".format(str(cluster), str(attribute)) + asserts.assert_true(attr_ret is not None, err_msg) + asserts.assert_true(isinstance(attr_ret, Clusters.Attribute.ValueDecodeFailure), err_msg) + asserts.assert_true(isinstance(attr_ret.Reason, InteractionModelError), err_msg) + asserts.assert_equal(attr_ret.Reason.status, error, err_msg) + return attr_ret + + def print_step(self, stepnum: int, title: str) -> None: + logging.info('***** Test Step %d : %s', stepnum, title) + def generate_mobly_test_config(matter_test_config: MatterTestConfig): test_run_config = TestRunConfig()