Skip to content

Commit

Permalink
Python testing: Add PICS, helpers, and tests (#26399)
Browse files Browse the repository at this point in the history
* Python testing: Add PICS, helpers, and tests

* Restyled by isort

* Fixes from review, add a couple of tests

* Apply suggestions from code review

Co-authored-by: Boris Zbarsky <bzbarsky@apple.com>

---------

Co-authored-by: Restyled.io <commits@restyled.io>
Co-authored-by: Boris Zbarsky <bzbarsky@apple.com>
  • Loading branch information
3 people authored and pull[bot] committed Sep 29, 2023
1 parent 2253659 commit 5419536
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ jobs:
scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_ACE_1_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --int-arg PIXIT.ACE.APPENDPOINT:1 PIXIT.ACE.APPDEVTYPEID:0x0100 --string-arg PIXIT.ACE.APPCLUSTER:OnOff PIXIT.ACE.APPATTRIBUTE:OnOff"'
scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_ACE_1_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"'
scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_CGEN_2_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"'
scripts/run_in_build_env.sh './scripts/tests/run_python_test.py --script "src/python_testing/TestMatterTestingSupport.py"'
- name: Uploading core files
uses: actions/upload-artifact@v3
if: ${{ failure() && !env.ACT }}
Expand Down
162 changes: 162 additions & 0 deletions src/python_testing/TestMatterTestingSupport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import typing
from datetime import datetime, timedelta, timezone

import chip.clusters as Clusters
from chip.clusters.Types import Nullable, NullValue
from chip.tlv import uint
from matter_testing_support import (MatterBaseTest, async_test_body, default_matter_test_main, parse_pics, type_matches,
utc_time_in_matter_epoch)
from mobly import asserts


def get_raw_type_list():
test = Clusters.UnitTesting
struct = test.Structs.SimpleStruct()
struct_type = test.Structs.SimpleStruct
null_opt_struct = test.Structs.NullablesAndOptionalsStruct()
null_opt_struct_type = test.Structs.NullablesAndOptionalsStruct
double_nested_struct_list = test.Structs.DoubleNestedStructList()
double_nested_struct_list_type = test.Structs.DoubleNestedStructList
list_of_uints = [0, 1]
list_of_uints_type = typing.List[uint]
list_of_structs = [struct, struct]
list_of_structs_type = typing.List[struct_type]
list_of_double_nested_struct_list = [double_nested_struct_list, double_nested_struct_list]
list_of_double_nested_struct_list_type = typing.List[double_nested_struct_list_type]

# Create a list with all the types and a list of the values that should match for that type
vals = {uint: [1],
str: ["str"],
struct_type: [struct],
null_opt_struct_type: [null_opt_struct],
double_nested_struct_list_type: [double_nested_struct_list],
list_of_uints_type: [list_of_uints],
list_of_structs_type: [list_of_structs],
list_of_double_nested_struct_list_type: [list_of_double_nested_struct_list]}
return vals


def test_type_matching_for_type(test_type, test_nullable: bool = False, test_optional: bool = False):
vals = get_raw_type_list()

if test_nullable and test_optional:
match_type = typing.Union[Nullable, None, test_type]
elif test_nullable:
match_type = typing.Union[Nullable, test_type]
elif test_optional:
match_type = typing.Optional[test_type]
else:
match_type = test_type

true_list = vals[test_type]
if test_nullable:
true_list.append(NullValue)
if test_optional:
true_list.append(None)

del vals[test_type]

# true_list is all the values that should match with the test type
for i in true_list:
asserts.assert_true(type_matches(i, match_type), "{} type checking failure".format(test_type))

# try every value in every type in the remaining dict - they should all fail
for v in vals.values():
for i in v:
asserts.assert_false(type_matches(i, match_type), "{} falsely matched to type {}".format(i, match_type))

# Test the nullables or optionals that aren't supposed to work
if not test_nullable:
asserts.assert_false(type_matches(NullValue, match_type), "NullValue falsely matched to {}".format(match_type))

if not test_optional:
asserts.assert_false(type_matches(None, match_type), "None falsely matched to {}".format(match_type))


def run_all_match_tests_for_type(test_type):
test_type_matching_for_type(test_type=test_type)
test_type_matching_for_type(test_type=test_type, test_nullable=True)
test_type_matching_for_type(test_type=test_type, test_optional=True)
test_type_matching_for_type(test_type=test_type, test_nullable=True, test_optional=True)


class TestMatterTestingSupport(MatterBaseTest):
@async_test_body
async def test_matter_epoch_time(self):
# Matter epoch should return zero
ret = utc_time_in_matter_epoch(datetime(2000, 1, 1, 0, 0, 0, 0, timezone.utc))
asserts.assert_equal(ret, 0, "UTC epoch returned non-zero value")

# Jan 2 is exactly 1 day after Jan 1
ret = utc_time_in_matter_epoch(datetime(2000, 1, 2, 0, 0, 0, 0, timezone.utc))
expected_delay = timedelta(days=1)
actual_delay = timedelta(microseconds=ret)
asserts.assert_equal(expected_delay, actual_delay, "Calculation for Jan 2 date is incorrect")

# There's a catch 22 for knowing the current time, but we can check that it's
# going up, and that it's larger than when I wrote the test
# Check that the returned value is larger than the test writing date
writing_date = utc_time_in_matter_epoch(datetime(2023, 5, 5, 0, 0, 0, 0, timezone.utc))
current_date = utc_time_in_matter_epoch()
asserts.assert_greater(current_date, writing_date, "Calculation for current date is smaller than writing date")

# Check that the time is going up
last_date = current_date
current_date = utc_time_in_matter_epoch()
asserts.assert_greater(current_date, last_date, "Time does not appear to be incrementing")

@async_test_body
async def test_type_checking(self):
vals = get_raw_type_list()
for k in vals.keys():
run_all_match_tests_for_type(k)

@async_test_body
async def test_pics_support(self):
pics_list = ['TEST.S.A0000=1',
'TEST.S.A0001=0',
'lower.s.a0000=1',
'',
' ',
'# comment',
' # comment',
' SPACE.S.A0000 = 1']
pics = parse_pics(pics_list)
# force the parsed pics here to be in the config so we can check the check_pics function
self.matter_test_config.pics = pics

asserts.assert_true(self.check_pics("TEST.S.A0000"), "PICS parsed incorrectly for TEST.S.A0000")
asserts.assert_false(self.check_pics("TEST.S.A0001"), "PICS parsed incorrectly for TEST.S.A0001")
asserts.assert_true(self.check_pics("LOWER.S.A0000"), "PICS pased incorrectly for LOWER.S.A0000")
asserts.assert_true(self.check_pics("SPACE.S.A0000"), "PICS parsed incorrectly for SPACE.S.A0000")
asserts.assert_false(self.check_pics("NOT.S.A0000"), "PICS parsed incorrectly for NOT.S.A0000")
asserts.assert_true(self.check_pics(" test.s.a0000"), "PICS checker lowercase handled incorrectly")

# invalid pics file should throw a value error
pics_list.append("BAD.S.A000=5")
try:
pics = parse_pics(pics_list)
asserts.assert_false(True, "PICS parser did not throw an error as expected")
except ValueError:
pass


if __name__ == "__main__":
default_matter_test_main()
90 changes: 89 additions & 1 deletion src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@
import pathlib
import re
import sys
import typing
import uuid
from binascii import hexlify, unhexlify
from dataclasses import asdict as dataclass_asdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional, Tuple

from chip.tlv import float32, uint

# isort: off

from chip import ChipDeviceCtrl # Needed before chip.FabricAdmin
Expand Down Expand Up @@ -117,6 +121,68 @@ def get_default_paa_trust_store(root_path: pathlib.Path) -> pathlib.Path:
return pathlib.Path.cwd()


def parse_pics(lines=typing.List[str]) -> dict[str, bool]:
pics = {}
for raw in lines:
line, _, _ = raw.partition("#")
line = line.strip()

if not line:
continue

key, _, val = line.partition("=")
val = val.strip()
if val not in ["1", "0"]:
raise ValueError('PICS {} must have a value of 0 or 1'.format(key))

pics[key.strip().upper()] = (val == "1")
return pics


def read_pics_from_file(filename: str) -> dict[str, bool]:
""" Reads a dictionary of PICS from a file. """
with open(filename, 'r') as f:
lines = f.readlines()
return parse_pics(lines)


def type_matches(received_value, desired_type):
""" Checks if the value received matches the expected type.
Handles unpacking Nullable and Optional types and
compares list value types for non-empty lists.
"""
if typing.get_origin(desired_type) == typing.Union:
return any(type_matches(received_value, t) for t in typing.get_args(desired_type))
elif typing.get_origin(desired_type) == list:
if isinstance(received_value, list):
# Assume an empty list is of the correct type
return True if received_value == [] else any(type_matches(received_value[0], t) for t in typing.get_args(desired_type))
else:
return False
elif desired_type == uint:
return isinstance(received_value, int) and received_value >= 0
elif desired_type == float32:
return isinstance(received_value, float)
else:
return isinstance(received_value, desired_type)


def utc_time_in_matter_epoch(desired_datetime: datetime = None):
""" Returns the time in matter epoch in us.
If desired_datetime is None, it will return the current time.
"""
if desired_datetime is None:
utc_native = datetime.now(tz=timezone.utc)
else:
utc_native = desired_datetime
# Matter epoch is 0 hours, 0 minutes, 0 seconds on Jan 1, 2000 UTC
utc_th_delta = utc_native - datetime(2000, 1, 1, 0, 0, 0, 0, timezone.utc)
utc_th_us = int(utc_th_delta.total_seconds() * 1000000)
return utc_th_us


@dataclass
class MatterTestConfig:
storage_path: pathlib.Path = None
Expand Down Expand Up @@ -266,14 +332,19 @@ def certificate_authority_manager(self) -> chip.CertificateAuthority.Certificate
def dut_node_id(self) -> int:
return self.matter_test_config.dut_node_id[0]

def check_pics(self, pics_key: str) -> bool:
picsd = self.matter_test_config.pics
pics_key = pics_key.strip().upper()
return pics_key in picsd and picsd[pics_key]

async def read_single_attribute(
self, dev_ctrl: ChipDeviceCtrl, node_id: int, endpoint: int, attribute: object, fabricFiltered: bool = True) -> object:
result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)], fabricFiltered=fabricFiltered)
data = result[endpoint]
return list(data.values())[0][attribute]

async def read_single_attribute_check_success(
self, cluster: object, attribute: object,
self, cluster: Clusters.ClusterObjects.ClusterCommand, attribute: Clusters.ClusterObjects.ClusterAttributeDescriptor,
dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0) -> object:
if dev_ctrl is None:
dev_ctrl = self.default_controller
Expand All @@ -285,6 +356,9 @@ async def read_single_attribute_check_success(
err_msg = "Error reading {}:{}".format(str(cluster), str(attribute))
asserts.assert_true(attr_ret is not None, err_msg)
asserts.assert_false(isinstance(attr_ret, Clusters.Attribute.ValueDecodeFailure), err_msg)
desired_type = attribute.attribute_type.Type
asserts.assert_true(type_matches(attr_ret, desired_type),
'Returned attribute {} is wrong type expected {}, got {}'.format(attribute, desired_type, type(attr_ret)))
return attr_ret

async def read_single_attribute_expect_error(
Expand All @@ -304,6 +378,18 @@ async def read_single_attribute_expect_error(
asserts.assert_equal(attr_ret.Reason.status, error, err_msg)
return attr_ret

async def send_single_cmd(
self, cmd: Clusters.ClusterObjects.ClusterCommand,
dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0,
timedRequestTimeoutMs: typing.Union[None, int] = None) -> object:
if dev_ctrl is None:
dev_ctrl = self.default_controller
if node_id is None:
node_id = self.dut_node_id

result = await dev_ctrl.SendCommand(nodeid=node_id, endpoint=endpoint, payload=cmd, timedRequestTimeoutMs=timedRequestTimeoutMs)
return result

def print_step(self, stepnum: int, title: str) -> None:
logging.info('***** Test Step %d : %s', stepnum, title)

Expand Down Expand Up @@ -567,6 +653,7 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig:
config.logs_path = pathlib.Path(_DEFAULT_LOG_PATH) if args.logs_path is None else args.logs_path
config.paa_trust_store_path = args.paa_trust_store_path
config.ble_interface_id = args.ble_interface_id
config.pics = {} if args.PICS is None else read_pics_from_file(args.PICS)

config.controller_node_id = args.controller_node_id

Expand Down Expand Up @@ -616,6 +703,7 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig:
metavar='NODE_ID', default=[_DEFAULT_DUT_NODE_ID],
help='Node ID for primary DUT communication, '
'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID, nargs="+")
basic_group.add_argument("--PICS", help="PICS file path", type=str)

commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node")

Expand Down

0 comments on commit 5419536

Please sign in to comment.