Skip to content

Commit

Permalink
[Python] Convert tests in src/python_testing/ to asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
agners committed Jun 19, 2024
1 parent a4bb261 commit 6958c82
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 47 deletions.
4 changes: 2 additions & 2 deletions src/python_testing/TC_ACE_1_5.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ async def test_TC_ACE_1_5(self):
self.th2 = new_fabric_admin.NewController(nodeId=TH2_nodeid,
paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path))

params = self.openCommissioningWindow(self.th1, self.dut_node_id)
params = await self.openCommissioningWindow(self.th1, self.dut_node_id)
self.print_step(2, "TH1 opens the commissioning window on the DUT")

self.th2.CommissionOnNetwork(
await self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.commissioningParameters.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=params.randomDiscriminator)
logging.info('Commissioning complete done. Successful.')
Expand Down
12 changes: 6 additions & 6 deletions src/python_testing/TC_CGEN_2_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@

class TC_CGEN_2_4(MatterBaseTest):

def OpenCommissioningWindow(self) -> CommissioningParameters:
async def OpenCommissioningWindow(self) -> CommissioningParameters:
try:
params = self.th1.OpenCommissioningWindow(
params = await self.th1.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=self.discriminator, option=1)
return params

Expand All @@ -56,14 +56,14 @@ async def CommissionToStageSendCompleteAndCleanup(
self, stage: int, expectedErrorPart: chip.native.ErrorSDKPart, expectedErrCode: int):

logging.info("-----------------Fail on step {}-------------------------".format(stage))
params = self.OpenCommissioningWindow()
params = await self.OpenCommissioningWindow()
self.th2.ResetTestCommissioner()
# This will run the commissioning up to the point where stage x is run and the
# response is sent before the test commissioner simulates a failure
self.th2.SetTestCommissionerPrematureCompleteAfter(stage)
ctx = asserts.assert_raises(ChipStackError)
with ctx:
self.th2.CommissionOnNetwork(
await self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.discriminator)
errcode = ctx.exception.chip_error
Expand Down Expand Up @@ -99,12 +99,12 @@ async def test_TC_CGEN_2_4(self):
await self.CommissionToStageSendCompleteAndCleanup(kSendNOC, chip.native.ErrorSDKPart.IM_CLUSTER_STATUS, 0x02)

logging.info('Step 15 - TH1 opens a commissioning window')
params = self.OpenCommissioningWindow()
params = await self.OpenCommissioningWindow()

logging.info('Step 16 - TH2 fully commissions the DUT')
self.th2.ResetTestCommissioner()

self.th2.CommissionOnNetwork(
await self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.discriminator)
logging.info('Commissioning complete done.')
Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/TC_DA_1_5.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ async def test_TC_DA_1_5(self):
await self.send_single_cmd(cmd=gcomm.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=1))

self.print_step(13, "Open commissioning window")
params = self.default_controller.OpenCommissioningWindow(
params = await self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1)

self.print_step(14, "Commission to TH2")
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
TH2 = new_fabric_admin.NewController(nodeId=112233)

TH2.CommissionOnNetwork(
await TH2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234)

Expand Down
10 changes: 5 additions & 5 deletions src/python_testing/TC_IDM_1_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async def test_TC_IDM_1_2(self):
# To get a PASE session, we need an open commissioning window
discriminator = random.randint(0, 4095)

params = self.default_controller.OpenCommissioningWindow(
params = await self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=discriminator, option=1)

# TH2 = new controller that's not connected over CASE
Expand All @@ -201,14 +201,14 @@ async def test_TC_IDM_1_2(self):
device = next(filter(lambda d: d.commissioningMode == 2 and d.longDiscriminator == discriminator, devices))
for a in device.addresses:
try:
TH2.EstablishPASESessionIP(ipaddr=a, setupPinCode=params.setupPinCode,
nodeid=self.dut_node_id+1, port=device.port)
await TH2.EstablishPASESessionIP(ipaddr=a, setupPinCode=params.setupPinCode,
nodeid=self.dut_node_id+1, port=device.port)
break
except ChipStackError:
continue

try:
TH2.GetConnectedDeviceSync(nodeid=self.dut_node_id+1, allowPASE=True, timeoutMs=1000)
await TH2.GetConnectedDevice(nodeid=self.dut_node_id+1, allowPASE=True, timeoutMs=1000)
except TimeoutError:
asserts.fail("Unable to establish a PASE session to the device")

Expand Down Expand Up @@ -263,7 +263,7 @@ async def test_TC_IDM_1_2(self):

# Try with RevokeCommissioning
# First open a commissioning window for us to revoke, so we know this command is able to succeed absent this error
_ = self.default_controller.OpenCommissioningWindow(
_ = await self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=discriminator, option=1)
cmd = FakeRevokeCommissioning()
try:
Expand Down
32 changes: 16 additions & 16 deletions src/python_testing/TC_OPCREDS_3_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@


class TC_OPCREDS_3_1(MatterBaseTest):
def FindAndEstablishPase(self, longDiscriminator: int, setupPinCode: int, nodeid: int, dev_ctrl: ChipDeviceCtrl = None):
async def FindAndEstablishPase(self, longDiscriminator: int, setupPinCode: int, nodeid: int, dev_ctrl: ChipDeviceCtrl = None):
if dev_ctrl is None:
dev_ctrl = self.default_controller

Expand All @@ -41,8 +41,8 @@ def FindAndEstablishPase(self, longDiscriminator: int, setupPinCode: int, nodeid
Discovery.FilterType.LONG_DISCRIMINATOR and d.longDiscriminator == longDiscriminator, devices))
for a in device.addresses:
try:
dev_ctrl.EstablishPASESessionIP(ipaddr=a, setupPinCode=setupPinCode,
nodeid=nodeid, port=device.port)
await dev_ctrl.EstablishPASESessionIP(ipaddr=a, setupPinCode=setupPinCode,
nodeid=nodeid, port=device.port)
break
except ChipStackError:
continue
Expand All @@ -51,11 +51,11 @@ def FindAndEstablishPase(self, longDiscriminator: int, setupPinCode: int, nodeid
except TimeoutError:
asserts.fail("Unable to establish a PASE session to the device")

def OpenCommissioningWindow(self, dev_ctrl: ChipDeviceCtrl, node_id: int):
async def OpenCommissioningWindow(self, dev_ctrl: ChipDeviceCtrl, node_id: int):
# TODO: abstract this in the base layer? Do we do this a lot?
longDiscriminator = random.randint(0, 4095)
try:
params = dev_ctrl.OpenCommissioningWindow(
params = await dev_ctrl.OpenCommissioningWindow(
nodeid=node_id, timeout=600, iteration=10000, discriminator=longDiscriminator, option=ChipDeviceCtrl.ChipDeviceControllerBase.CommissioningWindowPasscode.kTokenWithRandomPin)
except Exception as e:
logging.exception('Error running OpenCommissioningWindow %s', e)
Expand Down Expand Up @@ -85,7 +85,7 @@ async def test_TC_OPCREDS_3_1(self):
"Device fabric table is full - please remove one fabric and retry")

self.print_step(1, "TH0 opens a commissioning window on the DUT")
longDiscriminator, params = self.OpenCommissioningWindow(self.default_controller, self.dut_node_id)
longDiscriminator, params = await self.OpenCommissioningWindow(self.default_controller, self.dut_node_id)

self.print_step(
2, "TH0 reads the BasicCommissioningInfo field from the General commissioning cluster saves MaxCumulativeFailsafeSeconds as `failsafe_max`")
Expand All @@ -94,8 +94,8 @@ async def test_TC_OPCREDS_3_1(self):

self.print_step(3, "TH1 opens a PASE connection to the DUT")
newNodeId = self.dut_node_id + 1
self.FindAndEstablishPase(dev_ctrl=TH1, longDiscriminator=longDiscriminator,
setupPinCode=params.setupPinCode, nodeid=newNodeId)
await self.FindAndEstablishPase(dev_ctrl=TH1, longDiscriminator=longDiscriminator,
setupPinCode=params.setupPinCode, nodeid=newNodeId)

self.print_step(4, "TH1 sends ArmFailSafe command to the DUT with the ExpiryLengthSeconds field set to failsafe_max")
resp = await self.send_single_cmd(dev_ctrl=TH1, node_id=newNodeId, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(failsafe_max))
Expand All @@ -109,7 +109,7 @@ async def test_TC_OPCREDS_3_1(self):
self.print_step(6, "TH1 obtains or generates the NOC, the Root CA Certificate and ICAC using csrResponse and selects an IPK. The certificates shall have their subjects padded with additional data such that they are each the maximum certificate size of 400 bytes when encoded in the MatterCertificateEncoding.")
# Our CA is set up to maximize cert chains already
# Extract the RCAC public key and save as `Root_Public_Key_TH1`
TH1_certs_real = TH1.IssueNOCChain(csrResponse, newNodeId)
TH1_certs_real = await TH1.IssueNOCChain(csrResponse, newNodeId)
if (TH1_certs_real.rcacBytes is None or
TH1_certs_real.icacBytes is None or
TH1_certs_real.nocBytes is None or TH1_certs_real.ipkBytes is None):
Expand All @@ -125,7 +125,7 @@ async def test_TC_OPCREDS_3_1(self):
TH1_CA_fake = self.certificate_authority_manager.NewCertificateAuthority()
TH1_fabric_admin_fake = TH1_CA_fake.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
TH1_fake = TH1_fabric_admin_fake.NewController(nodeId=self.default_controller.nodeId)
TH1_certs_fake = TH1_fake.IssueNOCChain(csrResponse, newNodeId)
TH1_certs_fake = await TH1_fake.IssueNOCChain(csrResponse, newNodeId)
if (TH1_certs_fake.rcacBytes is None or
TH1_certs_fake.icacBytes is None or
TH1_certs_fake.nocBytes is None or TH1_certs_real.ipkBytes is None):
Expand Down Expand Up @@ -361,7 +361,7 @@ async def test_TC_OPCREDS_3_1(self):
asserts.assert_equal(len(fabrics), fabrics_original_size, "Fabric list size does not match original")

self.print_step(37, "TH1 fully commissions DUT onto the fabric using a set of valid certificates")
TH1.Commission(newNodeId)
await TH1.Commission(newNodeId)

self.print_step(
38, "TH1 reads the TrustedRootCertificates list from DUT and verify that there are trusted_root_original_size + 1 entries")
Expand Down Expand Up @@ -404,7 +404,7 @@ async def test_TC_OPCREDS_3_1(self):
resp.statusCode, opcreds.Enums.NodeOperationalCertStatusEnum.kOk, "Failure on UpdateFabricLabel")

self.print_step(44, "TH1 sends an OpenCommissioningWindow command to the Administrator Commissioning cluster")
longDiscriminator, params = self.OpenCommissioningWindow(TH1, newNodeId)
longDiscriminator, params = await self.OpenCommissioningWindow(TH1, newNodeId)

self.print_step(45, "TH2 commissions the DUT")
TH2_CA = self.certificate_authority_manager.NewCertificateAuthority(maximizeCertChains=True)
Expand All @@ -413,7 +413,7 @@ async def test_TC_OPCREDS_3_1(self):
TH2_nodeid = self.default_controller.nodeId+2
TH2 = TH2_fabric_admin.NewController(nodeId=TH2_nodeid)
TH2_dut_nodeid = self.dut_node_id+2
TH2.CommissionOnNetwork(
await TH2.CommissionOnNetwork(
nodeId=TH2_dut_nodeid, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=longDiscriminator)

Expand Down Expand Up @@ -484,7 +484,7 @@ async def test_TC_OPCREDS_3_1(self):
temp_CA = self.certificate_authority_manager.NewCertificateAuthority()
temp_fabric_admin = temp_CA.NewFabricAdmin(vendorId=0xFFF1, fabricId=3)
temp_controller = temp_fabric_admin.NewController(nodeId=self.default_controller.nodeId)
temp_certs = temp_controller.IssueNOCChain(csrResponse, newNodeId)
temp_certs = await temp_controller.IssueNOCChain(csrResponse, newNodeId)
if (temp_certs.rcacBytes is None or
temp_certs.icacBytes is None or
temp_certs.nocBytes is None or temp_certs.ipkBytes is None):
Expand Down Expand Up @@ -521,7 +521,7 @@ async def test_TC_OPCREDS_3_1(self):

self.print_step(61, "TH1 obtains or generates a NOC and ICAC using the CSR elements from the previous step with a different NodeID, but the same Root CA Certificate and fabric ID as step <<TH1-gen-real-creds>>. Save as `Node_Operational_Certificates_TH1_fabric_conflict` and `Intermediate_Certificate_TH1_fabric_conflict`|")
anotherNodeId = newNodeId + 1
TH1_certs_fabric_conflict = TH1.IssueNOCChain(csrResponse_new, anotherNodeId)
TH1_certs_fabric_conflict = await TH1.IssueNOCChain(csrResponse_new, anotherNodeId)
if (TH1_certs_fabric_conflict.rcacBytes is None or
TH1_certs_fabric_conflict.icacBytes is None or
TH1_certs_fabric_conflict.nocBytes is None or TH1_certs_fabric_conflict.ipkBytes is None):
Expand Down Expand Up @@ -565,7 +565,7 @@ async def test_TC_OPCREDS_3_1(self):
"Unexpected response type for UpdateNOC csr request")

self.print_step(68, "TH1 obtains or generates a NOC, Root CA Certificate, ICAC using the CSR elements from the previous step")
TH1_certs_3 = TH1.IssueNOCChain(csrResponse, anotherNodeId)
TH1_certs_3 = await TH1.IssueNOCChain(csrResponse, anotherNodeId)
if (TH1_certs_3.rcacBytes is None or
TH1_certs_3.icacBytes is None or
TH1_certs_3.nocBytes is None or TH1_certs_3.ipkBytes is None):
Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/TC_TIMESYNC_2_13.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ async def test_TC_TIMESYNC_2_13(self):
self.print_step(0, "Commissioning, already done")

self.print_step(1, "TH1 opens a commissioning window")
params = self.default_controller.OpenCommissioningWindow(
params = await self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1)

self.print_step(2, "Commission to TH2")
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
TH2 = new_fabric_admin.NewController(nodeId=112233)

TH2.CommissionOnNetwork(
await TH2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234)

Expand Down
4 changes: 2 additions & 2 deletions src/python_testing/TestCommissioningTimeSync.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ async def teardown_test(self):
return super().teardown_test()

async def commission_and_base_checks(self):
params = self.default_controller.OpenCommissioningWindow(
params = await self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1)
self.commissioner.CommissionOnNetwork(
await self.commissioner.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234)
self.commissioned = True
Expand Down
6 changes: 3 additions & 3 deletions src/python_testing/basic_composition_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ def ConvertValue(value) -> Any:


class BasicCompositionTests:
def connect_over_pase(self, dev_ctrl):
async def connect_over_pase(self, dev_ctrl):
setupCode = self.matter_test_config.qr_code_content if self.matter_test_config.qr_code_content is not None else self.matter_test_config.manual_code
asserts.assert_true(setupCode, "Require either --qr-code or --manual-code.")
dev_ctrl.FindOrEstablishPASESession(setupCode, self.dut_node_id)
await dev_ctrl.FindOrEstablishPASESession(setupCode, self.dut_node_id)

def dump_wildcard(self, dump_device_composition_path: typing.Optional[str]):
node_dump_dict = {endpoint_id: MatterTlvToJson(self.endpoints_tlv[endpoint_id]) for endpoint_id in self.endpoints_tlv}
Expand All @@ -121,7 +121,7 @@ async def setup_class_helper(self, default_to_pase: bool = True):
dump_device_composition_path: Optional[str] = self.user_params.get("dump_device_composition_path", None)

if do_test_over_pase:
self.connect_over_pase(dev_ctrl)
await self.connect_over_pase(dev_ctrl)
node_id = self.dut_node_id
else:
# Using the already commissioned node
Expand Down
Loading

0 comments on commit 6958c82

Please sign in to comment.