Skip to content

Commit

Permalink
Fix test TC-SWTCH-2.2
Browse files Browse the repository at this point in the history
- TC-SWTCH-2.2 is updated from YAML to Python by this PR
- TC-SWTCH-2.2 latest test plan steps implemented
- Added latching switch simulation to all clusters app
- All switch cluster simulation features are now done in all-clusters-app

Fixes project-chip#34304
Fixes project-chip#34305

Testing done:
- New test passes with all-clusters
- Existing tests also pass
  • Loading branch information
tcarmelveilleux committed Jul 31, 2024
1 parent 830d5fc commit 0e61d3f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 42 deletions.
13 changes: 7 additions & 6 deletions examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,17 @@ void HandleSimulateMultiPress(Json::Value & jsonValue)
* Named pipe handler for simulating a latched switch movement.
*
* Usage example:
* echo '{"Name": "SimulateLatchedPosition", "EndpointId": 3, "PositionId": 1}' > /tmp/chip_all_clusters_fifo_1146610
* echo '{"Name": "SimulateLatchPosition", "EndpointId": 3, "PositionId": 1}' > /tmp/chip_all_clusters_fifo_1146610
*
* JSON Arguments:
* - "Name": Must be "SimulateLatchedPosition"
* - "Name": Must be "SimulateLatchPosition"
* - "EndpointId": ID of endpoint having a switch cluster
* - "PositionId": switch position for new CurrentPosition to set in switch cluster
*
* @param jsonValue - JSON payload from named pipe
*/

void HandleSimulateLatchedPosition(Json::Value & jsonValue)
void HandleSimulateLatchPosition(Json::Value & jsonValue)
{
bool hasEndpointId = HasNumericField(jsonValue, "EndpointId");
bool hasPositionId = HasNumericField(jsonValue, "PositionId");
Expand All @@ -234,7 +234,7 @@ void HandleSimulateLatchedPosition(Json::Value & jsonValue)

if (positionId != previousPositionId)
{
status = Switch::Attributes::CurrentPosition::Set(endpoint, positionId);
status = Switch::Attributes::CurrentPosition::Set(endpointId, positionId);
VerifyOrReturn(Protocols::InteractionModel::Status::Success == status,
ChipLogError(NotSpecified, "Failed to set CurrentPosition attribute"));
ChipLogDetail(NotSpecified, "The latching switch is moved to a new position: %u", static_cast<unsigned>(positionId));
Expand Down Expand Up @@ -406,11 +406,12 @@ void AllClustersAppCommandHandler::HandleCommand(intptr_t context)
}
else if (name == "SimulateLatchPosition")
{
HandleSimulateLatchedPosition(self->mJsonValue);
HandleSimulateLatchPosition(self->mJsonValue);
}
else
{
ChipLogError(NotSpecified, "Unhandled command: Should never happens");
ChipLogError(NotSpecified, "Unhandled command '%s': this hould never happen", name.c_str());
VerifyOrDie(false && "Named pipe command not supported, see log above.");
}

exit:
Expand Down
60 changes: 32 additions & 28 deletions src/python_testing/TC_SWTCH.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ def _send_long_press_named_pipe_command(self, endpoint_id: int, pressed_position
"ButtonId": pressed_position, "LongPressDelayMillis": 5000, "LongPressDurationMillis": 5500, "FeatureMap": feature_map}
self._send_named_pipe_command(command_dict)

def _send_latched_switch_named_pipe_command(self, endpoint_id: int, new_position: int):
command_dict = {"Name": "SimulateLatchedPosition", "EndpointId": endpoint_id, "PositionId": new_position}
def _send_latching_switch_named_pipe_command(self, endpoint_id: int, new_position: int):
command_dict = {"Name": "SimulateLatchPosition", "EndpointId": endpoint_id, "PositionId": new_position}
self._send_named_pipe_command(command_dict)

def _ask_for_switch_idle(self):
Expand All @@ -102,7 +102,7 @@ def _ask_for_switch_position(self, endpoint_id: int, new_position: int):
if not self._use_button_simulator():
self.wait_for_user_input(prompt_msg=f"Move latched switch to position {new_position}, if it is not already there.")
else:
self._send_latched_switch_named_pipe_command(endpoint_id, new_position)
self._send_latching_switch_named_pipe_command(endpoint_id, new_position)

def _ask_for_multi_press_short_long(self, endpoint_id: int, pressed_position: int, feature_map: uint, multi_press_max: uint):
if not self._use_button_simulator():
Expand Down Expand Up @@ -387,68 +387,72 @@ def steps_TC_SWTCH_2_2(self):

@per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kLatchingSwitch))
async def test_TC_SWTCH_2_2(self):
# Commissioning - already done
post_prompt_settle_delay_seconds = 10.0

# Step 1: Commissioning - already done
self.step(1)

cluster = Clusters.Switch
feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap)

has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0
has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0
has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0

endpoint_id = self.matter_test_config.endpoint

# Step 2: Set up subscription to all events of Switch cluster on the endpoint.
self.step(2)
event_listener = EventChangeCallback(cluster)
await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)

# Step 3: Operator sets switch to first position on the DUT.
self.step(3)
self._ask_for_switch_position(endpoint_id, new_position=0)
event_listener.flush_events()

# Step 4: TH reads the CurrentPosition attribute from the DUT.
# Verify that the value is 0.
self.step(4)
button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(button_val, 0, "Switch position value is not 0")

# Step 5: Operator sets switch to second position (one) on the DUT",
# Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT
self.step(5)
expected_switch_position = 1
self._ask_for_switch_position(endpoint_id, expected_switch_position)
data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched)
# TODO: Implement check
print(data)

data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched, timeout_sec=post_prompt_settle_delay_seconds)
logging.info(f"-> SwitchLatched event last received: {data}")
asserts.assert_equal(data, cluster.Events.SwitchLatched(newPosition=expected_switch_position), "Did not get expected switch position")

# Step 6: TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1
self.step(6)
button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}")

# Step 7: If there are more than 2 positions, test subsequent positions of the DUT
# TODO: Implement loop for > 2 total positions
self.skip_step(7)

# Step 8: Operator sets switch to first position on the DUT.
self.step(8)
event_listener.flush_events()
self._ask_for_switch_position(endpoint_id, new_position=0)

# Step 9: Wait 10 seconds for event reports stable.
# Verify that last SwitchLatched event received is for NewPosition 0.
self.step(9)
# Wait 10 seconds, then check last SwitchLatched event received had position 0
time.sleep(10.0)

expected_switch_position = 0
last_event = event_listener.get_last_event()
asserts.assert_is_not_none(last_event, "Did not get SwitchLatched events since last operator action.")
last_event_data = last_event.Data
logging.info(f"-> SwitchLatched event last received: {last_event_data}")
asserts.assert_equal(last_event_data, cluster.Events.SwitchLatched(newPosition=expected_switch_position), "Did not get expected switch position")


self.step("8b")
if has_msr_feature and has_msl_feature:
asserts.assert_true(self._received_event(event_listener, cluster.Events.LongRelease, 10),
"Did not receive long release")

self.step("8c")
if has_as_feature:
asserts.assert_false(self._received_event(event_listener, cluster.Events.ShortRelease, 10), "Received short release")
else:
self.mark_current_step_skipped()

self.step(9)
# Step 10: TH reads the CurrentPosition attribute from the DUT.
# Verify that the value is 0
self.step(10)
button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(button_val, 0, "Button value is not 0")


def steps_TC_SWTCH_2_3(self):
return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True),
TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"),
Expand Down
30 changes: 22 additions & 8 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,28 +257,42 @@ def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction):
f'Got subscription report for event on cluster {self._expected_cluster}: {res.Data}')
self._q.put(res)

def wait_for_event_report(self, expected_event: ClusterObjects.ClusterEvent, timeoutS: int = 10):
"""This function allows a test script to block waiting for the specific event to arrive with a timeout
(specified in seconds). It returns the event data so that the values can be checked."""
def wait_for_event_report(self, expected_event: ClusterObjects.ClusterEvent, timeout_sec: float = 10.0) -> Any:
"""This function allows a test script to block waiting for the specific event to be the next event
to arrive within a timeout (specified in seconds). It returns the event data so that the values can be checked."""
try:
res = self._q.get(block=True, timeout=timeoutS)
res = self._q.get(block=True, timeout=timeout_sec)
except queue.Empty:
asserts.fail("Failed to receive a report for the event {}".format(expected_event))

asserts.assert_equal(res.Header.ClusterId, expected_event.cluster_id, "Expected cluster ID not found in event report")
asserts.assert_equal(res.Header.EventId, expected_event.event_id, "Expected event ID not found in event report")
return res.Data

def wait_for_event_expect_no_report(self, timeoutS: int = 10):
"""This function succceeds/returns if an event does not arrive within the timeout specified in seconds.
If an event does arrive, an assert is called."""
def wait_for_event_expect_no_report(self, timeout_sec: float = 10.0):
"""This function returns if an event does not arrive within the timeout specified in seconds.
If any event does arrive, an assert failure occurs."""
try:
res = self._q.get(block=True, timeout=timeoutS)
res = self._q.get(block=True, timeout=timeout_sec)
except queue.Empty:
return

asserts.fail(f"Event reported when not expected {res}")

def get_last_event(self) -> Optional[Any]:
"""Flush entire queue, returning last (newest) event only."""
last_event: Optional[Any] = None
while True:
try:
last_event = self._q.get(block=False)
except queue.Empty:
return last_event

def flush_events(self) -> None:
"""Flush entire queue, returning nothing."""
_ = self.get_last_event()
return

@property
def event_queue(self) -> queue.Queue:
return self._q
Expand Down

0 comments on commit 0e61d3f

Please sign in to comment.