Skip to content

Commit

Permalink
Add testing files from controller
Browse files Browse the repository at this point in the history
  • Loading branch information
DamMicSzm committed Feb 27, 2023
1 parent 1c1221e commit e7f7d5a
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 160 deletions.
6 changes: 0 additions & 6 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,6 @@ exclude = third_party
src/controller/python/chip/yaml/__init__.py
src/controller/python/chip/yaml/format_converter.py
src/controller/python/chip/yaml/runner.py
src/controller/python/test/test_scripts/base.py
src/controller/python/test/test_scripts/cluster_objects.py
src/controller/python/test/test_scripts/mobile-device-test.py
src/controller/python/test/test_scripts/network_commissioning.py
src/controller/python/test/unit_tests/test_cluster_objects.py
src/controller/python/test/unit_tests/test_tlv.py
src/lib/asn1/gen_asn1oid.py
src/pybindings/pycontroller/build-chip-wheel.py
src/pybindings/pycontroller/pychip/__init__.py
Expand Down
199 changes: 149 additions & 50 deletions src/controller/python/test/test_scripts/base.py

Large diffs are not rendered by default.

86 changes: 59 additions & 27 deletions src/controller/python/test/test_scripts/cluster_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def VerifyDecodeSuccess(values):
f"Ignoring attribute decode failure for path {endpoint}/{attribute}")
else:
raise AssertionError(
f"Cannot decode value for path {endpoint}/{attribute}, got error: '{str(v.Reason)}', raw TLV data: '{v.TLVValue}'")
f"Cannot decode value for path {endpoint}/{attribute}, "
f"got error: '{str(v.Reason)}', raw TLV data: '{v.TLVValue}'")

for endpoint in values:
for cluster in values[endpoint]:
Expand Down Expand Up @@ -104,7 +105,7 @@ async def TestCommandRoundTripWithBadEndpoint(cls, devCtrl):
req = Clusters.OnOff.Commands.On()
try:
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=233, payload=req)
raise ValueError(f"Failure expected")
raise ValueError("Failure expected")
except chip.interaction_model.InteractionModelError as ex:
logger.info(f"Recevied {ex} from server.")
return
Expand Down Expand Up @@ -156,11 +157,16 @@ async def TestWriteRequest(cls, devCtrl):
raise AssertionError("Write returned unexpected result.")

logger.info("2: Write chunked list")
res = await devCtrl.WriteAttribute(nodeid=NODE_ID,
attributes=[(1, Clusters.UnitTesting.Attributes.ListLongOctetString([b"0123456789abcdef" * 32] * 5))])
res = await devCtrl.WriteAttribute(
nodeid=NODE_ID,
attributes=[
(1, Clusters.UnitTesting.Attributes.ListLongOctetString([b"0123456789abcdef" * 32] * 5))
]
)
expectedRes = [
AttributeStatus(Path=AttributePath(
EndpointId=1, Attribute=Clusters.UnitTesting.Attributes.ListLongOctetString), Status=chip.interaction_model.Status.Success),
EndpointId=1,
Attribute=Clusters.UnitTesting.Attributes.ListLongOctetString), Status=chip.interaction_model.Status.Success),
]

logger.info(f"Received WriteResponse: {res}")
Expand Down Expand Up @@ -198,15 +204,19 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction):
@ base.test_case
async def TestSubscribeZeroMinInterval(cls, devCtrl):
'''
This validates receiving subscription reports for two attributes at a time in quick succession after issuing a command that results in attribute side-effects.
Specifically, it relies on the fact that the second attribute is changed in a different execution context than the first. This ensures that we pick-up the first
attribute change and generate a notification, and validating that shortly after that, we generate a second report for the second change.
This is done using subscriptions with a min reporting interval of 0 to ensure timely notification of the above. An On() command is sent to the OnOff cluster
This validates receiving subscription reports for two attributes at a time in quick succession after
issuing a command that results in attribute side-effects. Specifically, it relies on the fact that the second attribute
is changed in a different execution context than the first. This ensures that we pick-up the first
attribute change and generate a notification, and validating that shortly after that,
we generate a second report for the second change.
This is done using subscriptions with a min reporting interval of 0 to ensure timely notification of the above.
An On() command is sent to the OnOff cluster
which should simultaneously set the state to On as well as set the level to 254.
'''
logger.info("Test Subscription With MinInterval of 0")
sub = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=[Clusters.OnOff, Clusters.LevelControl], reportInterval=(0, 60))
sub = await devCtrl.ReadAttribute(nodeid=NODE_ID,
attributes=[Clusters.OnOff, Clusters.LevelControl], reportInterval=(0, 60))
data = sub.GetAttributes()

logger.info("Sending off command")
Expand Down Expand Up @@ -297,17 +307,21 @@ async def TestReadAttributeRequests(cls, devCtrl):
if res[1][Clusters.UnitTesting][Clusters.UnitTesting.Attributes.ListLongOctetString] != [b'0123456789abcdef' * 32] * 4:
raise AssertionError("Unexpected read result")

logger.info("*: Getting current fabric index")
res = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=[(0, Clusters.OperationalCredentials.Attributes.CurrentFabricIndex)])
fabricIndex = res[0][Clusters.OperationalCredentials][Clusters.OperationalCredentials.Attributes.CurrentFabricIndex]

# Note: ListFabricScoped is an empty list for now. We should re-enable this test after we make it return expected data.
# logger.info("*: Getting current fabric index")
# res = await devCtrl.ReadAttribute(nodeid=NODE_ID,
# attributes=[(0, Clusters.OperationalCredentials.Attributes.CurrentFabricIndex)])
# fabricIndex = res[0][Clusters.OperationalCredentials][Clusters.OperationalCredentials.Attributes.CurrentFabricIndex]
#
# logger.info("8: Read without fabric filter")
# res = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=[(1, Clusters.UnitTesting.Attributes.ListFabricScoped)], fabricFiltered=False)
# res = await devCtrl.ReadAttribute(nodeid=NODE_ID,
# attributes=[(1, Clusters.UnitTesting.Attributes.ListFabricScoped)],
# fabricFiltered=False)
# if len(res[1][Clusters.UnitTesting][Clusters.UnitTesting.Attributes.ListFabricScoped]) == 1:
# raise AssertionError("Expect more elements in the response")
# logger.info("9: Read with fabric filter")
# res = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=[(1, Clusters.UnitTesting.Attributes.ListFabricScoped)], fabricFiltered=True)
# res = await devCtrl.ReadAttribute(nodeid=NODE_ID,
# attributes=[(1, Clusters.UnitTesting.Attributes.ListFabricScoped)], fabricFiltered=True)
# if len(res[1][Clusters.UnitTesting][Clusters.UnitTesting.Attributes.ListFabricScoped]) != 1:
# raise AssertionError("Expect exact one element in the response")
# if res[1][Clusters.UnitTesting][Clusters.UnitTesting.Attributes.ListFabricScoped][0].fabricIndex != fabricIndex:
Expand All @@ -317,9 +331,12 @@ async def TestReadAttributeRequests(cls, devCtrl):
@ classmethod
async def _TriggerEvent(cls, devCtrl):
# We trigger sending an event a couple of times just to be safe.
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=Clusters.UnitTesting.Commands.TestEmitTestEventRequest())
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=Clusters.UnitTesting.Commands.TestEmitTestEventRequest())
return await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=Clusters.UnitTesting.Commands.TestEmitTestEventRequest())
await devCtrl.SendCommand(nodeid=NODE_ID,
endpoint=1, payload=Clusters.UnitTesting.Commands.TestEmitTestEventRequest())
await devCtrl.SendCommand(nodeid=NODE_ID,
endpoint=1, payload=Clusters.UnitTesting.Commands.TestEmitTestEventRequest())
return await devCtrl.SendCommand(nodeid=NODE_ID,
endpoint=1, payload=Clusters.UnitTesting.Commands.TestEmitTestEventRequest())

@ classmethod
async def _RetryForContent(cls, request, until, retryCount=10, intervalSeconds=1):
Expand Down Expand Up @@ -351,20 +368,30 @@ def validate_got_expected_event(events):
return False
return True

await cls._RetryForContent(request=lambda: devCtrl.ReadEvent(nodeid=NODE_ID, events=req, eventNumberFilter=current_event_filter), until=validate_got_expected_event)
await cls._RetryForContent(request=lambda: devCtrl.ReadEvent(
nodeid=NODE_ID,
events=req,
eventNumberFilter=current_event_filter
), until=validate_got_expected_event)

def validate_got_no_event(events):
return len(events) == 0

await cls._RetryForContent(request=lambda: devCtrl.ReadEvent(nodeid=NODE_ID, events=req, eventNumberFilter=(current_event_filter + 1)), until=validate_got_no_event)
await cls._RetryForContent(request=lambda: devCtrl.ReadEvent(
nodeid=NODE_ID,
events=req,
eventNumberFilter=(current_event_filter + 1)
), until=validate_got_no_event)

@ classmethod
@ base.test_case
async def TestGenerateUndefinedFabricScopedEventRequests(cls, devCtrl):
logger.info("Running TestGenerateUndefinedFabricScopedEventRequests")
try:
res = await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=Clusters.UnitTesting.Commands.TestEmitTestFabricScopedEventRequest(arg1=0))
raise ValueError(f"Unexpected Failure")
res = await devCtrl.SendCommand(nodeid=NODE_ID,
endpoint=1,
payload=Clusters.UnitTesting.Commands.TestEmitTestFabricScopedEventRequest(arg1=0))
raise ValueError("Unexpected Failure")
except chip.interaction_model.InteractionModelError as ex:
logger.info(f"Recevied {ex} from server.")
res = await devCtrl.ReadEvent(nodeid=NODE_ID, events=[
Expand Down Expand Up @@ -518,13 +545,15 @@ async def TestReadWriteAttributeRequestsWithVersion(cls, devCtrl):
req = [
(0, Clusters.BasicInformation.Attributes.VendorName),
]
res = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req, dataVersionFilters=[(0, Clusters.BasicInformation, data_version)])
res = await devCtrl.ReadAttribute(nodeid=NODE_ID,
attributes=req, dataVersionFilters=[(0, Clusters.BasicInformation, data_version)])
VerifyDecodeSuccess(res)
new_data_version = res[0][Clusters.BasicInformation][DataVersion]
if (data_version + 1) != new_data_version:
raise AssertionError("Version mistmatch happens.")

res = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=req, dataVersionFilters=[(0, Clusters.BasicInformation, new_data_version)])
res = await devCtrl.ReadAttribute(nodeid=NODE_ID,
attributes=req, dataVersionFilters=[(0, Clusters.BasicInformation, new_data_version)])
VerifyDecodeSuccess(res)

res = await devCtrl.WriteAttribute(nodeid=NODE_ID,
Expand Down Expand Up @@ -590,7 +619,10 @@ def eventPathPossibilities():
logging.info(
f"{testCount}: Reading mixed Attributes({attributes[0]}) Events({events[0]})")
await cls._TriggerEvent(devCtrl)
res = await cls._RetryForContent(request=lambda: devCtrl.Read(nodeid=NODE_ID, attributes=attributes[1], events=events[1]), until=lambda res: res != 0)
res = await cls._RetryForContent(request=lambda: devCtrl.Read(
nodeid=NODE_ID,
attributes=attributes[1],
events=events[1]), until=lambda res: res != 0)
VerifyDecodeSuccess(res.attributes)

@ classmethod
Expand Down
66 changes: 53 additions & 13 deletions src/controller/python/test/test_scripts/mobile-device-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,19 +195,59 @@ def do_tests(controller_nodeid, device_nodeid, address, timeout, discriminator,


@click.command()
@click.option("--controller-nodeid", default=TEST_CONTROLLER_NODE_ID, type=int, help="NodeId of the controller.")
@click.option("--device-nodeid", default=TEST_DEVICE_NODE_ID, type=int, help="NodeId of the device.")
@click.option("--address", "-a", default='', type=str, help="Skip commissionee discovery, commission the device with the IP directly.")
@click.option("--timeout", "-t", default=240, type=int, help="The program will return with timeout after specified seconds.")
@click.option("--discriminator", default=TEST_DISCRIMINATOR, type=int, help="Discriminator of the device.")
@click.option("--setup-pin", default=TEST_SETUPPIN, type=int, help="Setup pincode of the device.")
@click.option('--enable-test', default=['all'], type=str, multiple=True, help='The tests to be executed. By default, all tests will be executed, use this option to run a specific set of tests. Use --print-test-list for a list of appliable tests.')
@click.option('--disable-test', default=[], type=str, multiple=True, help='The tests to be excluded from the set of enabled tests. Use --print-test-list for a list of appliable tests.')
@click.option('--log-level', default='WARN', type=click.Choice(['ERROR', 'WARN', 'INFO', 'DEBUG']), help="The log level of the test.")
@click.option('--log-format', default=None, type=str, help="Override logging format")
@click.option('--print-test-list', is_flag=True, help="Print a list of test cases and test sets that can be toggled via --enable-test and --disable-test, then exit")
@click.option('--paa-trust-store-path', default='', type=str, help="Path that contains valid and trusted PAA Root Certificates.")
def run(controller_nodeid, device_nodeid, address, timeout, discriminator, setup_pin, enable_test, disable_test, log_level, log_format, print_test_list, paa_trust_store_path):
@click.option("--controller-nodeid",
default=TEST_CONTROLLER_NODE_ID,
type=int,
help="NodeId of the controller.")
@click.option("--device-nodeid",
default=TEST_DEVICE_NODE_ID,
type=int,
help="NodeId of the device.")
@click.option("--address", "-a",
default='',
type=str,
help="Skip commissionee discovery, commission the device with the IP directly.")
@click.option("--timeout", "-t",
default=240,
type=int,
help="The program will return with timeout after specified seconds.")
@click.option("--discriminator",
default=TEST_DISCRIMINATOR,
type=int,
help="Discriminator of the device.")
@click.option("--setup-pin",
default=TEST_SETUPPIN,
type=int,
help="Setup pincode of the device.")
@click.option('--enable-test',
default=['all'],
type=str,
multiple=True,
help='The tests to be executed. By default, all tests will be executed, use this option to run a '
'specific set of tests. Use --print-test-list for a list of appliable tests.')
@click.option('--disable-test',
default=[],
type=str,
multiple=True,
help='The tests to be excluded from the set of enabled tests. Use --print-test-list for a list of '
'appliable tests.')
@click.option('--log-level',
default='WARN',
type=click.Choice(['ERROR', 'WARN', 'INFO', 'DEBUG']),
help="The log level of the test.")
@click.option('--log-format',
default=None,
type=str,
help="Override logging format")
@click.option('--print-test-list',
is_flag=True,
help="Print a list of test cases and test sets that can be toggled via --enable-test and --disable-test, then exit")
@click.option('--paa-trust-store-path',
default='',
type=str,
help="Path that contains valid and trusted PAA Root Certificates.")
def run(controller_nodeid, device_nodeid, address, timeout, discriminator, setup_pin, enable_test, disable_test, log_level,
log_format, print_test_list, paa_trust_store_path):
coloredlogs.install(level=log_level, fmt=log_format, logger=logger)

if print_test_list:
Expand Down
Loading

0 comments on commit e7f7d5a

Please sign in to comment.