Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial test script for Fabric Sync TC_MCORE_FS_1_2 #34675

Merged
merged 7 commits into from
Jul 31, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions src/python_testing/TC_MCORE_FS_1_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#

import ipaddress
import logging
import os
import pathlib
import queue
import random
import signal
import subprocess
import time
import uuid

import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, has_cluster, per_endpoint_test
from mobly import asserts
from TC_SC_3_6 import AttributeChangeAccumulator


class TC_MCORE_FS_1_2(MatterBaseTest):

@async_test_body
async def setup_class(self):
super().setup_class()

# Create a second controller on a new fabric to act as the TH_FSA and commission devices through the DUT_FSA
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
paa_path = str(self.matter_test_config.paa_trust_store_path)
self.TH_FSA = new_fabric_admin.NewController(nodeId=112233, paaTrustStorePath=paa_path)

def teardown_class(self):
super().teardown_class()

def steps_TC_MCORE_FS_1_2(self) -> list[TestStep]:
steps = [TestStep(1, "TH_FSA subscribes to all the Bridged Device Basic Information clusters provided by DUT_FSA to identify the presence of a Bridged Node endpoint with a UniqueID matching the UniqueID provided by the BasicInformationCluster of the TH_SED_DUT."),
TestStep(2, "TH_FSA initiates commissioning of TH_SED_DUT by sending the OpenCommissioningWindow command to the Administrator Commissioning Cluster on the endpoint with the uniqueID matching that of TH_SED_DUT."),
TestStep(3, "TH_FSA completes commissioning of TH_SED_DUT using the Enhanced Commissioning Method."),
TestStep(4, "Commission TH_SED_L onto DUT_FSA’s fabric using the manufacturer specified mechanism."),
TestStep(5, "TH_FSA waits for subscription report from a the Bridged Device Basic Information clusters provided by DUT_FSA to identify the presence of a Bridged Node endpoint with a UniqueID matching the UniqueID provided by the BasicInformationCluster of the TH_SED_L."),
TestStep(6, "TH_FSA initiates commissions of TH_SED_L by sending the OpenCommissioningWindow command to the Administrator Commissioning Cluster on the endpoint with the uniqueID matching that of TH_SED_L."),
TestStep(7, "TH_FSA completes commissioning of TH_SED_L using the Enhanced Commissioning Method.")]
return steps

@async_test_body
async def test_TC_MCORE_FS_1_2(self):
self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
min_report_interval_sec = self.user_params.get("min_report_interval_sec", 0)
max_report_interval_sec = self.user_params.get("max_report_interval_sec", 10 * 60)
timeout_delay_sec = self.user_params.get("timeout_delay_sec", max_report_interval_sec * 2)

self.step(1)

# Subscribe to the UniqueIDs
unique_id_queue = queue.Queue()
subscription_contents = [
(Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID) # On all endpoints
]
sub = await self.default_controller.ReadAttribute(
nodeid=self.dut_node_id,
attributes=subscription_contents,
reportInterval=(min_report_interval_sec, max_report_interval_sec),
keepSubscriptions=False
)
attribute_handler = AttributeChangeAccumulator(
name=self.default_controller.name, expected_attribute=Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID, output=unique_id_queue)
sub.SetAttributeUpdateCallback(attribute_handler)
sub_handlers.append(attribute_handler)

logging.info("Waiting for First BridgedDeviceBasicInformation.")
start_time = time.time()
elapsed = 0
time_remaining = timeout_delay_sec

th_sed_dut_bdbi_endpoint = -1
th_sed_dut_unique_id = -1

while time_remaining > 0 and th_sed_dut_bdbi_endpoint < 0:
try:
item = unique_id_queue.get(block=True, timeout=time_remaining)
client_name, endpoint, attribute, value = item['name'], item['endpoint'], item['attribute'], item['value']

# Record arrival of an expected subscription change when seen
if attribute == Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID:
th_sed_dut_bdbi_endpoint = endpoint
th_sed_dut_unique_id = value

except queue.Empty:
# No error, we update timeouts and keep going
pass

elapsed = time.time() - start_time
time_remaining = timeout_delay_sec - elapsed

logging.info("Found BDBI with UniqueID (%d) on endpoint %d." % th_sed_dut_unique_id, th_sed_dut_bdbi_endpoint)

self.step(2)

self.sync_passcode = 20202024
self.th_sed_dut_discriminator = 2222
# min commissioning timeout is 3*60 seconds, so use that even though the command said 30.
cmd = Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(commissioningTimeout=3*60,
PAKEPasscodeVerifier=b"+w1qZQR05Zn0bc2LDyNaDAhsrhDS5iRHPTN10+EmNx8E2OpIPC4SjWRDQVOgqcbnXdYMlpiZ168xLBqn1fx9659gGK/7f9Yc6GxpoJH8kwAUYAYyLGsYeEBt1kL6kpXjgA==",
discriminator=self.th_sed_dut_discriminator,
iterations=10000, salt=base64.b64encode(bytes('SaltyMcSalterson', 'utf-8')))
await self.send_single_cmd(cmd, endpoint=th_sed_dut_bdbi_endpoint, timedRequestTimeoutMs=5000)

logging.info("Commissioning Window open for TH_SED_DUT.")

self.step(3)

self.th_sed_dut_nodeid = 1111
await self.TH_server_controller.CommissionOnNetwork(nodeId=self.th_sed_dut_nodeid, setupPinCode=self.sync_passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.th_sed_dut_discriminator)
logging.info("Commissioning TH_SED_DUT complete")

self.step(4)
if not self.is_ci:
self.wait_for_use_input("Commission TH_SED_DUT onto DUT_FSA’s fabric using the manufacturer specified mechanism. (ensure Synchronization is enabled.)")

self.step(5)

th_sed_later_bdbi_endpoint = -1
th_sed_later_unique_id = -1
logging.info("Waiting for Second BridgedDeviceBasicInformation.")

while time_remaining > 0 and th_sed_later_bdbi_endpoint < 0:
try:
item = unique_id_queue.get(block=True, timeout=time_remaining)
client_name, endpoint, attribute, value = item['name'], item['endpoint'], item['attribute'], item['value']

# Record arrival of an expected subscription change when seen
if attribute == Clusters.BridgedDeviceBasicInformation.Attributes.UniqueID and endpoint != th_sed_dut_bdbi_endpoint and th_sed_later_unique_id != th_sed_dut_unique_id:
th_sed_later_bdbi_endpoint = endpoint
th_sed_later_unique_id = value

except queue.Empty:
# No error, we update timeouts and keep going
pass

elapsed = time.time() - start_time
time_remaining = timeout_delay_sec - elapsed

logging.info("Found another BDBI with UniqueID (%d) on endpoint %d." % th_sed_later_unique_id, th_sed_later_bdbi_endpoint)

self.step(6)

self.th_sed_later_discriminator = 3333
# min commissioning timeout is 3*60 seconds, so use that even though the command said 30.
cmd = Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(commissioningTimeout=3*60,
PAKEPasscodeVerifier=b"+w1qZQR05Zn0bc2LDyNaDAhsrhDS5iRHPTN10+EmNx8E2OpIPC4SjWRDQVOgqcbnXdYMlpiZ168xLBqn1fx9659gGK/7f9Yc6GxpoJH8kwAUYAYyLGsYeEBt1kL6kpXjgA==",
discriminator=self.th_sed_later_discriminator,
iterations=10000, salt=base64.b64encode(bytes('SaltyMcSalterson', 'utf-8')))
await self.send_single_cmd(cmd, endpoint=th_sed_later_bdbi_endpoint, timedRequestTimeoutMs=5000)

logging.info("Commissioning Window open for TH_SED_L.")

self.step(7)

self.th_sed_later_nodeid = 2222
await self.TH_server_controller.CommissionOnNetwork(nodeId=self.th_sed_later_nodeid, setupPinCode=self.sync_passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.th_sed_later_discriminator)
logging.info("Commissioning TH_SED_L complete")

if __name__ == "__main__":
default_matter_test_main()
Loading