Skip to content

Commit

Permalink
[Python] Implement TLV List type (#25238)
Browse files Browse the repository at this point in the history
* [python] Implement TLV List type

* Update

* Revert "Update"

This reverts commit 448c46f.

* Fix

* Update
  • Loading branch information
erjiaqing authored and pull[bot] committed Jun 27, 2023
1 parent e86b869 commit 10a82f9
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 5 deletions.
1 change: 1 addition & 0 deletions src/controller/python/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ chip_python_wheel_action("chip-clusters") {
"chip/clusters/Types.py",
"chip/clusters/enum.py",
"chip/tlv/__init__.py",
"chip/tlv/tlvlist.py",
]
},
{
Expand Down
12 changes: 11 additions & 1 deletion src/controller/python/chip/tlv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from collections.abc import Mapping, Sequence
from enum import Enum

from .tlvlist import TLVList

TLV_TYPE_SIGNED_INTEGER = 0x00
TLV_TYPE_UNSIGNED_INTEGER = 0x04
TLV_TYPE_BOOLEAN = 0x08
Expand Down Expand Up @@ -223,6 +225,11 @@ def put(self, tag, val):
for containedTag, containedVal in val.items():
self.put(containedTag, containedVal)
self.endContainer()
elif isinstance(val, TLVList):
self.startPath(tag)
for containedTag, containedVal in val:
self.put(containedTag, containedVal)
self.endContainer()
elif isinstance(val, Sequence):
self.startArray(tag)
for containedVal in val:
Expand Down Expand Up @@ -576,7 +583,7 @@ def _decodeVal(self, tlv, decoding):
decoding["Array"] = []
self._get(tlv, decoding["Array"], decoding["value"])
elif decoding["type"] == "Path":
decoding["value"] = []
decoding["value"] = TLVList()
decoding["Path"] = []
self._get(tlv, decoding["Path"], decoding["value"])
elif decoding["type"] == "Null":
Expand Down Expand Up @@ -682,6 +689,9 @@ def _get(self, tlv, decodings, out):
if isinstance(out, Mapping):
tag = decoding["tag"] if decoding["tag"] is not None else "Any"
out[tag] = decoding["value"]
elif isinstance(out, TLVList):
tag = decoding["tag"] if decoding["tag"] is not None else None
out.append(tag, decoding["value"])
else:
out.append(decoding["value"])
else:
Expand Down
175 changes: 175 additions & 0 deletions src/controller/python/chip/tlv/tlvlist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import dataclasses
import enum
from typing import Any, Iterator, List, Tuple, Union


class TLVList:
"""Represents a list in CHIP TLV.
A TLVList can be constructed from a `list` of tuples of tag and value. `None` stands for "anonymous tag".
e.g.
```
l = TLVList([(1, 'a'), (2, 'b'), (None, 'c')])
```
Constructs a list of three items, tag 1 is 'a', tag 2 is 'b' and with an anonymous item 'c'.
Since TLVLists are ordered, it is meanful to iterate over an list:
e.g.
```
for tag, val in l:
print(f"tag={tag}, val={val}")
```
Outputs:
```
tag=1, val=a
tag=2, val=b
tag=None, val=c
```
One can also append items into an list:
e.g.
```
l.append(3, 'd')
```
The content of `l` will be `[(1, 'a'), (2, 'b'), (None, 'c'), (3, 'd')]`
One can access an item in the list via the tag.
e.g.
```
val = l[1]
# val is 'a'
```
It is also possible to get an item via the index since it is ordered:
e.g.
```
tag, val = l[TLVList.IndexMethod.Tag:2]
# tag is None, val is 'c'
```
"""

@dataclasses.dataclass
class TLVListItem:
tag: Union[None, int]
value: Any

def as_tuple(self):
return (self.tag, self.value)

def as_rich_repr_tuple(self):
if self.tag is None:
return "Anonymous", repr(self.value)
else:
return str(self.tag), repr(self.value)

def __repr__(self):
if self.tag is None:
return "Anonymous: " + repr(self.value)
else:
return str(self.tag) + ": " + repr(self.value)

def __rich_repr__(self):
yield self.as_rich_repr_tuple()

class IndexMethod(enum.Enum):
Index = 0
Tag = 1

class Iterator:
def __init__(self, iter: Iterator):
self._iterator = iter

def __iter__(self):
return self

def __next__(self):
res = next(self._iterator)
return res.tag, res.value

def __init__(self, items: List[Tuple[Union[int, None], Any]] = []):
"""Constructs a TLVList.
items: A list of tuples for the tag and value for the items in the TLVList.
"""
self._data: List[TLVList.TLVListItem] = []

for tag, val in items:
self.append(tag, val)

def _get_item_by_tag(self, tag) -> Any:
if not isinstance(tag, int):
raise ValueError("Tag should be a integer for non-anonymous fields.")
for data in self._data:
if data.tag == tag:
return data.value
raise KeyError(f"Tag {tag} not found in the list.")

def __getitem__(self, access) -> Any:
"""Gets a item in the list by the tag or the index.
Examples:
```
tlv_list[1] # returns the item in the list with tag `1`
tlv_list[TLVList.IndexMethod.Tag:2] # returns the item in the list with tag `2`
tlv_list[TLVList.IndexMethod.Index:0] # returns the tag and value of the first item in the list
```
"""
if isinstance(access, slice):
tag, index = access.start, access.stop
if tag == TLVList.IndexMethod.Tag:
return self._get_item_by_tag(index)
elif tag == TLVList.IndexMethod.Index:
return self._data[index].as_tuple()
raise ValueError("Method should be TLVList.IndexMethod.Tag or TLVList.IndexMethod.Index")
elif isinstance(access, int):
return self._get_item_by_tag(access)
raise ValueError("Invalid access method")

def append(self, tag: Union[None, int], value: Any) -> None:
"""Appends an item to the list."""
if (tag is not None) and (not isinstance(tag, int)):
raise KeyError(f"Tag should be a integer or none for anonymous tag, {type(tag)} got")
self._data.append(TLVList.TLVListItem(tag, value))

def __repr__(self):
return "TLVList" + repr(self._data)

def __rich_repr__(self):
for items in self._data:
yield items.as_rich_repr_tuple()

def __iter__(self) -> """TLVList.Iterator""":
return TLVList.Iterator(iter(self._data))

def __eq__(self, rhs: "TLVList") -> bool:
if not isinstance(rhs, TLVList):
return False
return self._data == rhs._data
62 changes: 58 additions & 4 deletions src/controller/python/test/unit_tests/test_tlv.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import unittest

from chip.tlv import TLVReader, TLVWriter
from chip.tlv import TLVList, TLVReader, TLVWriter
from chip.tlv import uint as tlvUint


Expand Down Expand Up @@ -112,6 +112,24 @@ def test_uint(self):
except Exception:
pass

def test_list(self):
encodeVal = self._getEncoded(TLVList([(None, 1), (None, 2), (1, 3)]))
self.assertEqual(encodeVal, bytearray([0b00010111, # List, anonymous tag
0x00, 0x01, # Anonymous tag, 1 octet signed int `1``
0x00, 0x02, # Anonymous tag, 1 octet signed int `2``
0b00100000, 0x01, 0x03, # Context specific tag `1`, 1 octet signed int `3`
0x18 # End of container
]))
encodeVal = self._getEncoded(TLVList([(None, 1), (None, TLVList([(None, 2), (3, 4)]))]))
self.assertEqual(encodeVal, bytearray([0b00010111, # List, anonymous tag
0x00, 0x01, # Anonymous tag, 1 octet signed int `1``
0b00010111, # List anonymous tag
0x00, 0x02, # Anonymous tag, 1 octet signed int `2``
0b00100000, 0x03, 0x04, # Context specific tag `1`, 1 octet signed int `3`
0x18, # End of inner list
0x18 # End of container
]))


class TestTLVReader(unittest.TestCase):
def _read_case(self, input, answer):
Expand Down Expand Up @@ -151,16 +169,52 @@ def test_structure(self):
test_cases = [
(b'\x15\x36\x01\x15\x35\x01\x26\x00\xBF\xA2\x55\x16\x37\x01\x24'
b'\x02\x00\x24\x03\x28\x24\x04\x00\x18\x24\x02\x01\x18\x18\x18\x18',
{1: [{1: {0: 374710975, 1: [0, 40, 0], 2: 1}}]}),
{1: [{1: {0: 374710975, 1: TLVList([(2, 0), (3, 40), (4, 0)]), 2: 1}}]}),
(b'\x156\x01\x155\x01&\x00\xBF\xA2U\x167\x01$\x02\x00$\x03($\x04\x01'
b'\x18,\x02\x18Nordic Semiconductor ASA\x18\x18\x18\x18',
{1: [{1: {0: 374710975, 1: [0, 40, 1], 2: 'Nordic Semiconductor ASA'}}]}),
{1: [{1: {0: 374710975, 1: TLVList([(2, 0), (3, 40), (4, 1)]), 2: 'Nordic Semiconductor ASA'}}]}),
(b"\0256\001\0255\001&\000\031\346x\2077\001$\002\001$\003\006$\004\000\030(\002\030\030\030\030",
{1: [{1: {0: 2272847385, 1: [1, 6, 0], 2: False}}]})
{1: [{1: {0: 2272847385, 1: TLVList([(2, 1), (3, 6), (4, 0)]), 2: False}}]})
]
for tlv_bytes, answer in test_cases:
self._read_case(tlv_bytes, answer)

def test_list(self):
self._read_case([0b00010111, # List, anonymous tag
0x00, 0x01, # Anonymous tag, 1 octet signed int `1``
0x00, 0x02, # Anonymous tag, 1 octet signed int `2``
0b00100000, 0x01, 0x03, # Context specific tag `1`, 1 octet signed int `3`
0x18 # End of container
], TLVList([(None, 1), (None, 2), (1, 3)]))
self._read_case([0b00010111, # List, anonymous tag
0x00, 0x01, # Anonymous tag, 1 octet signed int `1``
0b00010111, # List anonymous tag
0x00, 0x02, # Anonymous tag, 1 octet signed int `2``
0b00100000, 0x03, 0x04, # Context specific tag `1`, 1 octet signed int `3`
0x18, # End of inner list
0x18 # End of container
], TLVList([(None, 1), (None, TLVList([(None, 2), (3, 4)]))]))


class TestTLVTypes(unittest.TestCase):
def test_list(self):
var = TLVList([(None, 1), (None, 2), (1, 3)])
self.assertEqual(var[1], 3)
self.assertEqual(var[TLVList.IndexMethod.Index:0], (None, 1))
self.assertEqual(var[TLVList.IndexMethod.Tag:1], 3)

var.append(None, 4)
self.assertEqual(var, TLVList([(None, 1), (None, 2), (1, 3), (None, 4)]))

var.append(5, 6)
self.assertEqual(var, TLVList([(None, 1), (None, 2), (1, 3), (None, 4), (5, 6)]))

expectIterateContent = [(None, 1), (None, 2), (1, 3), (None, 4), (5, 6)]
iteratedContent = []
for tag, value in var:
iteratedContent.append((tag, value))
self.assertEqual(expectIterateContent, iteratedContent)


if __name__ == '__main__':
unittest.main()

0 comments on commit 10a82f9

Please sign in to comment.