diff --git a/src/python_testing/TC_BRBINFO_4_1.py b/src/python_testing/TC_BRBINFO_4_1.py index 32dd541d66f676..f0c194ac2bd7f1 100644 --- a/src/python_testing/TC_BRBINFO_4_1.py +++ b/src/python_testing/TC_BRBINFO_4_1.py @@ -15,22 +15,44 @@ # limitations under the License. # +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: +# run1: +# app: examples/fabric-admin/scripts/fabric-sync-app.py +# app-args: --app-admin=${FABRIC_ADMIN_APP} --app-bridge=${FABRIC_BRIDGE_APP} --stdin-pipe=dut-fsa-stdin --discriminator=1234 +# app-ready-pattern: "Successfully opened pairing window on the device" +# script-args: > +# --PICS src/app/tests/suites/certification/ci-pics-values +# --storage-path admin_storage.json +# --commissioning-method on-network +# --discriminator 1234 +# --passcode 20202021 +# --string-arg th_icd_server_app_path:${LIT_ICD_APP} dut_fsa_stdin_pipe:dut-fsa-stdin +# --trace-to json:${TRACE_TEST_JSON}.json +# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto +# factoryreset: true +# quiet: true +# === END CI TEST ARGUMENTS === + # This test requires a TH_ICD_SERVER application. Please specify with --string-arg th_icd_server_app_path: # TH_ICD_SERVER must support following arguments: --secured-device-port --discriminator --passcode --KVS # E.g: python3 src/python_testing/TC_BRBINFO_4_1.py --commissioning-method on-network --qr-code MT:-24J042C00KA0648G00 \ # --string-arg th_icd_server_app_path:out/linux-x64-lit-icd/lit-icd-app +import asyncio import logging import os import queue -import signal -import subprocess -import time -import uuid +import random +import tempfile import chip.clusters as Clusters from chip import ChipDeviceCtrl from chip.interaction_model import InteractionModelError, Status +from chip.testing.apps import IcdAppServerSubprocess from matter_testing_support import MatterBaseTest, SimpleEventCallback, TestStep, async_test_body, default_matter_test_main from mobly import asserts @@ -40,13 +62,6 @@ class TC_BRBINFO_4_1(MatterBaseTest): - # - # Class Helper functions - # - - async def _read_attribute_expect_success(self, endpoint, cluster, attribute, node_id): - return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute, node_id=node_id) - # This test has some manual steps and also multiple sleeps >= 30 seconds. Test typically runs under 3 mins, # so 6 minutes is more than enough. @property @@ -58,7 +73,7 @@ def desc_TC_BRBINFO_4_1(self) -> str: return "[TC_BRBINFO_4_1] Verification of KeepActive Command [DUT-Server]" def steps_TC_BRBINFO_4_1(self) -> list[TestStep]: - steps = [ + return [ TestStep("0", "DUT commissioned and preconditions", is_commissioning=True), TestStep("1", "TH reads from the ICD the A_IDLE_MODE_DURATION, A_ACTIVE_MODE_DURATION, and ACTIVE_MODE_THRESHOLD attributes"), TestStep("2", "Setting up subscribe to ActiveChange event"), @@ -77,16 +92,16 @@ def steps_TC_BRBINFO_4_1(self) -> list[TestStep]: TestStep("15", "Send KeepActive command with shortest TimeoutMs value while TH_ICD is prevented from sending check-ins"), TestStep("16", "Wait 15 seconds then send second KeepActive command with double the TimeoutMs value of the previous step"), TestStep("17", "TH allows TH_ICD to resume sending check-ins after timeout from step 15 expired but before second timeout from step 16 still valid"), - TestStep("18", "Wait for TH_ICD to check into TH, then confirm we have received new event from DUT")] - return steps + TestStep("18", "Wait for TH_ICD to check into TH, then confirm we have received new event from DUT"), + ] - def _ask_for_vendor_commissioniong_ux_operation(self, discriminator, setupPinCode, setupManualCode, setupQRCode): + def _ask_for_vendor_commissioning_ux_operation(self, discriminator, setupPinCode, setupManualCode, setupQRCode): self.wait_for_user_input( prompt_msg=f"Using the DUT vendor's provided interface, commission the ICD device using the following parameters:\n" f"- discriminator: {discriminator}\n" f"- setupPinCode: {setupPinCode}\n" f"- setupQRCode: {setupQRCode}\n" - f"- setupManualcode: {setupManualCode}\n" + f"- setupManualCode: {setupManualCode}\n" f"If using FabricSync Admin test app, you may type:\n" f">>> pairing onnetwork 111 {setupPinCode} --icd-registration true") @@ -117,81 +132,88 @@ async def _get_dynamic_endpoint(self) -> int: @async_test_body async def setup_class(self): + super().setup_class() + # These steps are not explicitly, but they help identify the dynamically added endpoint # The second part of this process happens on _get_dynamic_endpoint() - root_part_list = await self.read_single_attribute_check_success(cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.PartsList, endpoint=_ROOT_ENDPOINT_ID) + root_part_list = await self.read_single_attribute_check_success( + cluster=Clusters.Descriptor, + attribute=Clusters.Descriptor.Attributes.PartsList, + endpoint=_ROOT_ENDPOINT_ID) self.set_of_dut_endpoints_before_adding_device = set(root_part_list) - super().setup_class() self._active_change_event_subscription = None - self.app_process = None - self.app_process_paused = False - app = self.user_params.get("th_icd_server_app_path", None) - if not app: - asserts.fail('This test requires a TH_ICD_SERVER app. Specify app path with --string-arg th_icd_server_app_path:') - - self.kvs = f'kvs_{str(uuid.uuid4())}' - discriminator = 3850 - passcode = 20202021 - cmd = [app] - cmd.extend(['--secured-device-port', str(5543)]) - cmd.extend(['--discriminator', str(discriminator)]) - cmd.extend(['--passcode', str(passcode)]) - cmd.extend(['--KVS', self.kvs]) + self.th_icd_server = None + self.storage = None - logging.info("Starting ICD Server App") - self.app_process = subprocess.Popen(cmd) - logging.info("ICD started") - time.sleep(3) + th_icd_server_app = self.user_params.get("th_icd_server_app_path", None) + if not th_icd_server_app: + asserts.fail('This test requires a TH_ICD_SERVER app. Specify app path with --string-arg th_icd_server_app_path:') + if not os.path.exists(th_icd_server_app): + asserts.fail(f'The path {th_icd_server_app} does not exist') + + # Create a temporary storage directory for keeping KVS files. + self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__) + logging.info("Temporary storage directory: %s", self.storage.name) + + if self.is_pics_sdk_ci_only: + # Get the named pipe path for the DUT_FSA app input from the user params. + dut_fsa_stdin_pipe = self.user_params.get("dut_fsa_stdin_pipe") + if not dut_fsa_stdin_pipe: + asserts.fail("CI setup requires --string-arg dut_fsa_stdin_pipe:") + self.dut_fsa_stdin = open(dut_fsa_stdin_pipe, "w") + + self.th_icd_server_port = 5543 + self.th_icd_server_discriminator = random.randint(0, 4095) + self.th_icd_server_passcode = 20202021 + + # Start the TH_ICD_SERVER app. + self.th_icd_server = IcdAppServerSubprocess( + th_icd_server_app, + storage_dir=self.storage.name, + port=self.th_icd_server_port, + discriminator=self.th_icd_server_discriminator, + passcode=self.th_icd_server_passcode) + self.th_icd_server.start( + expected_output="Server initialization complete", + timeout=30) logging.info("Commissioning of ICD to fabric one (TH)") self.icd_nodeid = 1111 self.default_controller.EnableICDRegistration(self.default_controller.GenerateICDRegistrationParameters()) - await self.default_controller.CommissionOnNetwork(nodeId=self.icd_nodeid, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator) + await self.default_controller.CommissionOnNetwork( + nodeId=self.icd_nodeid, + setupPinCode=self.th_icd_server_passcode, + filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, + filter=self.th_icd_server_discriminator) logging.info("Commissioning of ICD to fabric two (DUT)") params = await self.openCommissioningWindow(dev_ctrl=self.default_controller, node_id=self.icd_nodeid) - self._ask_for_vendor_commissioniong_ux_operation(params.randomDiscriminator, params.commissioningParameters.setupPinCode, - params.commissioningParameters.setupManualCode, params.commissioningParameters.setupQRCode) + if not self.is_pics_sdk_ci_only: + self._ask_for_vendor_commissioning_ux_operation( + params.randomDiscriminator, + params.commissioningParameters.setupPinCode, + params.commissioningParameters.setupManualCode, + params.commissioningParameters.setupQRCode) + else: + self.dut_fsa_stdin.write( + f"pairing onnetwork 2 {params.commissioningParameters.setupPinCode} --icd-registration true\n") + self.dut_fsa_stdin.flush() + # Wait for the commissioning to complete. + await asyncio.sleep(5) def teardown_class(self): if self._active_change_event_subscription is not None: self._active_change_event_subscription.Shutdown() self._active_change_event_subscription = None - - # In case the th_icd_server_app_path does not exist, then we failed the test - # and there is nothing to remove - if self.app_process is not None: - self.resume_th_icd_server(check_state=False) - logging.warning("Stopping app with SIGTERM") - self.app_process.send_signal(signal.SIGTERM.value) - self.app_process.wait() - - if os.path.exists(self.kvs): - os.remove(self.kvs) - + if self.th_icd_server is not None: + self.th_icd_server.terminate() + if self.storage is not None: + self.storage.cleanup() super().teardown_class() - def pause_th_icd_server(self, check_state): - if check_state: - asserts.assert_false(self.app_process_paused, "ICD TH Server unexpectedly is already paused") - if self.app_process_paused: - return - # stops (halts) the ICD server process by sending a SIGTOP signal - self.app_process.send_signal(signal.SIGSTOP.value) - self.app_process_paused = True - - def resume_th_icd_server(self, check_state): - if check_state: - asserts.assert_true(self.app_process_paused, "ICD TH Server unexpectedly is already running") - if not self.app_process_paused: - return - # resumes (continues) the ICD server process by sending a SIGCONT signal - self.app_process.send_signal(signal.SIGCONT.value) - self.app_process_paused = False - # # BRBINFO 4.1 Test Body # @@ -210,34 +232,42 @@ async def test_TC_BRBINFO_4_1(self): self.step("0") logging.info("Ensuring DUT is commissioned to TH") - # Confirms commissioning of DUT on TH as it reads its fature map - await self._read_attribute_expect_success( - _ROOT_ENDPOINT_ID, - basic_info_cluster, - basic_info_attributes.FeatureMap, - self.dut_node_id + # Confirms commissioning of DUT on TH as it reads its feature map + await self.read_single_attribute_check_success( + endpoint=_ROOT_ENDPOINT_ID, + cluster=basic_info_cluster, + attribute=basic_info_attributes.FeatureMap, + node_id=self.dut_node_id, ) logging.info("Ensuring ICD is commissioned to TH") self.step("1") - idle_mode_duration_s = await self._read_attribute_expect_success( - _ROOT_ENDPOINT_ID, - icdm_cluster, - icdm_attributes.IdleModeDuration, - self.icd_nodeid + idle_mode_duration_s = await self.read_single_attribute_check_success( + endpoint=_ROOT_ENDPOINT_ID, + cluster=icdm_cluster, + attribute=icdm_attributes.IdleModeDuration, + node_id=self.icd_nodeid, ) logging.info(f"IdleModeDurationS: {idle_mode_duration_s}") - active_mode_duration_ms = await self._read_attribute_expect_success( - _ROOT_ENDPOINT_ID, - icdm_cluster, - icdm_attributes.ActiveModeDuration, - self.icd_nodeid + active_mode_duration_ms = await self.read_single_attribute_check_success( + endpoint=_ROOT_ENDPOINT_ID, + cluster=icdm_cluster, + attribute=icdm_attributes.ActiveModeDuration, + node_id=self.icd_nodeid, ) logging.info(f"ActiveModeDurationMs: {active_mode_duration_ms}") + active_mode_threshold_ms = await self.read_single_attribute_check_success( + endpoint=_ROOT_ENDPOINT_ID, + cluster=icdm_cluster, + attribute=icdm_attributes.ActiveModeThreshold, + node_id=self.icd_nodeid, + ) + logging.info(f"ActiveModeThresholdMs: {active_mode_threshold_ms}") + self.step("2") event = brb_info_cluster.Events.ActiveChanged self.q = queue.Queue() @@ -292,7 +322,7 @@ async def test_TC_BRBINFO_4_1(self): asserts.assert_equal(self.q.qsize(), 0, "Unexpected event received from DUT") self.step("9") - self.pause_th_icd_server(check_state=True) + self.th_icd_server.pause() # sends 3x keep active commands stay_active_duration_ms = 2000 keep_active_timeout_ms = 60000 @@ -304,7 +334,7 @@ async def test_TC_BRBINFO_4_1(self): await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id) self.step("10") - self.resume_th_icd_server(check_state=True) + self.th_icd_server.resume() await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000) promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s=wait_for_dut_event_subscription_s) asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT") @@ -314,14 +344,14 @@ async def test_TC_BRBINFO_4_1(self): asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT") self.step("12") - self.pause_th_icd_server(check_state=True) + self.th_icd_server.pause() stay_active_duration_ms = 2000 keep_active_timeout_ms = 30000 await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id) self.step("13") - time.sleep(30) - self.resume_th_icd_server(check_state=True) + await asyncio.sleep(30) + self.th_icd_server.resume() self.step("14") await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000) @@ -329,20 +359,20 @@ async def test_TC_BRBINFO_4_1(self): asserts.assert_equal(self.q.qsize(), 0, "Unexpected event received from DUT") self.step("15") - self.pause_th_icd_server(check_state=True) + self.th_icd_server.pause() stay_active_duration_ms = 2000 keep_active_timeout_ms = 30000 await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id) self.step("16") - time.sleep(15) + await asyncio.sleep(15) stay_active_duration_ms = 2000 keep_active_timeout_ms = 60000 await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id) self.step("17") - time.sleep(15) - self.resume_th_icd_server(check_state=True) + await asyncio.sleep(15) + self.th_icd_server.resume() self.step("18") await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000) diff --git a/src/python_testing/TC_CCTRL_2_2.py b/src/python_testing/TC_CCTRL_2_2.py index 4b6f80017096ff..ea2918fcc0b346 100644 --- a/src/python_testing/TC_CCTRL_2_2.py +++ b/src/python_testing/TC_CCTRL_2_2.py @@ -49,10 +49,10 @@ import chip.clusters as Clusters from chip import ChipDeviceCtrl from chip.interaction_model import InteractionModelError, Status +from chip.testing.apps import AppServerSubprocess from matter_testing_support import (MatterBaseTest, TestStep, async_test_body, default_matter_test_main, has_cluster, run_if_endpoint_matches) from mobly import asserts -from TC_MCORE_FS_1_1 import AppServer class TC_CCTRL_2_2(MatterBaseTest): @@ -79,13 +79,15 @@ async def setup_class(self): self.th_server_passcode = 20202021 # Start the TH_SERVER app. - self.th_server = AppServer( + self.th_server = AppServerSubprocess( th_server_app, storage_dir=self.storage.name, port=self.th_server_port, discriminator=self.th_server_discriminator, passcode=self.th_server_passcode) - self.th_server.start() + self.th_server.start( + expected_output="Server initialization complete", + timeout=30) logging.info("Commissioning from separate fabric") diff --git a/src/python_testing/TC_CCTRL_2_3.py b/src/python_testing/TC_CCTRL_2_3.py index 15f7304dab27cb..c5ccaa837737b2 100644 --- a/src/python_testing/TC_CCTRL_2_3.py +++ b/src/python_testing/TC_CCTRL_2_3.py @@ -49,10 +49,10 @@ import chip.clusters as Clusters from chip import ChipDeviceCtrl from chip.interaction_model import InteractionModelError, Status +from chip.testing.apps import AppServerSubprocess from matter_testing_support import (MatterBaseTest, TestStep, async_test_body, default_matter_test_main, has_cluster, run_if_endpoint_matches) from mobly import asserts -from TC_MCORE_FS_1_1 import AppServer class TC_CCTRL_2_3(MatterBaseTest): @@ -79,13 +79,15 @@ async def setup_class(self): self.th_server_passcode = 20202021 # Start the TH_SERVER app. - self.th_server = AppServer( + self.th_server = AppServerSubprocess( th_server_app, storage_dir=self.storage.name, port=self.th_server_port, discriminator=self.th_server_discriminator, passcode=self.th_server_passcode) - self.th_server.start() + self.th_server.start( + expected_output="Server initialization complete", + timeout=30) logging.info("Commissioning from separate fabric") diff --git a/src/python_testing/TC_ECOINFO_2_1.py b/src/python_testing/TC_ECOINFO_2_1.py index f3f22bb99c7f37..cd966e4c145398 100644 --- a/src/python_testing/TC_ECOINFO_2_1.py +++ b/src/python_testing/TC_ECOINFO_2_1.py @@ -46,10 +46,10 @@ import chip.clusters as Clusters from chip.clusters.Types import NullValue from chip.interaction_model import Status +from chip.testing.apps import AppServerSubprocess from chip.tlv import uint from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches from mobly import asserts -from TC_MCORE_FS_1_1 import AppServer class TC_ECOINFO_2_1(MatterBaseTest): @@ -95,13 +95,15 @@ async def _setup_ci_prerequisites(self): self.th_server_passcode = 20202021 # Start the server app. - self.th_server = AppServer( + self.th_server = AppServerSubprocess( th_server_app, storage_dir=self.storage.name, port=self.th_server_port, discriminator=self.th_server_discriminator, passcode=self.th_server_passcode) - self.th_server.start() + self.th_server.start( + expected_output="Server initialization complete", + timeout=30) # Add some server to the DUT_FSA's Aggregator/Bridge. self.dut_fsa_stdin.write(f"pairing onnetwork 2 {self.th_server_passcode}\n") diff --git a/src/python_testing/TC_ECOINFO_2_2.py b/src/python_testing/TC_ECOINFO_2_2.py index 96fa2cd4d00eb3..41d7fc07709d86 100644 --- a/src/python_testing/TC_ECOINFO_2_2.py +++ b/src/python_testing/TC_ECOINFO_2_2.py @@ -45,9 +45,9 @@ import chip.clusters as Clusters from chip.interaction_model import Status +from chip.testing.apps import AppServerSubprocess from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main from mobly import asserts -from TC_MCORE_FS_1_1 import AppServer _DEVICE_TYPE_AGGREGGATOR = 0x000E @@ -94,13 +94,15 @@ def _setup_ci_prerequisites(self): self.th_server_passcode = 20202021 # Start the server app. - self.th_server = AppServer( + self.th_server = AppServerSubprocess( th_server_app, storage_dir=self.storage.name, port=self.th_server_port, discriminator=self.th_server_discriminator, passcode=self.th_server_passcode) - self.th_server.start() + self.th_server.start( + expected_output="Server initialization complete", + timeout=30) def steps_TC_ECOINFO_2_2(self) -> list[TestStep]: return [ diff --git a/src/python_testing/TC_MCORE_FS_1_1.py b/src/python_testing/TC_MCORE_FS_1_1.py index 8e43d611065f0a..c30c1ec1b246b3 100755 --- a/src/python_testing/TC_MCORE_FS_1_1.py +++ b/src/python_testing/TC_MCORE_FS_1_1.py @@ -47,31 +47,11 @@ import chip.clusters as Clusters from chip import ChipDeviceCtrl -from chip.testing.tasks import Subprocess +from chip.testing.apps import AppServerSubprocess from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main from mobly import asserts -class AppServer(Subprocess): - """Wrapper class for starting an application server in a subprocess.""" - - # Prefix for log messages from the application server. - PREFIX = b"[SERVER]" - - def __init__(self, app: str, storage_dir: str, discriminator: int, passcode: int, port: int = 5540): - storage_kvs_dir = tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-")[1] - # Start the server application with dedicated KVS storage. - super().__init__(app, "--KVS", storage_kvs_dir, - '--secured-device-port', str(port), - "--discriminator", str(discriminator), - "--passcode", str(passcode), - output_cb=lambda line, is_stderr: self.PREFIX + line) - - def start(self): - # Start process and block until it prints the expected output. - super().start(expected_output="Server initialization complete") - - class TC_MCORE_FS_1_1(MatterBaseTest): @async_test_body @@ -96,13 +76,15 @@ async def setup_class(self): self.th_server_passcode = 20202021 # Start the TH_SERVER app. - self.th_server = AppServer( + self.th_server = AppServerSubprocess( th_server_app, storage_dir=self.storage.name, port=self.th_server_port, discriminator=self.th_server_discriminator, passcode=self.th_server_passcode) - self.th_server.start() + self.th_server.start( + expected_output="Server initialization complete", + timeout=30) logging.info("Commissioning from separate fabric") # Create a second controller on a new fabric to communicate to the server diff --git a/src/python_testing/TC_MCORE_FS_1_2.py b/src/python_testing/TC_MCORE_FS_1_2.py index 6cd1c85bac206e..f7e88870ddc3a6 100644 --- a/src/python_testing/TC_MCORE_FS_1_2.py +++ b/src/python_testing/TC_MCORE_FS_1_2.py @@ -50,10 +50,10 @@ import chip.clusters as Clusters from chip import ChipDeviceCtrl +from chip.testing.apps import AppServerSubprocess from ecdsa.curves import NIST256p from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches from mobly import asserts -from TC_MCORE_FS_1_1 import AppServer from TC_SC_3_6 import AttributeChangeAccumulator # Length of `w0s` and `w1s` elements @@ -97,9 +97,11 @@ async def setup_class(self): self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__) logging.info("Temporary storage directory: %s", self.storage.name) - # Get the named pipe path for the DUT_FSA app input from the user params. - dut_fsa_stdin_pipe = self.user_params.get("dut_fsa_stdin_pipe", None) - if dut_fsa_stdin_pipe is not None: + if self.is_pics_sdk_ci_only: + # Get the named pipe path for the DUT_FSA app input from the user params. + dut_fsa_stdin_pipe = self.user_params.get("dut_fsa_stdin_pipe") + if not dut_fsa_stdin_pipe: + asserts.fail("CI setup requires --string-arg dut_fsa_stdin_pipe:") self.dut_fsa_stdin = open(dut_fsa_stdin_pipe, "w") self.th_server_port = th_server_port @@ -111,13 +113,15 @@ async def setup_class(self): passcode=20202021) # Start the TH_SERVER app. - self.th_server = AppServer( + self.th_server = AppServerSubprocess( th_server_app, storage_dir=self.storage.name, port=self.th_server_port, discriminator=self.th_server_setup_params.discriminator, passcode=self.th_server_setup_params.passcode) - self.th_server.start() + self.th_server.start( + expected_output="Server initialization complete", + timeout=30) def teardown_class(self): if self._partslist_subscription is not None: @@ -135,7 +139,7 @@ def _ask_for_vendor_commissioning_ux_operation(self, setup_params: _SetupParamet f"- discriminator: {setup_params.discriminator}\n" f"- setupPinCode: {setup_params.passcode}\n" f"- setupQRCode: {setup_params.setup_qr_code}\n" - f"- setupManualcode: {setup_params.manual_code}\n" + f"- setupManualCode: {setup_params.manual_code}\n" f"If using FabricSync Admin test app, you may type:\n" f">>> pairing onnetwork 111 {setup_params.passcode}") diff --git a/src/python_testing/TC_MCORE_FS_1_3.py b/src/python_testing/TC_MCORE_FS_1_3.py index 49dc89386c644e..7dcca366a408ab 100644 --- a/src/python_testing/TC_MCORE_FS_1_3.py +++ b/src/python_testing/TC_MCORE_FS_1_3.py @@ -50,9 +50,9 @@ import chip.clusters as Clusters from chip import ChipDeviceCtrl from chip.interaction_model import Status +from chip.testing.apps import AppServerSubprocess from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches from mobly import asserts -from TC_MCORE_FS_1_1 import AppServer class TC_MCORE_FS_1_3(MatterBaseTest): @@ -84,13 +84,15 @@ def setup_class(self): self.th_server_passcode = 20202021 # Start the TH_SERVER_NO_UID app. - self.th_server = AppServer( + self.th_server = AppServerSubprocess( th_server_app, storage_dir=self.storage.name, port=self.th_server_port, discriminator=self.th_server_discriminator, passcode=self.th_server_passcode) - self.th_server.start() + self.th_server.start( + expected_output="Server initialization complete", + timeout=30) def teardown_class(self): if self.th_server is not None: diff --git a/src/python_testing/TC_MCORE_FS_1_4.py b/src/python_testing/TC_MCORE_FS_1_4.py index c365b4e9b92b51..90d1960649d766 100644 --- a/src/python_testing/TC_MCORE_FS_1_4.py +++ b/src/python_testing/TC_MCORE_FS_1_4.py @@ -49,10 +49,10 @@ import chip.clusters as Clusters from chip import ChipDeviceCtrl from chip.interaction_model import Status +from chip.testing.apps import AppServerSubprocess from chip.testing.tasks import Subprocess from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches from mobly import asserts -from TC_MCORE_FS_1_1 import AppServer class FabricSyncApp(Subprocess): @@ -160,9 +160,11 @@ def setup_class(self): vendor_id=0xFFF1) self.th_fsa_controller.start() - # Get the named pipe path for the DUT_FSA app input from the user params. - dut_fsa_stdin_pipe = self.user_params.get("dut_fsa_stdin_pipe", None) - if dut_fsa_stdin_pipe is not None: + if self.is_pics_sdk_ci_only: + # Get the named pipe path for the DUT_FSA app input from the user params. + dut_fsa_stdin_pipe = self.user_params.get("dut_fsa_stdin_pipe") + if not dut_fsa_stdin_pipe: + asserts.fail("CI setup requires --string-arg dut_fsa_stdin_pipe:") self.dut_fsa_stdin = open(dut_fsa_stdin_pipe, "w") self.th_server_port = 5544 @@ -170,13 +172,15 @@ def setup_class(self): self.th_server_passcode = 20202022 # Start the TH_SERVER_NO_UID app. - self.th_server = AppServer( + self.th_server = AppServerSubprocess( th_server_app, storage_dir=self.storage.name, port=self.th_server_port, discriminator=self.th_server_discriminator, passcode=self.th_server_passcode) - self.th_server.start() + self.th_server.start( + expected_output="Server initialization complete", + timeout=30) def teardown_class(self): if self.th_fsa_controller is not None: diff --git a/src/python_testing/TC_MCORE_FS_1_5.py b/src/python_testing/TC_MCORE_FS_1_5.py index d4f408a2ce307f..9b7e32b1d92d5c 100755 --- a/src/python_testing/TC_MCORE_FS_1_5.py +++ b/src/python_testing/TC_MCORE_FS_1_5.py @@ -50,10 +50,10 @@ import chip.clusters as Clusters from chip import ChipDeviceCtrl +from chip.testing.apps import AppServerSubprocess from ecdsa.curves import NIST256p from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main, type_matches from mobly import asserts -from TC_MCORE_FS_1_1 import AppServer from TC_SC_3_6 import AttributeChangeAccumulator # Length of `w0s` and `w1s` elements @@ -98,9 +98,11 @@ async def setup_class(self): self.storage = tempfile.TemporaryDirectory(prefix=self.__class__.__name__) logging.info("Temporary storage directory: %s", self.storage.name) - # Get the named pipe path for the DUT_FSA app input from the user params. - dut_fsa_stdin_pipe = self.user_params.get("dut_fsa_stdin_pipe", None) - if dut_fsa_stdin_pipe is not None: + if self.is_pics_sdk_ci_only: + # Get the named pipe path for the DUT_FSA app input from the user params. + dut_fsa_stdin_pipe = self.user_params.get("dut_fsa_stdin_pipe") + if not dut_fsa_stdin_pipe: + asserts.fail("CI setup requires --string-arg dut_fsa_stdin_pipe:") self.dut_fsa_stdin = open(dut_fsa_stdin_pipe, "w") self.th_server_port = th_server_port @@ -112,13 +114,15 @@ async def setup_class(self): passcode=20202021) # Start the TH_SERVER app. - self.th_server = AppServer( + self.th_server = AppServerSubprocess( th_server_app, storage_dir=self.storage.name, port=self.th_server_port, discriminator=self.th_server_setup_params.discriminator, passcode=self.th_server_setup_params.passcode) - self.th_server.start() + self.th_server.start( + expected_output="Server initialization complete", + timeout=30) def teardown_class(self): if self._partslist_subscription is not None: @@ -139,7 +143,7 @@ def _ask_for_vendor_commissioning_ux_operation(self, setup_params: _SetupParamet f"- discriminator: {setup_params.discriminator}\n" f"- setupPinCode: {setup_params.passcode}\n" f"- setupQRCode: {setup_params.setup_qr_code}\n" - f"- setupManualcode: {setup_params.manual_code}\n" + f"- setupManualCode: {setup_params.manual_code}\n" f"If using FabricSync Admin test app, you may type:\n" f">>> pairing onnetwork 111 {setup_params.passcode}") diff --git a/src/python_testing/execute_python_tests.py b/src/python_testing/execute_python_tests.py index f316d49e255dba..4e678211c5ddce 100644 --- a/src/python_testing/execute_python_tests.py +++ b/src/python_testing/execute_python_tests.py @@ -66,7 +66,6 @@ def main(search_directory, env_file): "TC_TMP_2_1.py", # src/python_testing/test_testing/test_TC_TMP_2_1.py is the Unit test of this test "TC_OCC_3_1.py", # There are CI issues for the test cases that implements manually controlling sensor device for the occupancy state ON/OFF change "TC_OCC_3_2.py", # There are CI issues for the test cases that implements manually controlling sensor device for the occupancy state ON/OFF change - "TC_BRBINFO_4_1.py", # This test requires a TH_ICD_SERVER application, hence not ready run with CI "TestCommissioningTimeSync.py", # Code/Test not being used or not shared code for any other tests "TestConformanceSupport.py", # Unit test - does not run against an app "TestChoiceConformanceSupport.py", # Unit test - does not run against an app diff --git a/src/python_testing/matter_testing_infrastructure/BUILD.gn b/src/python_testing/matter_testing_infrastructure/BUILD.gn index c8d54fb0084c92..41bbcef22b8c2e 100644 --- a/src/python_testing/matter_testing_infrastructure/BUILD.gn +++ b/src/python_testing/matter_testing_infrastructure/BUILD.gn @@ -30,6 +30,7 @@ pw_python_package("chip-testing") { sources = [ "chip/testing/__init__.py", + "chip/testing/apps.py", "chip/testing/metadata.py", "chip/testing/tasks.py", ] diff --git a/src/python_testing/matter_testing_infrastructure/chip/testing/apps.py b/src/python_testing/matter_testing_infrastructure/chip/testing/apps.py new file mode 100644 index 00000000000000..af56efc3d58ff5 --- /dev/null +++ b/src/python_testing/matter_testing_infrastructure/chip/testing/apps.py @@ -0,0 +1,69 @@ +# Copyright (c) 2024 Project CHIP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import signal +import tempfile + +from .tasks import Subprocess + + +class AppServerSubprocess(Subprocess): + """Wrapper class for starting an application server in a subprocess.""" + + # Prefix for log messages from the application server. + PREFIX = b"[SERVER]" + + def __init__(self, app: str, storage_dir: str, discriminator: int, + passcode: int, port: int = 5540): + self.kvs_fd, kvs_path = tempfile.mkstemp(dir=storage_dir, prefix="kvs-app-") + # Start the server application with dedicated KVS storage. + super().__init__(app, "--KVS", kvs_path, + '--secured-device-port', str(port), + "--discriminator", str(discriminator), + "--passcode", str(passcode), + output_cb=lambda line, is_stderr: self.PREFIX + line) + + def __del__(self): + # Do not leak KVS file descriptor. + os.close(self.kvs_fd) + + +class IcdAppServerSubprocess(AppServerSubprocess): + """Wrapper class for starting an ICD application server in a subprocess.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.paused = False + + def pause(self, check_state: bool = True): + if check_state and self.paused: + raise ValueError("ICD TH Server unexpectedly is already paused") + if not self.paused: + # Stop (halt) the ICD server process by sending a SIGTOP signal. + self.p.send_signal(signal.SIGSTOP) + self.paused = True + + def resume(self, check_state: bool = True): + if check_state and not self.paused: + raise ValueError("ICD TH Server unexpectedly is already running") + if self.paused: + # Resume (continue) the ICD server process by sending a SIGCONT signal. + self.p.send_signal(signal.SIGCONT) + self.paused = False + + def terminate(self): + # Make sure the ICD server process is not paused before terminating it. + self.resume(check_state=False) + super().terminate() diff --git a/src/python_testing/matter_testing_infrastructure/chip/testing/test_tasks.py b/src/python_testing/matter_testing_infrastructure/chip/testing/test_tasks.py index 5e91a89cf68581..051d571d79086c 100644 --- a/src/python_testing/matter_testing_infrastructure/chip/testing/test_tasks.py +++ b/src/python_testing/matter_testing_infrastructure/chip/testing/test_tasks.py @@ -14,6 +14,10 @@ import unittest +# TODO: Allow to use relative imports or imports from chip.testing package. Then, +# rename "tasks" module to "subprocess", because it would be more descriptive. +# Unfortunately, current way of importing clashes with the subprocess module +# from the Python standard library. from tasks import Subprocess