From b3c844bd669a4af978e1e65b195ce0a65fb26b3e Mon Sep 17 00:00:00 2001 From: C Freeman Date: Fri, 27 Oct 2023 15:01:14 -0400 Subject: [PATCH] Re enable spec parsing (#30066) * Reapply "Cluster conformance checker script (#29895)" This reverts commit 8140975925a0f7d1662d6a57fe5aab2b722b14d7. * More resiliance to mismatches in spec * Restyled by isort --------- Co-authored-by: Restyled.io --- .github/workflows/tests.yaml | 1 + .../TC_DeviceBasicComposition.py | 128 +++- src/python_testing/TestConformanceSupport.py | 575 ++++++++++++++++++ src/python_testing/conformance_support.py | 263 ++++++++ src/python_testing/matter_testing_support.py | 21 +- src/python_testing/spec_parsing_support.py | 343 +++++++++++ 6 files changed, 1326 insertions(+), 5 deletions(-) create mode 100644 src/python_testing/TestConformanceSupport.py create mode 100644 src/python_testing/conformance_support.py create mode 100644 src/python_testing/spec_parsing_support.py diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 97897e8086c941..d6c580e643e9ff 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -468,6 +468,7 @@ jobs: scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_RVCCLEANM_1_2.py" --script-args "--int-arg PIXIT_ENDPOINT:1 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_RVCRUNM_1_2.py" --script-args "--int-arg PIXIT_ENDPOINT:1 --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestMatterTestingSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' + scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --script "src/python_testing/TestConformanceSupport.py" --script-args "--trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DRLK_2_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DRLK_2_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-lock-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-lock-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DRLK_2_12.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' diff --git a/src/python_testing/TC_DeviceBasicComposition.py b/src/python_testing/TC_DeviceBasicComposition.py index b333085fa2eef1..1f5dc38813a975 100644 --- a/src/python_testing/TC_DeviceBasicComposition.py +++ b/src/python_testing/TC_DeviceBasicComposition.py @@ -31,8 +31,11 @@ import chip.clusters.ClusterObjects import chip.tlv from chip.clusters.Attribute import ValueDecodeFailure -from matter_testing_support import AttributePathLocation, MatterBaseTest, async_test_body, default_matter_test_main +from conformance_support import ConformanceDecision, conformance_allowed +from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, + async_test_body, default_matter_test_main) from mobly import asserts +from spec_parsing_support import CommandType, build_xml_clusters def MatterTlvToJson(tlv_data: dict[int, Any]) -> dict[str, Any]: @@ -870,6 +873,129 @@ def test_DESC_2_2(self): if problems or root_problems: self.fail_current_test("Problems with tags lists") + def test_spec_conformance(self): + success = True + # TODO: provisional needs to be an input parameter + allow_provisional = True + clusters, problems = build_xml_clusters() + self.problems = self.problems + problems + for id in sorted(list(clusters.keys())): + print(f'{id} 0x{id:02x}: {clusters[id].name}') + for endpoint_id, endpoint in self.endpoints_tlv.items(): + for cluster_id, cluster in endpoint.items(): + if cluster_id not in clusters.keys(): + if (cluster_id & 0xFFFF_0000) != 0: + # manufacturer cluster + continue + location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id) + # TODO: update this from a warning once we have all the data + self.record_warning(self.get_test_name(), location=location, + problem='Standard cluster found on device, but is not present in spec data') + continue + + # TODO: switch to use global FEATURE_MAP_ID etc. once the IDM-10.1 change is merged. + FEATURE_MAP_ID = 0xFFFC + ATTRIBUTE_LIST_ID = 0xFFFB + ACCEPTED_COMMAND_ID = 0xFFF9 + GENERATED_COMMAND_ID = 0xFFF8 + + feature_map = cluster[FEATURE_MAP_ID] + attribute_list = cluster[ATTRIBUTE_LIST_ID] + all_command_list = cluster[ACCEPTED_COMMAND_ID] + cluster[GENERATED_COMMAND_ID] + + # Feature conformance checking + feature_masks = [1 << i for i in range(32) if feature_map & (1 << i)] + for f in feature_masks: + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=FEATURE_MAP_ID) + if f not in clusters[cluster_id].features.keys(): + self.record_error(self.get_test_name(), location=location, problem=f'Unknown feature with mask 0x{f:02x}') + success = False + continue + xml_feature = clusters[cluster_id].features[f] + conformance_decision = xml_feature.conformance(feature_map, attribute_list, all_command_list) + if not conformance_allowed(conformance_decision, allow_provisional): + self.record_error(self.get_test_name(), location=location, + problem=f'Disallowed feature with mask 0x{f:02x}') + success = False + for feature_mask, xml_feature in clusters[cluster_id].features.items(): + conformance_decision = xml_feature.conformance(feature_map, attribute_list, all_command_list) + if conformance_decision == ConformanceDecision.MANDATORY and feature_mask not in feature_masks: + self.record_error(self.get_test_name(), location=location, + problem=f'Required feature with mask 0x{f:02x} is not present in feature map') + success = False + + # Attribute conformance checking + for attribute_id, attribute in cluster.items(): + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id) + if attribute_id not in clusters[cluster_id].attributes.keys(): + # TODO: Consolidate the range checks with IDM-10.1 once that lands + if attribute_id <= 0x4FFF: + # manufacturer attribute + self.record_error(self.get_test_name(), location=location, + problem='Standard attribute found on device, but not in spec') + success = False + continue + xml_attribute = clusters[cluster_id].attributes[attribute_id] + conformance_decision = xml_attribute.conformance(feature_map, attribute_list, all_command_list) + if not conformance_allowed(conformance_decision, allow_provisional): + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id) + self.record_error(self.get_test_name(), location=location, + problem=f'Attribute 0x{attribute_id:02x} is included, but is disallowed by conformance') + success = False + for attribute_id, xml_attribute in clusters[cluster_id].attributes.items(): + conformance_decision = xml_attribute.conformance(feature_map, attribute_list, all_command_list) + if conformance_decision == ConformanceDecision.MANDATORY and attribute_id not in cluster.keys(): + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id) + self.record_error(self.get_test_name(), location=location, + problem=f'Attribute 0x{attribute_id:02x} is required, but is not present on the DUT') + success = False + + def check_spec_conformance_for_commands(command_type: CommandType) -> bool: + success = True + # TODO: once IDM-10.1 lands, use the globals + global_attribute_id = 0xFFF9 if command_type == CommandType.ACCEPTED else 0xFFF8 + xml_commands_dict = clusters[cluster_id].accepted_commands if command_type == CommandType.ACCEPTED else clusters[cluster_id].generated_commands + command_list = cluster[global_attribute_id] + for command_id in command_list: + location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id) + if command_id not in xml_commands_dict: + # TODO: Consolidate range checks with IDM-10.1 once that lands + if command_id <= 0xFF: + # manufacturer command + continue + self.record_error(self.get_test_name(), location=location, + problem='Standard command found on device, but not in spec') + success = False + continue + xml_command = xml_commands_dict[command_id] + conformance_decision = xml_command.conformance(feature_map, attribute_list, all_command_list) + if not conformance_allowed(conformance_decision, allow_provisional): + self.record_error(self.get_test_name(), location=location, + problem=f'Command 0x{command_id:02x} is included, but disallowed by conformance') + success = False + for command_id, xml_command in xml_commands_dict.items(): + conformance_decision = xml_command.conformance(feature_map, attribute_list, all_command_list) + if conformance_decision == ConformanceDecision.MANDATORY and command_id not in command_list: + location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id) + self.record_error(self.get_test_name(), location=location, + problem=f'Command 0x{command_id:02x} is required, but is not present on the DUT') + success = False + return success + + # Command conformance checking + cmd_success = check_spec_conformance_for_commands(CommandType.ACCEPTED) + success = False if not cmd_success else success + cmd_success = check_spec_conformance_for_commands(CommandType.GENERATED) + success = False if not cmd_success else success + + # TODO: Add choice checkers + + if not success: + # TODO: Right now, we have failures in all-cluster, so we can't fail this test and keep it in CI. For now, just log. + # Issue tracking: #29812 + # self.fail_current_test("Problems with conformance") + logging.error("Problems found with conformance, this should turn into a test failure once #29812 is resolved") + if __name__ == "__main__": default_matter_test_main() diff --git a/src/python_testing/TestConformanceSupport.py b/src/python_testing/TestConformanceSupport.py new file mode 100644 index 00000000000000..53f9e885ff9449 --- /dev/null +++ b/src/python_testing/TestConformanceSupport.py @@ -0,0 +1,575 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import xml.etree.ElementTree as ElementTree + +from conformance_support import ConformanceDecision, ConformanceParseParameters, parse_callable_from_xml +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from mobly import asserts + + +class TestConformanceSupport(MatterBaseTest): + @async_test_body + async def setup_class(self): + super().setup_class() + # a small feature map + self.feature_names_to_bits = {'AB': 0x01, 'CD': 0x02} + + # none, AB, CD, AB&CD + self.feature_maps = [0x00, 0x01, 0x02, 0x03] + self.has_ab = [False, True, False, True] + self.has_cd = [False, False, True, True] + + self.attribute_names_to_values = {'attr1': 0x00, 'attr2': 0x01} + self.attribute_lists = [[], [0x00], [0x01], [0x00, 0x01]] + self.has_attr1 = [False, True, False, True] + self.has_attr2 = [False, False, True, True] + + self.command_names_to_values = {'cmd1': 0x00, 'cmd2': 0x01} + self.cmd_lists = [[], [0x00], [0x01], [0x00, 0x01]] + self.has_cmd1 = [False, True, False, True] + self.has_cmd2 = [False, False, True, True] + self.params = ConformanceParseParameters( + feature_map=self.feature_names_to_bits, attribute_map=self.attribute_names_to_values, command_map=self.command_names_to_values) + + @async_test_body + async def test_conformance_mandatory(self): + xml = '' + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for f in self.feature_maps: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + + @async_test_body + async def test_conformance_optional(self): + xml = '' + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for f in self.feature_maps: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + + @async_test_body + async def test_conformance_disallowed(self): + xml = '' + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for f in self.feature_maps: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.DISALLOWED) + + xml = '' + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for f in self.feature_maps: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.DISALLOWED) + + @async_test_body + async def test_conformance_provisional(self): + xml = '' + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for f in self.feature_maps: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.PROVISIONAL) + + @async_test_body + async def test_conformance_mandatory_on_condition(self): + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # single attribute mandatory + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr1[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr2[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + # test command in optional and in boolean - this is the same as attribute essentially, so testing every permutation is overkill + + @async_test_body + async def test_conformance_optional_on_condition(self): + # single feature optional + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # single attribute optional + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr1[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr2[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + # single command optional + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, c in enumerate(self.cmd_lists): + if self.has_cmd1[i]: + asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, c in enumerate(self.cmd_lists): + if self.has_cmd2[i]: + asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(0x00, [], c), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_not_term_mandatory(self): + # single feature not mandatory + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # single attribute not mandatory + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if not self.has_attr1[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if not self.has_attr2[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_not_term_optional(self): + # single feature not optional + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + xml = ('' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_and_term(self): + # and term for features only + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i] and self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # and term for attributes only + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr1[i] and self.has_attr2[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + # and term for feature and attribute + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + for j, a in enumerate(self.attribute_lists): + if self.has_ab[i] and self.has_attr2[j]: + asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_or_term(self): + # or term feature only + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i] or self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # or term attribute only + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, a in enumerate(self.attribute_lists): + if self.has_attr1[i] or self.has_attr2[i]: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(0x00, a, []), ConformanceDecision.NOT_APPLICABLE) + + # or term feature and attribute + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + for j, a in enumerate(self.attribute_lists): + if self.has_ab[i] or self.has_attr2[j]: + asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, a, []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_and_term_with_not(self): + # and term with not + xml = ('' + '' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not self.has_ab[i] and self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_or_term_with_not(self): + # or term with not on second feature + xml = ('' + '' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i] or not self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # not around or term with + xml = ('' + '' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if not (self.has_ab[i] or self.has_cd[i]): + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_and_term_with_three_terms(self): + # and term with three features + xml = ('' + '' + '' + '' + '' + '' + '') + self.feature_names_to_bits['EF'] = 0x04 + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + # no features + asserts.assert_equal(xml_callable(0x00, [], []), ConformanceDecision.NOT_APPLICABLE) + # one feature + asserts.assert_equal(xml_callable(0x01, [], []), ConformanceDecision.NOT_APPLICABLE) + # all features + asserts.assert_equal(xml_callable(0x07, [], []), ConformanceDecision.OPTIONAL) + + # and term with one of each + xml = ('' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + for j, a in enumerate(self.attribute_lists): + for k, c in enumerate(self.cmd_lists): + if self.has_ab[i] and self.has_attr1[j] and self.has_cmd1[k]: + asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.NOT_APPLICABLE) + + @async_test_body + async def test_conformance_or_term_with_three_terms(self): + # or term with three features + xml = ('' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + # no features + asserts.assert_equal(xml_callable(0x00, [], []), ConformanceDecision.NOT_APPLICABLE) + # one feature + asserts.assert_equal(xml_callable(0x01, [], []), ConformanceDecision.OPTIONAL) + # all features + asserts.assert_equal(xml_callable(0x07, [], []), ConformanceDecision.OPTIONAL) + + # or term with one of each + xml = ('' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + for j, a in enumerate(self.attribute_lists): + for k, c in enumerate(self.cmd_lists): + if self.has_ab[i] or self.has_attr1[j] or self.has_cmd1[k]: + asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, a, c), ConformanceDecision.NOT_APPLICABLE) + + def test_conformance_otherwise(self): + # AB, O + xml = ('' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + + # AB, [CD] + xml = ('' + '' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + elif self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.OPTIONAL) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.NOT_APPLICABLE) + + # AB & !CD, P + xml = ('' + '' + '' + '' + '' + '' + '' + '' + '' + '' + '') + et = ElementTree.fromstring(xml) + xml_callable = parse_callable_from_xml(et, self.params) + for i, f in enumerate(self.feature_maps): + if self.has_ab[i] and not self.has_cd[i]: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.MANDATORY) + else: + asserts.assert_equal(xml_callable(f, [], []), ConformanceDecision.PROVISIONAL) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/conformance_support.py b/src/python_testing/conformance_support.py new file mode 100644 index 00000000000000..2dabb584c9d0f7 --- /dev/null +++ b/src/python_testing/conformance_support.py @@ -0,0 +1,263 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import xml.etree.ElementTree as ElementTree +from dataclasses import dataclass +from enum import Enum, auto +from typing import Callable + +from chip.tlv import uint + +OTHERWISE_CONFORM = 'otherwiseConform' +OPTIONAL_CONFORM = 'optionalConform' +PROVISIONAL_CONFORM = 'provisionalConform' +MANDATORY_CONFORM = 'mandatoryConform' +DEPRECATE_CONFORM = 'deprecateConform' +DISALLOW_CONFORM = 'disallowConform' +AND_TERM = 'andTerm' +OR_TERM = 'orTerm' +NOT_TERM = 'notTerm' +FEATURE_TAG = 'feature' +ATTRIBUTE_TAG = 'attribute' +COMMAND_TAG = 'command' + + +class ConformanceException(Exception): + def __init__(self, msg): + self.msg = msg + + def __str__(self): + return f"ConformanceException({self.msg})" + + +class ConformanceDecision(Enum): + MANDATORY = auto() + OPTIONAL = auto() + NOT_APPLICABLE = auto() + DISALLOWED = auto() + PROVISIONAL = auto() + + +@dataclass +class ConformanceParseParameters: + feature_map: dict[str, uint] + attribute_map: dict[str, uint] + command_map: dict[str, uint] + + +def conformance_allowed(conformance_decision: ConformanceDecision, allow_provisional: bool): + if conformance_decision == ConformanceDecision.NOT_APPLICABLE or conformance_decision == ConformanceDecision.DISALLOWED: + return False + if conformance_decision == ConformanceDecision.PROVISIONAL: + return allow_provisional + return True + + +def mandatory(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return ConformanceDecision.MANDATORY + + +def optional(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return ConformanceDecision.OPTIONAL + + +def deprecated(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return ConformanceDecision.DISALLOWED + + +def disallowed(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return ConformanceDecision.DISALLOWED + + +def provisional(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return ConformanceDecision.PROVISIONAL + + +def feature(requiredFeature: uint) -> Callable: + def feature_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + if requiredFeature & feature_map != 0: + return ConformanceDecision.MANDATORY + return ConformanceDecision.NOT_APPLICABLE + return feature_inner + + +def attribute(requiredAttribute: uint) -> Callable: + def attribute_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + if requiredAttribute in attribute_list: + return ConformanceDecision.MANDATORY + return ConformanceDecision.NOT_APPLICABLE + return attribute_inner + + +def command(requiredCommand: uint) -> Callable: + def command_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + if requiredCommand in all_command_list: + return ConformanceDecision.MANDATORY + return ConformanceDecision.NOT_APPLICABLE + return command_inner + + +def optional_wrapper(op: Callable) -> Callable: + def optional_wrapper_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + decision = op(feature_map, attribute_list, all_command_list) + if decision == ConformanceDecision.MANDATORY or decision == ConformanceDecision.OPTIONAL: + return ConformanceDecision.OPTIONAL + elif decision == ConformanceDecision.NOT_APPLICABLE: + return ConformanceDecision.NOT_APPLICABLE + else: + raise ConformanceException(f'Optional wrapping invalid op {decision}') + return optional_wrapper_inner + + +def mandatory_wrapper(op: Callable) -> Callable: + def mandatory_wrapper_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + return op(feature_map, attribute_list, all_command_list) + return mandatory_wrapper_inner + + +def not_operation(op: Callable): + def not_operation_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + # not operations can't be used with anything that returns DISALLOWED + # not operations also can't be used with things that are optional + # ie, ![AB] doesn't make sense, nor does !O + decision = op(feature_map, attribute_list, all_command_list) + if decision == ConformanceDecision.OPTIONAL or decision == ConformanceDecision.DISALLOWED or decision == ConformanceDecision.PROVISIONAL: + raise ConformanceException('NOT operation on optional or disallowed item') + elif decision == ConformanceDecision.NOT_APPLICABLE: + return ConformanceDecision.MANDATORY + elif decision == ConformanceDecision.MANDATORY: + return ConformanceDecision.NOT_APPLICABLE + else: + raise ConformanceException('NOT called on item with non-conformance value') + return not_operation_inner + + +def and_operation(op_list: list[Callable]) -> Callable: + def and_operation_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + for op in op_list: + decision = op(feature_map, attribute_list, all_command_list) + # and operations can't happen on optional or disallowed + if decision == ConformanceDecision.OPTIONAL or decision == ConformanceDecision.DISALLOWED or decision == ConformanceDecision.PROVISIONAL: + raise ConformanceException('AND operation on optional or disallowed item') + elif decision == ConformanceDecision.NOT_APPLICABLE: + return ConformanceDecision.NOT_APPLICABLE + elif decision == ConformanceDecision.MANDATORY: + continue + else: + raise ConformanceException('Oplist item returned non-conformance value') + return ConformanceDecision.MANDATORY + return and_operation_inner + + +def or_operation(op_list: list[Callable]) -> Callable: + def or_operation_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + for op in op_list: + decision = op(feature_map, attribute_list, all_command_list) + if decision == ConformanceDecision.DISALLOWED or decision == ConformanceDecision.PROVISIONAL: + raise ConformanceException('OR operation on optional or disallowed item') + elif decision == ConformanceDecision.NOT_APPLICABLE: + continue + elif decision == ConformanceDecision.MANDATORY: + return ConformanceDecision.MANDATORY + elif decision == ConformanceDecision.OPTIONAL: + return ConformanceDecision.OPTIONAL + else: + raise ConformanceException('Oplist item returned non-conformance value') + return ConformanceDecision.NOT_APPLICABLE + return or_operation_inner + +# TODO: add xor operation once it's required +# TODO: how would equal and unequal operations work here? + + +def otherwise(op_list: list[Callable]) -> Callable: + def otherwise_inner(feature_map: uint, attribute_list: list[uint], all_command_list: list[uint]) -> ConformanceDecision: + # Otherwise operations apply from left to right. If any of them + # has a definite decision (optional, mandatory or disallowed), that is the one that applies + # Provisional items are meant to be marked as the first item in the list + # Deprecated items are either on their own, or follow an O as O,D. + # For O,D, optional applies (leftmost), but we should consider some way to warn here as well, + # possibly in another function + for op in op_list: + decision = op(feature_map, attribute_list, all_command_list) + if decision == ConformanceDecision.NOT_APPLICABLE: + continue + return decision + return ConformanceDecision.NOT_APPLICABLE + return otherwise_inner + + +def parse_callable_from_xml(element: ElementTree.Element, params: ConformanceParseParameters) -> Callable: + if len(list(element)) == 0: + # no subchildren here, so this can only be mandatory, optional, provisional, deprecated, disallowed, feature or attribute + if element.tag == MANDATORY_CONFORM: + return mandatory + elif element.tag == OPTIONAL_CONFORM: + return optional + elif element.tag == PROVISIONAL_CONFORM: + return provisional + elif element.tag == DEPRECATE_CONFORM: + return deprecated + elif element.tag == DISALLOW_CONFORM: + return disallowed + elif element.tag == FEATURE_TAG: + try: + return feature(params.feature_map[element.get('name')]) + except KeyError: + raise ConformanceException(f'Conformance specifies feature not in feature table: {element.get("name")}') + elif element.tag == ATTRIBUTE_TAG: + # Some command conformance tags are marked as attribute, so if this key isn't in attribute, try command + name = element.get('name') + if name in params.attribute_map: + return attribute(params.attribute_map[name]) + elif name in params.command_map: + return command(params.command_map[name]) + else: + raise ConformanceException(f'Conformance specifies attribute or command not in table: {name}') + elif element.tag == COMMAND_TAG: + return command(params.command_map[element.get('name')]) + else: + raise ConformanceException( + f'Unexpected xml conformance element with no children {str(element.tag)} {str(element.attrib)}') + + # First build the list, then create the callable for this element + ops = [] + for sub in element: + ops.append(parse_callable_from_xml(sub, params)) + + # optional can be a wrapper as well as a standalone + # This can be any of the boolean operations, optional or otherwise + if element.tag == OPTIONAL_CONFORM: + if len(ops) > 1: + raise ConformanceException(f'OPTIONAL term found with more than one subelement {list(element)}') + return optional_wrapper(ops[0]) + elif element.tag == MANDATORY_CONFORM: + if len(ops) > 1: + raise ConformanceException(f'MANDATORY term found with more than one subelement {list(element)}') + return mandatory_wrapper(ops[0]) + elif element.tag == AND_TERM: + return and_operation(ops) + elif element.tag == OR_TERM: + return or_operation(ops) + elif element.tag == NOT_TERM: + if len(ops) > 1: + raise ConformanceException(f'NOT term found with more than one subelement {list(element)}') + return not_operation(ops[0]) + elif element.tag == OTHERWISE_CONFORM: + return otherwise(ops) + else: + raise ConformanceException(f'Unexpected conformance tag with children {element}') diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index a394952445de60..398a01f6cd9d17 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -333,6 +333,19 @@ class CommandPathLocation: cluster_id: int command_id: int + +@dataclass +class ClusterPathLocation: + endpoint_id: int + cluster_id: int + + +@dataclass +class FeaturePathLocation: + endpoint_id: int + cluster_id: int + feature_code: str + # ProblemSeverity is not using StrEnum, but rather Enum, since StrEnum only # appeared in 3.11. To make it JSON serializable easily, multiple inheritance # from `str` is used. See https://stackoverflow.com/a/51976841. @@ -347,7 +360,7 @@ class ProblemSeverity(str, Enum): @dataclass class ProblemNotice: test_name: str - location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation] + location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation] severity: ProblemSeverity problem: str spec_location: str = "" @@ -551,13 +564,13 @@ async def send_single_cmd( def print_step(self, stepnum: typing.Union[int, str], title: str) -> None: logging.info(f'***** Test Step {stepnum} : {title}') - def record_error(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""): + def record_error(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation], problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.ERROR, problem, spec_location)) - def record_warning(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""): + def record_warning(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation], problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.WARNING, problem, spec_location)) - def record_note(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation], problem: str, spec_location: str = ""): + def record_note(self, test_name: str, location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation, FeaturePathLocation], problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.NOTE, problem, spec_location)) def get_setup_payload_info(self) -> SetupPayloadInfo: diff --git a/src/python_testing/spec_parsing_support.py b/src/python_testing/spec_parsing_support.py new file mode 100644 index 00000000000000..9e014aa95dd2aa --- /dev/null +++ b/src/python_testing/spec_parsing_support.py @@ -0,0 +1,343 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import glob +import logging +import os +import xml.etree.ElementTree as ElementTree +from copy import deepcopy +from dataclasses import dataclass +from enum import Enum, auto +from typing import Callable + +from chip.tlv import uint +from conformance_support import (DEPRECATE_CONFORM, DISALLOW_CONFORM, MANDATORY_CONFORM, OPTIONAL_CONFORM, OTHERWISE_CONFORM, + PROVISIONAL_CONFORM, ConformanceDecision, ConformanceException, ConformanceParseParameters, + or_operation, parse_callable_from_xml) +from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, EventPathLocation, + FeaturePathLocation, ProblemNotice, ProblemSeverity) + + +@dataclass +class XmlFeature: + code: str + name: str + conformance: Callable[[uint], ConformanceDecision] + + +@dataclass +class XmlAttribute: + name: str + datatype: str + conformance: Callable[[uint], ConformanceDecision] + + +@dataclass +class XmlCommand: + name: str + conformance: Callable[[uint], ConformanceDecision] + + +@dataclass +class XmlEvent: + name: str + conformance: Callable[[uint], ConformanceDecision] + + +@dataclass +class XmlCluster: + name: str + revision: int + derived: str + feature_map: dict[str, uint] + attribute_map: dict[str, uint] + command_map: dict[str, uint] + features: dict[str, XmlFeature] + attributes: dict[uint, XmlAttribute] + accepted_commands: dict[uint, XmlCommand] + generated_commands: dict[uint, XmlCommand] + events: dict[uint, XmlEvent] + + +class CommandType(Enum): + ACCEPTED = auto() + GENERATED = auto() + + +def has_zigbee_conformance(conformance: ElementTree.Element) -> bool: + # For clusters, things with zigbee conformance can share IDs with the matter elements, so we don't want them + + # TODO: it's actually possible for a thing to have a zigbee conformance AND to have other conformances, and we should check + # for that, but for now, this is fine because that hasn't happened in the cluster conformances YET. + # It does happen for device types, so we need to be careful there. + condition = conformance.iter('condition') + for c in condition: + try: + c.attrib['name'].lower() == "zigbee" + return True + except KeyError: + continue + return False + + +class ClusterParser: + def __init__(self, cluster, cluster_id, name): + self._problems: list[ProblemNotice] = [] + self._cluster = cluster + self._cluster_id = cluster_id + self._name = name + + self._derived = None + try: + classification = next(cluster.iter('classification')) + hierarchy = classification.attrib['hierarchy'] + if hierarchy.lower() == 'derived': + self._derived = classification.attrib['baseCluster'] + except (KeyError, StopIteration): + self._derived = None + + self.feature_elements = self.get_all_feature_elements() + self.attribute_elements = self.get_all_attribute_elements() + self.command_elements = self.get_all_command_elements() + self.event_elements = self.get_all_event_elements() + self.params = ConformanceParseParameters(feature_map=self.create_feature_map(), attribute_map=self.create_attribute_map(), + command_map=self.create_command_map()) + + def get_conformance(self, element: ElementTree.Element) -> ElementTree.Element: + for sub in element: + if sub.tag == OTHERWISE_CONFORM or sub.tag == MANDATORY_CONFORM or sub.tag == OPTIONAL_CONFORM or sub.tag == PROVISIONAL_CONFORM or sub.tag == DEPRECATE_CONFORM or sub.tag == DISALLOW_CONFORM: + return sub + + # Conformance is missing, so let's record the problem and treat it as optional for lack of a better choice + if element.tag == 'feature': + location = FeaturePathLocation(endpoint_id=0, cluster_id=self._cluster_id, feature_code=element.attrib['code']) + elif element.tag == 'command': + location = CommandPathLocation(endpoint_id=0, cluster_id=self._cluster_id, command_id=element.attrib['id']) + elif element.tag == 'attribute': + location = AttributePathLocation(endpoint_id=0, cluster_id=self._cluster_id, attribute_id=element.attrib['id']) + elif element.tag == 'event': + location = EventPathLocation(endpoint_id=0, cluster_id=self._cluster_id, event_id=element.attrib['id']) + else: + location = ClusterPathLocation(endpoing_id=0, cluster_id=self._cluster_id) + self._problems.append(ProblemNotice(test_name='Spec XML parsing', location=location, + severity=ProblemSeverity.WARNING, problem='Unable to find conformance element')) + + return ElementTree.Element(OPTIONAL_CONFORM) + + def get_all_type(self, type_container: str, type_name: str, key_name: str) -> list[tuple[ElementTree.Element, ElementTree.Element]]: + ret = [] + container_tags = self._cluster.iter(type_container) + for container in container_tags: + elements = container.iter(type_name) + for element in elements: + try: + element.attrib[key_name] + except KeyError: + # This is a conformance tag, which uses the same name + continue + conformance = self.get_conformance(element) + if has_zigbee_conformance(conformance): + continue + ret.append((element, conformance)) + return ret + + def get_all_feature_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]: + ''' Returns a list of features and their conformances''' + return self.get_all_type('features', 'feature', 'code') + + def get_all_attribute_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]: + ''' Returns a list of attributes and their conformances''' + return self.get_all_type('attributes', 'attribute', 'id') + + def get_all_command_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]: + ''' Returns a list of commands and their conformances ''' + return self.get_all_type('commands', 'command', 'id') + + def get_all_event_elements(self) -> list[tuple[ElementTree.Element, ElementTree.Element]]: + ''' Returns a list of events and their conformances''' + return self.get_all_type('events', 'event', 'id') + + def create_feature_map(self) -> dict[str, uint]: + features = {} + for element, conformance in self.feature_elements: + features[element.attrib['code']] = 1 << int(element.attrib['bit'], 0) + return features + + def create_attribute_map(self) -> dict[str, uint]: + attributes = {} + for element, conformance in self.attribute_elements: + attributes[element.attrib['name']] = int(element.attrib['id'], 0) + return attributes + + def create_command_map(self) -> dict[str, uint]: + commands = {} + for element, conformance in self.command_elements: + commands[element.attrib['name']] = int(element.attrib['id'], 0) + return commands + + def parse_conformance(self, conformance_xml: ElementTree.Element) -> Callable: + try: + return parse_callable_from_xml(conformance_xml, self.params) + except ConformanceException as ex: + # Just point to the general cluster, because something is mismatched, but it's not clear what + location = ClusterPathLocation(endpoint_id=0, cluster_id=self._cluster_id) + self._problems.append(ProblemNotice(test_name='Spec XML parsing', location=location, + severity=ProblemSeverity.WARNING, problem=str(ex))) + return None + + def parse_features(self) -> dict[uint, XmlFeature]: + features = {} + for element, conformance_xml in self.feature_elements: + mask = 1 << int(element.attrib['bit'], 0) + conformance = self.parse_conformance(conformance_xml) + if conformance is None: + continue + features[mask] = XmlFeature(code=element.attrib['code'], name=element.attrib['name'], + conformance=conformance) + return features + + def parse_attributes(self) -> dict[uint, XmlAttribute]: + attributes = {} + for element, conformance_xml in self.attribute_elements: + code = int(element.attrib['id'], 0) + # Some deprecated attributes don't have their types included, for now, lets just fallback to UNKNOWN + try: + datatype = element.attrib['type'] + except KeyError: + datatype = 'UNKNOWN' + conformance = self.parse_conformance(conformance_xml) + if conformance is None: + continue + if code in attributes: + # This is one of those fun ones where two different rows have the same id and name, but differ in conformance and ranges + # I don't have a good way to relate the ranges to the conformance, but they're both acceptable, so let's just or them. + conformance = or_operation([conformance, attributes[code].conformance]) + attributes[code] = XmlAttribute(name=element.attrib['name'], datatype=datatype, + conformance=conformance) + return attributes + + def parse_commands(self, command_type: CommandType) -> dict[uint, XmlAttribute]: + commands = {} + for element, conformance_xml in self.command_elements: + code = int(element.attrib['id'], 0) + dir = CommandType.ACCEPTED + try: + if element.attrib['direction'].lower() == 'responsefromserver': + dir = CommandType.GENERATED + except KeyError: + pass + if dir != command_type: + continue + code = int(element.attrib['id'], 0) + conformance = self.parse_conformance(conformance_xml) + if conformance is None: + continue + if code in commands: + conformance = or_operation([conformance, commands[code].conformance]) + commands[code] = XmlCommand(name=element.attrib['name'], conformance=conformance) + return commands + + def parse_events(self) -> dict[uint, XmlAttribute]: + events = {} + for element, conformance_xml in self.event_elements: + code = int(element.attrib['id'], 0) + conformance = self.parse_conformance(conformance_xml) + if conformance is None: + continue + if code in events: + conformance = or_operation([conformance, events[code].conformance]) + events[code] = XmlEvent(name=element.attrib['name'], conformance=conformance) + return events + + def create_cluster(self) -> XmlCluster: + return XmlCluster(revision=self._cluster.attrib['revision'], derived=self._derived, + name=self._name, feature_map=self.params.feature_map, + attribute_map=self.params.attribute_map, command_map=self.params.command_map, + features=self.parse_features(), + attributes=self.parse_attributes(), + accepted_commands=self.parse_commands(CommandType.ACCEPTED), + generated_commands=self.parse_commands(CommandType.GENERATED), + events=self.parse_events()) + + def get_problems(self) -> list[ProblemNotice]: + return self._problems + + +def build_xml_clusters() -> tuple[list[XmlCluster], list[ProblemNotice]]: + dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', 'data_model', 'clusters') + clusters: dict[int, XmlCluster] = {} + derived_clusters: dict[str, XmlCluster] = {} + ids_by_name = {} + problems = [] + for xml in glob.glob(f"{dir}/*.xml"): + logging.info(f'Parsing file {xml}') + tree = ElementTree.parse(f'{xml}') + root = tree.getroot() + cluster = root.iter('cluster') + for c in cluster: + name = c.attrib['name'] + if not c.attrib['id']: + # Fully derived clusters have no id, but also shouldn't appear on a device. + # We do need to keep them, though, because we need to update the derived + # clusters. We keep them in a special dict by name, so they can be thrown + # away later. + cluster_id = None + else: + cluster_id = int(c.attrib['id'], 0) + ids_by_name[name] = cluster_id + + parser = ClusterParser(c, cluster_id, name) + new = parser.create_cluster() + problems = problems + parser.get_problems() + + if cluster_id: + clusters[cluster_id] = new + else: + derived_clusters[name] = new + + # We have the information now about which clusters are derived, so we need to fix them up. Apply first the base cluster, + # then add the specific cluster overtop + for id, c in clusters.items(): + if c.derived: + base_name = c.derived + if base_name in ids_by_name: + base = clusters[ids_by_name[c.derived]] + else: + base = derived_clusters[base_name] + + feature_map = deepcopy(base.feature_map) + feature_map.update(c.feature_map) + attribute_map = deepcopy(base.attribute_map) + attribute_map.update(c.attribute_map) + command_map = deepcopy(base.command_map) + command_map.update(c.command_map) + features = deepcopy(base.features) + features.update(c.features) + attributes = deepcopy(base.attributes) + attributes.update(c.attributes) + accepted_commands = deepcopy(base.accepted_commands) + accepted_commands.update(c.accepted_commands) + generated_commands = deepcopy(base.generated_commands) + generated_commands.update(c.generated_commands) + events = deepcopy(base.events) + events.update(c.events) + new = XmlCluster(revision=c.revision, derived=c.derived, name=c.name, + feature_map=feature_map, attribute_map=attribute_map, command_map=command_map, + features=features, attributes=attributes, accepted_commands=accepted_commands, + generated_commands=generated_commands, events=events) + clusters[id] = new + return clusters, problems