From 32fbd28d87f8784e3134c97ca2b8bf02098950f9 Mon Sep 17 00:00:00 2001 From: C Freeman Date: Mon, 3 Apr 2023 18:11:22 -0400 Subject: [PATCH] Add support for > 1 DUT for TC-DA-1.7 (#25928) Allows the user to specify multiple DUTs by using a list of discriminators and passcodes on the command line. Forces the use of 2 DUTS for this test and compares the PKs. --- src/python_testing/TC_DA_1_7.py | 77 ++++++++++++-------- src/python_testing/matter_testing_support.py | 73 ++++++++++++------- 2 files changed, 96 insertions(+), 54 deletions(-) diff --git a/src/python_testing/TC_DA_1_7.py b/src/python_testing/TC_DA_1_7.py index 4b4dd66aa65866..35eae6a29db700 100644 --- a/src/python_testing/TC_DA_1_7.py +++ b/src/python_testing/TC_DA_1_7.py @@ -66,6 +66,21 @@ def extract_akid(cert: Certificate) -> Optional[bytes]: class TC_DA_1_7(MatterBaseTest): @async_test_body async def test_TC_DA_1_7(self): + # For real tests, we require more than one DUT + # On the CI, this doesn't make sense to do since all the examples use the same DAC + # To specify more than 1 DUT, use a list of discriminators and passcodes + allow_sdk_dac = self.user_params.get("allow_sdk_dac", False) + if allow_sdk_dac: + asserts.assert_equal(len(self.matter_test_config.discriminator), 1, "Only one device can be tested with SDK DAC") + if not allow_sdk_dac: + asserts.assert_equal(len(self.matter_test_config.discriminator), 2, "This test requires 2 DUTs") + pk = [] + for i in range(len(self.matter_test_config.dut_node_id)): + pk.append(await self.single_DUT(i, self.matter_test_config.dut_node_id[i])) + + asserts.assert_equal(len(pk), len(set(pk)), "Found matching public keys in different DUTs") + + async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes: # Option to allow SDK roots (skip step 4 check 2) allow_sdk_dac = self.user_params.get("allow_sdk_dac", False) @@ -74,52 +89,56 @@ async def test_TC_DA_1_7(self): paa_by_skid = load_all_paa(conf.paa_trust_store_path) logging.info("Found %d PAAs" % len(paa_by_skid)) - logging.info("Step 1: Commissioning, already done") + logging.info("DUT {} Step 1: Commissioning, already done".format(dut_index)) dev_ctrl = self.default_controller - logging.info("Step 2: Get PAI of DUT1 with certificate chain request") - result = await dev_ctrl.SendCommand(self.dut_node_id, 0, + logging.info("DUT {} Step 2: Get PAI of DUT1 with certificate chain request".format(dut_index)) + result = await dev_ctrl.SendCommand(dut_node_id, 0, Clusters.OperationalCredentials.Commands.CertificateChainRequest(2)) - pai_1 = result.certificate - asserts.assert_less_equal(len(pai_1), 600, "PAI cert must be at most 600 bytes") - self.record_data({"pai_1": hex_from_bytes(pai_1)}) + pai = result.certificate + asserts.assert_less_equal(len(pai), 600, "PAI cert must be at most 600 bytes") + key = 'pai_{}'.format(dut_index) + self.record_data({key: hex_from_bytes(pai)}) - logging.info("Step 3: Get DAC of DUT1 with certificate chain request") - result = await dev_ctrl.SendCommand(self.dut_node_id, 0, + logging.info("DUT {} Step 3: Get DAC of DUT1 with certificate chain request".format(dut_index)) + result = await dev_ctrl.SendCommand(dut_node_id, 0, Clusters.OperationalCredentials.Commands.CertificateChainRequest(1)) - dac_1 = result.certificate - asserts.assert_less_equal(len(dac_1), 600, "DAC cert must be at most 600 bytes") - self.record_data({"dac_1": hex_from_bytes(dac_1)}) - - logging.info("Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid") - pai1_cert = load_der_x509_certificate(pai_1) - pai1_akid = extract_akid(pai1_cert) - if pai1_akid not in paa_by_skid: - asserts.fail("DUT1's PAI (%s) not matched in PAA trust store" % hex_from_bytes(pai1_akid)) - - filename, paa_cert = paa_by_skid[pai1_akid] + dac = result.certificate + asserts.assert_less_equal(len(dac), 600, "DAC cert must be at most 600 bytes") + key = 'dac_{}'.format(dut_index) + self.record_data({key: hex_from_bytes(dac)}) + + logging.info("DUT {} Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid".format(dut_index)) + pai_cert = load_der_x509_certificate(pai) + pai_akid = extract_akid(pai_cert) + if pai_akid not in paa_by_skid: + asserts.fail("DUT %d PAI (%s) not matched in PAA trust store" % (dut_index, hex_from_bytes(pai_akid))) + + filename, paa_cert = paa_by_skid[pai_akid] logging.info("Matched PAA file %s, subject: %s" % (filename, paa_cert.subject)) public_key = paa_cert.public_key() try: - public_key.verify(signature=pai1_cert.signature, data=pai1_cert.tbs_certificate_bytes, + public_key.verify(signature=pai_cert.signature, data=pai_cert.tbs_certificate_bytes, signature_algorithm=ec.ECDSA(hashes.SHA256())) except InvalidSignature as e: - asserts.fail("Failed to verify PAI signature against PAA public key: %s" % str(e)) + asserts.fail("DUT %d: Failed to verify PAI signature against PAA public key: %s" % (dut_index, str(e))) logging.info("Validated PAI signature against PAA") - logging.info("Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs") + logging.info("DUT {} Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs".format(dut_index)) if allow_sdk_dac: logging.warn("===> TEST STEP SKIPPED: Allowing SDK DACs!") else: for candidate in FORBIDDEN_AKID: - asserts.assert_not_equal(hex_from_bytes(pai1_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist") - - logging.info("Step 5: Extract subject public key of DAC and save") - dac1_cert = load_der_x509_certificate(dac_1) - pk_1 = dac1_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint) - logging.info("Subject public key pk_1: %s" % hex_from_bytes(pk_1)) - self.record_data({"pk_1": hex_from_bytes(pk_1)}) + asserts.assert_not_equal(hex_from_bytes(pai_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist") + + logging.info("DUT {} Step 5: Extract subject public key of DAC and save".format(dut_index)) + dac_cert = load_der_x509_certificate(dac) + pk = dac_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint) + logging.info("Subject public key pk: %s" % hex_from_bytes(pk)) + key = 'pk_{}'.format(dut_index) + self.record_data({key: hex_from_bytes(pk)}) + return pk if __name__ == "__main__": diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 46c205ed0b3cc3..c4a73c0179d33b 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -132,8 +132,8 @@ class MatterTestConfig: tests: List[str] = field(default_factory=list) commissioning_method: str = None - discriminator: int = None - setup_passcode: int = None + discriminator: List[int] = None + setup_passcode: List[int] = None commissionee_ip_address_just_for_testing: str = None maximize_cert_chains: bool = False @@ -145,7 +145,7 @@ class MatterTestConfig: thread_operational_dataset: str = None # Node ID for basic DUT - dut_node_id: int = _DEFAULT_DUT_NODE_ID + dut_node_id: List[int] = None # Node ID to use for controller/commissioner controller_node_id: int = _DEFAULT_CONTROLLER_NODE_ID # CAT Tags for default controller/commissioner @@ -264,7 +264,7 @@ def certificate_authority_manager(self) -> chip.CertificateAuthority.Certificate @property def dut_node_id(self) -> int: - return self.matter_test_config.dut_node_id + return self.matter_test_config.dut_node_id[0] async def read_single_attribute( self, dev_ctrl: ChipDeviceCtrl, node_id: int, endpoint: int, attribute: object, fabricFiltered: bool = True) -> object: @@ -489,6 +489,28 @@ def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConf print("error: Cannot have both --qr-code and --manual-code present!") return False + if len(config.discriminator) != len(config.setup_passcode): + print("error: supplied number of discriminators does not match number of passcodes") + return False + + if len(config.dut_node_id) > len(config.discriminator): + print("error: More node IDs provided than discriminators") + return False + + if len(config.dut_node_id) < len(config.discriminator): + missing = len(config.discriminator) - len(config.dut_node_id) + for i in range(missing): + config.dut_node_id.append(config.dut_node_id[-1] + 1) + + if len(config.dut_node_id) != len(set(config.dut_node_id)): + print("error: Duplicate values in node id list") + return False + + if len(config.discriminator) != len(set(config.discriminator)): + print("error: Duplicate value in discriminator list") + return False + + # TODO: this should also allow multiple once QR and manual codes are supported. config.qr_code_content = args.qr_code config.manual_code = args.manual_code @@ -591,9 +613,9 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig: default=_DEFAULT_CONTROLLER_NODE_ID, help='NodeID to use for initial/default controller (default: %d)' % _DEFAULT_CONTROLLER_NODE_ID) basic_group.add_argument('-n', '--dut-node-id', type=int_decimal_or_hex, - metavar='NODE_ID', default=_DEFAULT_DUT_NODE_ID, + metavar='NODE_ID', default=[_DEFAULT_DUT_NODE_ID], help='Node ID for primary DUT communication, ' - 'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID) + 'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID, nargs="+") commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node") @@ -603,13 +625,13 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig: help='Name of commissioning method to use') commission_group.add_argument('-d', '--discriminator', type=int_decimal_or_hex, metavar='LONG_DISCRIMINATOR', - help='Discriminator to use for commissioning') + help='Discriminator to use for commissioning', nargs="+") commission_group.add_argument('-p', '--passcode', type=int_decimal_or_hex, metavar='PASSCODE', - help='PAKE passcode to use') + help='PAKE passcode to use', nargs="+") commission_group.add_argument('-i', '--ip-addr', type=str, metavar='RAW_IP_ADDRESS', - help='IP address to use (only for method "on-network-ip". ONLY FOR LOCAL TESTING!') + help='IP address to use (only for method "on-network-ip". ONLY FOR LOCAL TESTING!', nargs="+") commission_group.add_argument('--wifi-ssid', type=str, metavar='SSID', @@ -692,14 +714,15 @@ class CommissionDeviceTest(MatterBaseTest): def test_run_commissioning(self): conf = self.matter_test_config - logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" % - (conf.root_of_trust_index, conf.fabric_id, conf.dut_node_id)) - logging.info("Commissioning method: %s" % conf.commissioning_method) + for i in range(len(conf.dut_node_id)): + logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" % + (conf.root_of_trust_index, conf.fabric_id, conf.dut_node_id[i])) + logging.info("Commissioning method: %s" % conf.commissioning_method) - if not self._commission_device(): - raise signals.TestAbortAll("Failed to commission node") + if not self._commission_device(i): + raise signals.TestAbortAll("Failed to commission node") - def _commission_device(self) -> bool: + def _commission_device(self, i) -> bool: dev_ctrl = self.default_controller conf = self.matter_test_config @@ -707,31 +730,31 @@ def _commission_device(self) -> bool: if conf.commissioning_method == "on-network": return dev_ctrl.CommissionOnNetwork( - nodeId=conf.dut_node_id, - setupPinCode=conf.setup_passcode, + nodeId=conf.dut_node_id[i], + setupPinCode=conf.setup_passcode[i], filterType=DiscoveryFilterType.LONG_DISCRIMINATOR, - filter=conf.discriminator + filter=conf.discriminator[i] ) elif conf.commissioning_method == "ble-wifi": return dev_ctrl.CommissionWiFi( - conf.discriminator, - conf.setup_passcode, - conf.dut_node_id, + conf.discriminator[i], + conf.setup_passcode[i], + conf.dut_node_id[i], conf.wifi_ssid, conf.wifi_passphrase ) elif conf.commissioning_method == "ble-thread": return dev_ctrl.CommissionThread( - conf.discriminator, - conf.setup_passcode, - conf.dut_node_id, + conf.discriminator[i], + conf.setup_passcode[i], + conf.dut_node_id[i], conf.thread_operational_dataset ) elif conf.commissioning_method == "on-network-ip": logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====") return dev_ctrl.CommissionIP( ipaddr=conf.commissionee_ip_address_just_for_testing, - setupPinCode=conf.setup_passcode, nodeid=conf.dut_node_id + setupPinCode=conf.setup_passcode[i], nodeid=conf.dut_node_id[i] ) else: raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)