Skip to content

Commit

Permalink
[Python] Convert tests in src/python_testing/ to asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
agners committed Jun 18, 2024
1 parent ff8190d commit af74080
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 39 deletions.
4 changes: 2 additions & 2 deletions src/python_testing/TC_ACE_1_5.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ async def test_TC_ACE_1_5(self):
self.th2 = new_fabric_admin.NewController(nodeId=TH2_nodeid,
paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path))

params = self.openCommissioningWindow(self.th1, self.dut_node_id)
params = await self.openCommissioningWindow(self.th1, self.dut_node_id)
self.print_step(2, "TH1 opens the commissioning window on the DUT")

self.th2.CommissionOnNetwork(
await self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.commissioningParameters.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=params.randomDiscriminator)
logging.info('Commissioning complete done. Successful.')
Expand Down
8 changes: 4 additions & 4 deletions src/python_testing/TC_CGEN_2_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@

class TC_CGEN_2_4(MatterBaseTest):

def OpenCommissioningWindow(self) -> CommissioningParameters:
async def OpenCommissioningWindow(self) -> CommissioningParameters:
try:
params = self.th1.OpenCommissioningWindow(
params = await self.th1.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=self.discriminator, option=1)
return params

Expand All @@ -56,7 +56,7 @@ async def CommissionToStageSendCompleteAndCleanup(
self, stage: int, expectedErrorPart: chip.native.ErrorSDKPart, expectedErrCode: int):

logging.info("-----------------Fail on step {}-------------------------".format(stage))
params = self.OpenCommissioningWindow()
params = await self.OpenCommissioningWindow()
self.th2.ResetTestCommissioner()
# This will run the commissioning up to the point where stage x is run and the
# response is sent before the test commissioner simulates a failure
Expand Down Expand Up @@ -99,7 +99,7 @@ async def test_TC_CGEN_2_4(self):
await self.CommissionToStageSendCompleteAndCleanup(kSendNOC, chip.native.ErrorSDKPart.IM_CLUSTER_STATUS, 0x02)

logging.info('Step 15 - TH1 opens a commissioning window')
params = self.OpenCommissioningWindow()
params = await self.OpenCommissioningWindow()

logging.info('Step 16 - TH2 fully commissions the DUT')
self.th2.ResetTestCommissioner()
Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/TC_DA_1_5.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ async def test_TC_DA_1_5(self):
await self.send_single_cmd(cmd=gcomm.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=1))

self.print_step(13, "Open commissioning window")
params = self.default_controller.OpenCommissioningWindow(
params = await self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1)

self.print_step(14, "Commission to TH2")
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
TH2 = new_fabric_admin.NewController(nodeId=112233)

TH2.CommissionOnNetwork(
await TH2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234)

Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/TC_IDM_1_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ async def test_TC_IDM_1_2(self):
device = next(filter(lambda d: d.commissioningMode == 2 and d.longDiscriminator == discriminator, devices))
for a in device.addresses:
try:
TH2.EstablishPASESessionIP(ipaddr=a, setupPinCode=params.setupPinCode,
nodeid=self.dut_node_id+1, port=device.port)
await TH2.EstablishPASESessionIP(ipaddr=a, setupPinCode=params.setupPinCode,
nodeid=self.dut_node_id+1, port=device.port)
break
except ChipStackError:
continue
Expand Down
26 changes: 13 additions & 13 deletions src/python_testing/TC_OPCREDS_3_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def FindAndEstablishPase(self, longDiscriminator: int, setupPinCode: int, nodeid
Discovery.FilterType.LONG_DISCRIMINATOR and d.longDiscriminator == longDiscriminator, devices))
for a in device.addresses:
try:
dev_ctrl.EstablishPASESessionIP(ipaddr=a, setupPinCode=setupPinCode,
nodeid=nodeid, port=device.port)
await dev_ctrl.EstablishPASESessionIP(ipaddr=a, setupPinCode=setupPinCode,
nodeid=nodeid, port=device.port)
break
except ChipStackError:
continue
Expand All @@ -51,11 +51,11 @@ def FindAndEstablishPase(self, longDiscriminator: int, setupPinCode: int, nodeid
except TimeoutError:
asserts.fail("Unable to establish a PASE session to the device")

def OpenCommissioningWindow(self, dev_ctrl: ChipDeviceCtrl, node_id: int):
async def OpenCommissioningWindow(self, dev_ctrl: ChipDeviceCtrl, node_id: int):
# TODO: abstract this in the base layer? Do we do this a lot?
longDiscriminator = random.randint(0, 4095)
try:
params = dev_ctrl.OpenCommissioningWindow(
params = await dev_ctrl.OpenCommissioningWindow(
nodeid=node_id, timeout=600, iteration=10000, discriminator=longDiscriminator, option=ChipDeviceCtrl.ChipDeviceControllerBase.CommissioningWindowPasscode.kTokenWithRandomPin)
except Exception as e:
logging.exception('Error running OpenCommissioningWindow %s', e)
Expand Down Expand Up @@ -85,7 +85,7 @@ async def test_TC_OPCREDS_3_1(self):
"Device fabric table is full - please remove one fabric and retry")

self.print_step(1, "TH0 opens a commissioning window on the DUT")
longDiscriminator, params = self.OpenCommissioningWindow(self.default_controller, self.dut_node_id)
longDiscriminator, params = await self.OpenCommissioningWindow(self.default_controller, self.dut_node_id)

self.print_step(
2, "TH0 reads the BasicCommissioningInfo field from the General commissioning cluster saves MaxCumulativeFailsafeSeconds as `failsafe_max`")
Expand All @@ -109,7 +109,7 @@ async def test_TC_OPCREDS_3_1(self):
self.print_step(6, "TH1 obtains or generates the NOC, the Root CA Certificate and ICAC using csrResponse and selects an IPK. The certificates shall have their subjects padded with additional data such that they are each the maximum certificate size of 400 bytes when encoded in the MatterCertificateEncoding.")
# Our CA is set up to maximize cert chains already
# Extract the RCAC public key and save as `Root_Public_Key_TH1`
TH1_certs_real = TH1.IssueNOCChain(csrResponse, newNodeId)
TH1_certs_real = await TH1.IssueNOCChain(csrResponse, newNodeId)
if (TH1_certs_real.rcacBytes is None or
TH1_certs_real.icacBytes is None or
TH1_certs_real.nocBytes is None or TH1_certs_real.ipkBytes is None):
Expand All @@ -125,7 +125,7 @@ async def test_TC_OPCREDS_3_1(self):
TH1_CA_fake = self.certificate_authority_manager.NewCertificateAuthority()
TH1_fabric_admin_fake = TH1_CA_fake.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
TH1_fake = TH1_fabric_admin_fake.NewController(nodeId=self.default_controller.nodeId)
TH1_certs_fake = TH1_fake.IssueNOCChain(csrResponse, newNodeId)
TH1_certs_fake = await TH1_fake.IssueNOCChain(csrResponse, newNodeId)
if (TH1_certs_fake.rcacBytes is None or
TH1_certs_fake.icacBytes is None or
TH1_certs_fake.nocBytes is None or TH1_certs_real.ipkBytes is None):
Expand Down Expand Up @@ -361,7 +361,7 @@ async def test_TC_OPCREDS_3_1(self):
asserts.assert_equal(len(fabrics), fabrics_original_size, "Fabric list size does not match original")

self.print_step(37, "TH1 fully commissions DUT onto the fabric using a set of valid certificates")
TH1.Commission(newNodeId)
await TH1.Commission(newNodeId)

self.print_step(
38, "TH1 reads the TrustedRootCertificates list from DUT and verify that there are trusted_root_original_size + 1 entries")
Expand Down Expand Up @@ -404,7 +404,7 @@ async def test_TC_OPCREDS_3_1(self):
resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kOk, "Failure on UpdateFabricLabel")

self.print_step(44, "TH1 sends an OpenCommissioningWindow command to the Administrator Commissioning cluster")
longDiscriminator, params = self.OpenCommissioningWindow(TH1, newNodeId)
longDiscriminator, params = await self.OpenCommissioningWindow(TH1, newNodeId)

self.print_step(45, "TH2 commissions the DUT")
TH2_CA = self.certificate_authority_manager.NewCertificateAuthority(maximizeCertChains=True)
Expand All @@ -413,7 +413,7 @@ async def test_TC_OPCREDS_3_1(self):
TH2_nodeid = self.default_controller.nodeId+2
TH2 = TH2_fabric_admin.NewController(nodeId=TH2_nodeid)
TH2_dut_nodeid = self.dut_node_id+2
TH2.CommissionOnNetwork(
await TH2.CommissionOnNetwork(
nodeId=TH2_dut_nodeid, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=longDiscriminator)

Expand Down Expand Up @@ -484,7 +484,7 @@ async def test_TC_OPCREDS_3_1(self):
temp_CA = self.certificate_authority_manager.NewCertificateAuthority()
temp_fabric_admin = temp_CA.NewFabricAdmin(vendorId=0xFFF1, fabricId=3)
temp_controller = temp_fabric_admin.NewController(nodeId=self.default_controller.nodeId)
temp_certs = temp_controller.IssueNOCChain(csrResponse, newNodeId)
temp_certs = await temp_controller.IssueNOCChain(csrResponse, newNodeId)
if (temp_certs.rcacBytes is None or
temp_certs.icacBytes is None or
temp_certs.nocBytes is None or temp_certs.ipkBytes is None):
Expand Down Expand Up @@ -521,7 +521,7 @@ async def test_TC_OPCREDS_3_1(self):

self.print_step(61, "TH1 obtains or generates a NOC and ICAC using the CSR elements from the previous step with a different NodeID, but the same Root CA Certificate and fabric ID as step <<TH1-gen-real-creds>>. Save as `Node_Operational_Certificates_TH1_fabric_conflict` and `Intermediate_Certificate_TH1_fabric_conflict`|")
anotherNodeId = newNodeId + 1
TH1_certs_fabric_conflict = TH1.IssueNOCChain(csrResponse_new, anotherNodeId)
TH1_certs_fabric_conflict = await TH1.IssueNOCChain(csrResponse_new, anotherNodeId)
if (TH1_certs_fabric_conflict.rcacBytes is None or
TH1_certs_fabric_conflict.icacBytes is None or
TH1_certs_fabric_conflict.nocBytes is None or TH1_certs_fabric_conflict.ipkBytes is None):
Expand Down Expand Up @@ -565,7 +565,7 @@ async def test_TC_OPCREDS_3_1(self):
"Unexpected response type for UpdateNOC csr request")

self.print_step(68, "TH1 obtains or generates a NOC, Root CA Certificate, ICAC using the CSR elements from the previous step")
TH1_certs_3 = TH1.IssueNOCChain(csrResponse, anotherNodeId)
TH1_certs_3 = await TH1.IssueNOCChain(csrResponse, anotherNodeId)
if (TH1_certs_3.rcacBytes is None or
TH1_certs_3.icacBytes is None or
TH1_certs_3.nocBytes is None or TH1_certs_3.ipkBytes is None):
Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/TC_TIMESYNC_2_13.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ async def test_TC_TIMESYNC_2_13(self):
self.print_step(0, "Commissioning, already done")

self.print_step(1, "TH1 opens a commissioning window")
params = self.default_controller.OpenCommissioningWindow(
params = await self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1)

self.print_step(2, "Commission to TH2")
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
TH2 = new_fabric_admin.NewController(nodeId=112233)

TH2.CommissionOnNetwork(
await TH2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234)

Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/TestCommissioningTimeSync.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ async def teardown_test(self):
return super().teardown_test()

async def commission_and_base_checks(self):
params = self.default_controller.OpenCommissioningWindow(
params = await self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1)
self.commissioner.CommissionOnNetwork(
await self.commissioner.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234)
self.commissioned = True
Expand Down
6 changes: 3 additions & 3 deletions src/python_testing/basic_composition_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ def ConvertValue(value) -> Any:


class BasicCompositionTests:
def connect_over_pase(self, dev_ctrl):
async def connect_over_pase(self, dev_ctrl):
setupCode = self.matter_test_config.qr_code_content if self.matter_test_config.qr_code_content is not None else self.matter_test_config.manual_code
asserts.assert_true(setupCode, "Require either --qr-code or --manual-code.")
dev_ctrl.FindOrEstablishPASESession(setupCode, self.dut_node_id)
await dev_ctrl.FindOrEstablishPASESession(setupCode, self.dut_node_id)

def dump_wildcard(self, dump_device_composition_path: typing.Optional[str]):
node_dump_dict = {endpoint_id: MatterTlvToJson(self.endpoints_tlv[endpoint_id]) for endpoint_id in self.endpoints_tlv}
Expand All @@ -121,7 +121,7 @@ async def setup_class_helper(self, default_to_pase: bool = True):
dump_device_composition_path: Optional[str] = self.user_params.get("dump_device_composition_path", None)

if do_test_over_pase:
self.connect_over_pase(dev_ctrl)
await self.connect_over_pase(dev_ctrl)
node_id = self.dut_node_id
else:
# Using the already commissioned node
Expand Down
18 changes: 9 additions & 9 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,11 @@ def check_pics(self, pics_key: str) -> bool:
pics_key = pics_key.strip()
return pics_key in picsd and picsd[pics_key]

def openCommissioningWindow(self, dev_ctrl: ChipDeviceCtrl, node_id: int) -> CustomCommissioningParameters:
async def openCommissioningWindow(self, dev_ctrl: ChipDeviceCtrl, node_id: int) -> CustomCommissioningParameters:
rnd_discriminator = random.randint(0, 4095)
try:
commissioning_params = dev_ctrl.OpenCommissioningWindow(nodeid=node_id, timeout=900, iteration=1000,
discriminator=rnd_discriminator, option=1)
commissioning_params = await dev_ctrl.OpenCommissioningWindow(nodeid=node_id, timeout=900, iteration=1000,
discriminator=rnd_discriminator, option=1)
params = CustomCommissioningParameters(commissioning_params, rnd_discriminator)
return params

Expand Down Expand Up @@ -1524,10 +1524,10 @@ def test_run_commissioning(self):
(conf.root_of_trust_index, conf.fabric_id, node_id))
logging.info("Commissioning method: %s" % conf.commissioning_method)

if not self._commission_device(commission_idx):
if not asyncio.run(self._commission_device(commission_idx)):
raise signals.TestAbortAll("Failed to commission node")

def _commission_device(self, i) -> bool:
async def _commission_device(self, i) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config

Expand All @@ -1543,7 +1543,7 @@ def _commission_device(self, i) -> bool:

if conf.commissioning_method == "on-network":
try:
dev_ctrl.CommissionOnNetwork(
await dev_ctrl.CommissionOnNetwork(
nodeId=conf.dut_node_ids[i],
setupPinCode=info.passcode,
filterType=info.filter_type,
Expand All @@ -1555,7 +1555,7 @@ def _commission_device(self, i) -> bool:
return False
elif conf.commissioning_method == "ble-wifi":
try:
dev_ctrl.CommissionWiFi(
await dev_ctrl.CommissionWiFi(
info.filter_value,
info.passcode,
conf.dut_node_ids[i],
Expand All @@ -1569,7 +1569,7 @@ def _commission_device(self, i) -> bool:
return False
elif conf.commissioning_method == "ble-thread":
try:
dev_ctrl.CommissionThread(
await dev_ctrl.CommissionThread(
info.filter_value,
info.passcode,
conf.dut_node_ids[i],
Expand All @@ -1583,7 +1583,7 @@ def _commission_device(self, i) -> bool:
elif conf.commissioning_method == "on-network-ip":
try:
logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====")
dev_ctrl.CommissionIP(
await dev_ctrl.CommissionIP(
ipaddr=conf.commissionee_ip_address_just_for_testing,
setupPinCode=info.passcode, nodeid=conf.dut_node_ids[i]
)
Expand Down

0 comments on commit af74080

Please sign in to comment.