From 267974882d1a065e99d1fb00c68956cdef98b63c Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Thu, 8 Jun 2023 22:28:52 +0200 Subject: [PATCH] [python] update cached attributes incrementally (#26774) * [python] update cached attributes incrementally Instead of rebuilding the cache from scratch on every report, just update the Endpoint/Cluster/Attributes which actually changed. Obviously, this is significantly faster, especially for small updates. * Update src/controller/python/chip/clusters/Attribute.py Co-authored-by: Marcel van der Veldt * Address review feedback * Address one more unnecessary if bracket * Add attribute cache tests using integration tests * Fix lintinig issues * Apply same report interval * Increase timeout for mobile-device-test.py --------- Co-authored-by: Marcel van der Veldt --- .../python/chip/clusters/Attribute.py | 106 +++++++++--------- .../test/test_scripts/cluster_objects.py | 81 +++++++++++++ .../linux-cirque/MobileDeviceTest.py | 2 +- 3 files changed, 137 insertions(+), 52 deletions(-) diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index 13a529d14316ad..4eab3700fac7c5 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -383,7 +383,7 @@ def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, V clusterCache[path.AttributeId] = data - def UpdateCachedData(self): + def UpdateCachedData(self, changedPathSet: set[AttributePath]): ''' This converts the raw TLV data into a cluster object format. Two formats are available: @@ -401,68 +401,72 @@ def UpdateCachedData(self): tlvCache = self.attributeTLVCache attributeCache = self.attributeCache - for endpoint in tlvCache: - if (endpoint not in attributeCache): - attributeCache[endpoint] = {} + for attributePath in changedPathSet: + endpointId = attributePath.EndpointId - endpointCache = attributeCache[endpoint] + if endpointId not in attributeCache: + attributeCache[endpointId] = {} - for cluster in tlvCache[endpoint]: - if cluster not in _ClusterIndex: + endpointCache = attributeCache[endpointId] + + clusterId = attributePath.ClusterId + + if clusterId not in _ClusterIndex: + # + # #22599 tracks dealing with unknown clusters more + # gracefully so that clients can still access this data. + # + continue + + clusterType = _ClusterIndex[clusterId] + + if clusterType not in endpointCache: + endpointCache[clusterType] = {} + + clusterCache = endpointCache[clusterType] + clusterDataVersion = self.versionList.get( + endpointId, {}).get(clusterId, None) + + if self.returnClusterObject: + try: + # Since the TLV data is already organized by attribute tags, we can trivially convert to a cluster object representation. + endpointCache[clusterType] = clusterType.FromDict( + data=clusterType.descriptor.TagDictToLabelDict([], tlvCache[endpointId][clusterId])) + endpointCache[clusterType].SetDataVersion( + clusterDataVersion) + except Exception as ex: + decodedValue = ValueDecodeFailure( + tlvCache[endpointId][clusterId], ex) + endpointCache[clusterType] = decodedValue + else: + clusterCache[DataVersion] = clusterDataVersion + + attributeId = attributePath.AttributeId + + value = tlvCache[endpointId][clusterId][attributeId] + + if (clusterId, attributeId) not in _AttributeIndex: # # #22599 tracks dealing with unknown clusters more # gracefully so that clients can still access this data. # continue - clusterType = _ClusterIndex[cluster] - - if (clusterType not in endpointCache): - endpointCache[clusterType] = {} + attributeType = _AttributeIndex[(clusterId, attributeId)][0] - clusterCache = endpointCache[clusterType] - clusterDataVersion = self.versionList.get( - endpoint, {}).get(cluster, None) + if attributeType not in clusterCache: + clusterCache[attributeType] = {} - if (self.returnClusterObject): + if isinstance(value, ValueDecodeFailure): + clusterCache[attributeType] = value + else: try: - # Since the TLV data is already organized by attribute tags, we can trivially convert to a cluster object representation. - endpointCache[clusterType] = clusterType.FromDict( - data=clusterType.descriptor.TagDictToLabelDict([], tlvCache[endpoint][cluster])) - endpointCache[clusterType].SetDataVersion( - clusterDataVersion) + decodedValue = attributeType.FromTagDictOrRawValue( + tlvCache[endpointId][clusterId][attributeId]) except Exception as ex: - decodedValue = ValueDecodeFailure( - tlvCache[endpoint][cluster], ex) - endpointCache[clusterType] = decodedValue - else: - clusterCache[DataVersion] = clusterDataVersion - for attribute in tlvCache[endpoint][cluster]: - value = tlvCache[endpoint][cluster][attribute] - - if (cluster, attribute) not in _AttributeIndex: - # - # #22599 tracks dealing with unknown clusters more - # gracefully so that clients can still access this data. - # - continue - - attributeType = _AttributeIndex[( - cluster, attribute)][0] - - if (attributeType not in clusterCache): - clusterCache[attributeType] = {} - - if (type(value) is ValueDecodeFailure): - clusterCache[attributeType] = value - else: - try: - decodedValue = attributeType.FromTagDictOrRawValue( - tlvCache[endpoint][cluster][attribute]) - except Exception as ex: - decodedValue = ValueDecodeFailure(value, ex) + decodedValue = ValueDecodeFailure(value, ex) - clusterCache[attributeType] = decodedValue + clusterCache[attributeType] = decodedValue class SubscriptionTransaction: @@ -766,7 +770,7 @@ def _handleReportBegin(self): pass def _handleReportEnd(self): - self._cache.UpdateCachedData() + self._cache.UpdateCachedData(self._changedPathSet) if (self._subscription_handler is not None): for change in self._changedPathSet: diff --git a/src/controller/python/test/test_scripts/cluster_objects.py b/src/controller/python/test/test_scripts/cluster_objects.py index 80210bb8af32d8..53516c1dc75e9b 100644 --- a/src/controller/python/test/test_scripts/cluster_objects.py +++ b/src/controller/python/test/test_scripts/cluster_objects.py @@ -200,6 +200,85 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction): sub.Shutdown() + @ classmethod + @ base.test_case + async def TestAttributeCacheAttributeView(cls, devCtrl): + logger.info("Test AttributeCache Attribute-View") + sub: SubscriptionTransaction = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=[(1, Clusters.OnOff.Attributes.OnOff)], returnClusterObject=False, reportInterval=(3, 10)) + + event = asyncio.Event() + + def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction): + event.set() + + sub.SetAttributeUpdateCallback(subUpdate) + + try: + data = sub.GetAttributes() + req = Clusters.OnOff.Commands.On() + await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req) + + await asyncio.wait_for(event.wait(), timeout=11) + + if (data[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnOff] != 1): + raise ValueError("Current On/Off state should be 1") + + event.clear() + + req = Clusters.OnOff.Commands.Off() + await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req) + + await asyncio.wait_for(event.wait(), timeout=11) + + if (data[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnOff] != 0): + raise ValueError("Current On/Off state should be 0") + + except TimeoutError: + raise AssertionError("Did not receive updated attribute") + finally: + sub.Shutdown() + + @ classmethod + @ base.test_case + async def TestAttributeCacheClusterView(cls, devCtrl): + logger.info("Test AttributeCache Cluster-View") + sub: SubscriptionTransaction = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=[(1, Clusters.OnOff.Attributes.OnOff)], returnClusterObject=True, reportInterval=(3, 10)) + + event = asyncio.Event() + + def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction): + event.set() + + sub.SetAttributeUpdateCallback(subUpdate) + + try: + data = sub.GetAttributes() + + req = Clusters.OnOff.Commands.On() + await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req) + + await asyncio.wait_for(event.wait(), timeout=11) + + cluster: Clusters.OnOff = data[1][Clusters.OnOff] + if (not cluster.onOff): + raise ValueError("Current On/Off state should be True") + + event.clear() + + req = Clusters.OnOff.Commands.Off() + await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req) + + await asyncio.wait_for(event.wait(), timeout=11) + + cluster: Clusters.OnOff = data[1][Clusters.OnOff] + if (cluster.onOff): + raise ValueError("Current On/Off state should be False") + + except TimeoutError: + raise AssertionError("Did not receive updated attribute") + finally: + sub.Shutdown() + @ classmethod @ base.test_case async def TestSubscribeZeroMinInterval(cls, devCtrl): @@ -638,6 +717,8 @@ async def RunTest(cls, devCtrl): await cls.TestReadAttributeRequests(devCtrl) await cls.TestSubscribeZeroMinInterval(devCtrl) await cls.TestSubscribeAttribute(devCtrl) + await cls.TestAttributeCacheAttributeView(devCtrl) + await cls.TestAttributeCacheClusterView(devCtrl) await cls.TestMixedReadAttributeAndEvents(devCtrl) # Note: Write will change some attribute values, always put it after read tests await cls.TestWriteRequest(devCtrl) diff --git a/src/test_driver/linux-cirque/MobileDeviceTest.py b/src/test_driver/linux-cirque/MobileDeviceTest.py index 84ab53b9c15514..641b32d14ff544 100755 --- a/src/test_driver/linux-cirque/MobileDeviceTest.py +++ b/src/test_driver/linux-cirque/MobileDeviceTest.py @@ -99,7 +99,7 @@ def run_controller_test(self): CHIP_REPO, "out/debug/linux_x64_gcc/controller/python/chip_repl-0.0-py3-none-any.whl"))) command = ("gdb -batch -return-child-result -q -ex run -ex \"thread apply all bt\" " - "--args python3 {} -t 240 -a {} --paa-trust-store-path {}").format( + "--args python3 {} -t 300 -a {} --paa-trust-store-path {}").format( os.path.join( CHIP_REPO, "src/controller/python/test/test_scripts/mobile-device-test.py"), ethernet_ip, os.path.join(CHIP_REPO, MATTER_DEVELOPMENT_PAA_ROOT_CERTS))