diff --git a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp index 45ab1dac61bebc..d620731e93dc5c 100644 --- a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp +++ b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp @@ -64,7 +64,7 @@ bool HasNumericField(Json::Value & jsonValue, const std::string & field) * * JSON Arguments: * - "Name": Must be "SimulateLongPress" - * - "EndpointId": number of endpoint having a switch cluster + * - "EndpointId": ID of endpoint having a switch cluster * - "ButtonId": switch position in the switch cluster for "down" button (not idle) * - "LongPressDelayMillis": Time in milliseconds before the LongPress * - "LongPressDurationMillis": Total duration in milliseconds from start of the press to LongRelease @@ -129,7 +129,7 @@ void HandleSimulateLongPress(Json::Value & jsonValue) * * JSON Arguments: * - "Name": Must be "SimulateActionSwitchMultiPress" - * - "EndpointId": number of endpoint having a switch cluster + * - "EndpointId": ID of endpoint having a switch cluster * - "ButtonId": switch position in the switch cluster for "down" button (not idle) * - "MultiPressPressedTimeMillis": Pressed time in milliseconds for each press * - "MultiPressReleasedTimeMillis": Released time in milliseconds after each press @@ -196,6 +196,56 @@ void HandleSimulateMultiPress(Json::Value & jsonValue) sButtonSimulatorInstance = std::move(buttonSimulator); } +/** + * Named pipe handler for simulating a latched switch movement. + * + * Usage example: + * echo '{"Name": "SimulateLatchPosition", "EndpointId": 3, "PositionId": 1}' > /tmp/chip_all_clusters_fifo_1146610 + * + * JSON Arguments: + * - "Name": Must be "SimulateLatchPosition" + * - "EndpointId": ID of endpoint having a switch cluster + * - "PositionId": switch position for new CurrentPosition to set in switch cluster + * + * @param jsonValue - JSON payload from named pipe + */ + +void HandleSimulateLatchPosition(Json::Value & jsonValue) +{ + bool hasEndpointId = HasNumericField(jsonValue, "EndpointId"); + bool hasPositionId = HasNumericField(jsonValue, "PositionId"); + + if (!hasEndpointId || !hasPositionId) + { + std::string inputJson = jsonValue.toStyledString(); + ChipLogError(NotSpecified, "Missing or invalid value for one of EndpointId, PositionId in %s", inputJson.c_str()); + return; + } + + EndpointId endpointId = static_cast(jsonValue["EndpointId"].asUInt()); + uint8_t positionId = static_cast(jsonValue["PositionId"].asUInt()); + + uint8_t previousPositionId = 0; + Protocols::InteractionModel::Status status = Switch::Attributes::CurrentPosition::Get(endpointId, &previousPositionId); + VerifyOrReturn(Protocols::InteractionModel::Status::Success == status, + ChipLogError(NotSpecified, "Failed to get CurrentPosition attribute")); + + if (positionId != previousPositionId) + { + status = Switch::Attributes::CurrentPosition::Set(endpointId, positionId); + VerifyOrReturn(Protocols::InteractionModel::Status::Success == status, + ChipLogError(NotSpecified, "Failed to set CurrentPosition attribute")); + ChipLogDetail(NotSpecified, "The latching switch is moved to a new position: %u", static_cast(positionId)); + + Clusters::SwitchServer::Instance().OnSwitchLatch(endpointId, positionId); + } + else + { + ChipLogDetail(NotSpecified, "Not moving latching switch to a new position, already at %u", + static_cast(positionId)); + } +} + } // namespace AllClustersAppCommandHandler * AllClustersAppCommandHandler::FromJSON(const char * json) @@ -353,9 +403,14 @@ void AllClustersAppCommandHandler::HandleCommand(intptr_t context) { HandleSimulateMultiPress(self->mJsonValue); } + else if (name == "SimulateLatchPosition") + { + HandleSimulateLatchPosition(self->mJsonValue); + } else { - ChipLogError(NotSpecified, "Unhandled command: Should never happens"); + ChipLogError(NotSpecified, "Unhandled command '%s': this hould never happen", name.c_str()); + VerifyOrDie(false && "Named pipe command not supported, see log above."); } exit: diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py index 926379804eb8e3..6da5c7a229e0b7 100644 --- a/src/python_testing/TC_SWTCH.py +++ b/src/python_testing/TC_SWTCH.py @@ -53,6 +53,7 @@ def desc_TC_SWTCH_2_4(self) -> str: """Returns a description of this test""" return "[TC-SWTCH-2.4] Momentary Switch Long Press Verification" + # TODO(#34656): Fill test steps # def steps_TC_SWTCH_2_4(self) -> list[TestStep]: # steps = [ # TestStep("0", "Commissioning, already done", is_commissioning=True), @@ -79,10 +80,6 @@ def _send_named_pipe_command(self, command_dict: dict[str, Any]): def _use_button_simulator(self) -> bool: return self.check_pics("PICS_SDK_CI_ONLY") or self.user_params.get("use_button_simulator", False) - def _ask_for_switch_idle(self): - if not self._use_button_simulator(): - self.wait_for_user_input(prompt_msg="Ensure switch is idle") - def _send_multi_press_named_pipe_command(self, endpoint_id: int, number_of_presses: int, pressed_position: int, feature_map: uint, multi_press_max: uint): command_dict = {"Name": 'SimulateMultiPress', "EndpointId": endpoint_id, "ButtonId": pressed_position, "MultiPressPressedTimeMillis": 500, "MultiPressReleasedTimeMillis": 500, @@ -94,6 +91,20 @@ def _send_long_press_named_pipe_command(self, endpoint_id: int, pressed_position "ButtonId": pressed_position, "LongPressDelayMillis": 5000, "LongPressDurationMillis": 5500, "FeatureMap": feature_map} self._send_named_pipe_command(command_dict) + def _send_latching_switch_named_pipe_command(self, endpoint_id: int, new_position: int): + command_dict = {"Name": "SimulateLatchPosition", "EndpointId": endpoint_id, "PositionId": new_position} + self._send_named_pipe_command(command_dict) + + def _ask_for_switch_idle(self): + if not self._use_button_simulator(): + self.wait_for_user_input(prompt_msg="Ensure switch is idle") + + def _ask_for_switch_position(self, endpoint_id: int, new_position: int): + if not self._use_button_simulator(): + self.wait_for_user_input(prompt_msg=f"Move latched switch to position {new_position}, if it is not already there.") + else: + self._send_latching_switch_named_pipe_command(endpoint_id, new_position) + def _ask_for_multi_press_short_long(self, endpoint_id: int, pressed_position: int, feature_map: uint, multi_press_max: uint): if not self._use_button_simulator(): msg = f""" @@ -105,7 +116,8 @@ def _ask_for_multi_press_short_long(self, endpoint_id: int, pressed_position: in self.wait_for_user_input(msg) else: # This is just a simulator, ignore the long press instruction for now, it doesn't matter for the CI. It does for cert. - self._send_multi_press_named_pipe_command(endpoint_id, 2, pressed_position, feature_map, multi_press_max) + self._send_multi_press_named_pipe_command( + endpoint_id, number_of_presses=2, pressed_position=pressed_position, feature_map=feature_map, multi_press_max=multi_press_max) def _ask_for_multi_press_long_short(self, endpoint_id, pressed_position, feature_map: int): if not self._use_button_simulator(): @@ -153,7 +165,7 @@ def _ask_for_release(self): time.sleep(self.keep_pressed_delay/1000) def _placeholder_for_step(self, step_id: str): - # TODO: Global search an replace of `self._placeholder_for_step` with `self.step` when done. + # TODO(#34656): Global search an replace of `self._placeholder_for_step` with `self.step` when done. logging.info(f"Step {step_id}") pass @@ -259,7 +271,7 @@ def _expect_no_events_for_cluster(self, event_queue: queue.Queue, endpoint_id: i @per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kMomentarySwitch)) async def test_TC_SWTCH_2_4(self): - # TODO: Make this come from PIXIT + # TODO(#34656): Make this come from PIXIT switch_pressed_position = 1 post_prompt_settle_delay_seconds = 10.0 @@ -361,6 +373,90 @@ def _received_event(self, event_listener: EventChangeCallback, target_event: Clu remaining = end_time - datetime.now() return False + def steps_TC_SWTCH_2_2(self): + return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), + TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), + TestStep(3, "Operator sets switch to first position on the DUT"), + TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), + TestStep(5, "Operator sets switch to second position (one) on the DUT", + "Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT"), + TestStep(6, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1"), + TestStep(7, "If there are more than 2 positions, test subsequent positions of the DUT"), + TestStep(8, "Operator sets switch to first position on the DUT."), + TestStep(9, "Wait 10 seconds for event reports stable." "Verify that last SwitchLatched event received is for NewPosition 0."), + TestStep(10, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), + ] + + @per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kLatchingSwitch)) + async def test_TC_SWTCH_2_2(self): + post_prompt_settle_delay_seconds = 10.0 + + # Step 1: Commissioning - already done + self.step(1) + + cluster = Clusters.Switch + endpoint_id = self.matter_test_config.endpoint + + # Step 2: Set up subscription to all events of Switch cluster on the endpoint. + self.step(2) + event_listener = EventChangeCallback(cluster) + await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) + + # Step 3: Operator sets switch to first position on the DUT. + self.step(3) + self._ask_for_switch_position(endpoint_id, new_position=0) + event_listener.flush_events() + + # Step 4: TH reads the CurrentPosition attribute from the DUT. + # Verify that the value is 0. + self.step(4) + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) + asserts.assert_equal(button_val, 0, "Switch position value is not 0") + + # Step 5: Operator sets switch to second position (one) on the DUT", + # Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT + self.step(5) + expected_switch_position = 1 + self._ask_for_switch_position(endpoint_id, expected_switch_position) + + data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched, timeout_sec=post_prompt_settle_delay_seconds) + logging.info(f"-> SwitchLatched event last received: {data}") + asserts.assert_equal(data, cluster.Events.SwitchLatched( + newPosition=expected_switch_position), "Did not get expected switch position") + + # Step 6: TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1 + self.step(6) + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) + asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}") + + # Step 7: If there are more than 2 positions, test subsequent positions of the DUT + # # TODO(#34656): Implement loop for > 2 total positions + self.skip_step(7) + + # Step 8: Operator sets switch to first position on the DUT. + self.step(8) + event_listener.flush_events() + self._ask_for_switch_position(endpoint_id, new_position=0) + + # Step 9: Wait 10 seconds for event reports stable. + # Verify that last SwitchLatched event received is for NewPosition 0. + self.step(9) + time.sleep(10.0) + + expected_switch_position = 0 + last_event = event_listener.get_last_event() + asserts.assert_is_not_none(last_event, "Did not get SwitchLatched events since last operator action.") + last_event_data = last_event.Data + logging.info(f"-> SwitchLatched event last received: {last_event_data}") + asserts.assert_equal(last_event_data, cluster.Events.SwitchLatched( + newPosition=expected_switch_position), "Did not get expected switch position") + + # Step 10: TH reads the CurrentPosition attribute from the DUT. + # Verify that the value is 0 + self.step(10) + button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) + asserts.assert_equal(button_val, 0, "Button value is not 0") + def steps_TC_SWTCH_2_3(self): return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 26437ea54b5b79..273ffe94b5f821 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -257,11 +257,11 @@ def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction): f'Got subscription report for event on cluster {self._expected_cluster}: {res.Data}') self._q.put(res) - def wait_for_event_report(self, expected_event: ClusterObjects.ClusterEvent, timeoutS: int = 10): - """This function allows a test script to block waiting for the specific event to arrive with a timeout - (specified in seconds). It returns the event data so that the values can be checked.""" + def wait_for_event_report(self, expected_event: ClusterObjects.ClusterEvent, timeout_sec: float = 10.0) -> Any: + """This function allows a test script to block waiting for the specific event to be the next event + to arrive within a timeout (specified in seconds). It returns the event data so that the values can be checked.""" try: - res = self._q.get(block=True, timeout=timeoutS) + res = self._q.get(block=True, timeout=timeout_sec) except queue.Empty: asserts.fail("Failed to receive a report for the event {}".format(expected_event)) @@ -269,16 +269,30 @@ def wait_for_event_report(self, expected_event: ClusterObjects.ClusterEvent, tim asserts.assert_equal(res.Header.EventId, expected_event.event_id, "Expected event ID not found in event report") return res.Data - def wait_for_event_expect_no_report(self, timeoutS: int = 10): - """This function succceeds/returns if an event does not arrive within the timeout specified in seconds. - If an event does arrive, an assert is called.""" + def wait_for_event_expect_no_report(self, timeout_sec: float = 10.0): + """This function returns if an event does not arrive within the timeout specified in seconds. + If any event does arrive, an assert failure occurs.""" try: - res = self._q.get(block=True, timeout=timeoutS) + res = self._q.get(block=True, timeout=timeout_sec) except queue.Empty: return asserts.fail(f"Event reported when not expected {res}") + def get_last_event(self) -> Optional[Any]: + """Flush entire queue, returning last (newest) event only.""" + last_event: Optional[Any] = None + while True: + try: + last_event = self._q.get(block=False) + except queue.Empty: + return last_event + + def flush_events(self) -> None: + """Flush entire queue, returning nothing.""" + _ = self.get_last_event() + return + @property def event_queue(self) -> queue.Queue: return self._q