-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathtest_udp_message.py
69 lines (56 loc) · 5.49 KB
/
test_udp_message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import pytest
from dlms_cosem.protocol.wrappers import DlmsUdpMessage, WrapperHeader
data_examples_encrypted_data_nofication = [
b"\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19\"\x91\x99\x16A\x03;0\x00\x00\x01\xe5\x02\\\xe9\xd2'\x1f\xd7\x8b\xe8\xc2\x04!\x1a\x91j\x9d\x7fX~\nz\x81L\xad\xea\x89\xe9Y?\x01\xf9.\xa8\xc0\x87\xb5\xbd\xfd\xef\xea\xb6\xbe\xcf(-\xfeI\xc0\x8f[\xe6\xdc\x84\x00",
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xe6\x03\xd4\xd3W{\x7fd\x994\xe3\xb7\xc7\x19\xa3\xde5\x1a\xb2\x8cz\xc7\xb8\xa1\xe4D\xb8\x96\x91\xe9%\x91\xce\x1e\xb2\x82}\xf97\xa2\xe5@(\x0fb\x11\xf4\x93d\x80/\xa0\xf5\xc4\x13',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xe7\x1c+\xbc?\xfb\x9aN9x\xf2k\xfa\xf5\xe9A\xe2i\xa2\xb6\x1dG\xb46\x1b/[\x1d"\xf5\xa0N\xffp\x8c\x9f\xfbI<@\x16:\x0e\x19x\xb7D\x9c\xec\x9c\xca\xe0\x8d\x19D',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xe8\xb1\xf9[\xdd.\xdbA\xd3V\xdbW\xeeQ, \xc6\xeace:U\xbb\x18q~A\x9fE\xe8\xd3\xb4\xf3C)\xf4\xce\xb2\x1c\x81A\xa7\xe3\xcc\x00\xf0k~-\x98\xd7j\xf4\xb8\x06',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xe9\xfd\x1c&\xa0\xa1\xa8\x8b\x86\xf3\xdc \x10\xb1{\xeb\xa3h\xa3\xb6\xd2\xad\x96SZ\xd4\x1f\x84\xd6\xcbi\xa86]\xb4\x1b\x8c\xac\xb5D\x94v\xc3\xf4 \xe1\x86\xffk\x1b`E\x11p\x08',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xea3\x80\xbdH\x91\x00v\x18]\xa7|\xf9\xd0\xf5v\xc4{\n\xc0\x98\xef\xb3~\xb7u\x89\x8e\x9c\xcde\x02\x13\xa7?&\x9f\x8c{\xea8N\xd3\x88\xe7\xcc\xd2\x05\x06\xfe7;\x06\x8b:',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xeb0J\xf3\x911\xd5\xa6J\x06\xb2\xbb\xa8\xf1\xb9]\xd2+\xfd\xa4]9\xad\xcb\x08\x89\xe3\x03s4\x0f7\xc5\x80\xd3"f\x89>\xc7\'\xae.\xef\xe2\xd1Z8\x89\xab\xd1\x85\x94\x005',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xecho\xf7\xf6\xd0\x9a\x96+\xe5:\xcc\x95\xe1\xe4\xc6\xfeO\xb1[\xfd\xa2\x93\xe2\xae\xcd\x85]\x7f\xaa\xc7\x99\x8cXQ\xce\x038f`E\xa6\xcf\x87\x924V\xf8\xb1+\x02\xb6.\xfc\xed',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xee\xf4\x86`\x0f\xf8\xcf\x8dMA!\xe1B>Q\r\x9c\x87)\xf4\x8b!b\x85t\xfe\x16\xd9\xcbT\x06sL\xefW\x14H\x7f\xf6#\x10\xa4?\x1av\x00L\xa5`\x1b\xbf>\xf9c\x9f',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xefA&8\xb9C\xa0\xfe\xc2,\x8d\x02\xb4\xc4\xb7}\x9es\x8d\x98\xe3q\t\xdb\x85\x12\\\x14\x9f\xa9\xdf=I\xe3\t\xf9\xc3\xa5\xb3\x81\x0b5\xed\x9fVx\xb4\xc7\x81y.\xb8>n+',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xf0N@\xc8{\xde\xb0\xc12\xbfI"\xdf\xc2\x98\xae~pt\xf3\xec_\x1e\x0f\x93\xf36\xfd\x84\xa2\xdf\xb2\xbc\x0b\xed\x80\x84\xf4\xf2\xcf\xebzf\xb1\x16\xd2E\xc8\xb1k\x93\xefM\x1f\x88',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xf1\xce\xef\x1e-\xb6ad\x9a\xbc?\xc4\x1by+\x9a\xd5\xa9\xf0 J\xa16{i\xd5\xdc\x18\x0f\x8c\xd8\xaf\x8d\x99%\x9d\x1d\xfa\x16[\xaa\tg\xb1\xcej\xb9\x8a\xf8\xa5\xdb\x94(\xd3G',
b'\x00\x01\x00\x01\x00\x01\x00F\xdb\x08/\x19"\x91\x99\x16A\x03;0\x00\x00\x01\xf2\xeb\xae\xa2s\xd5.\xd6V\xc0\x97wM\x08=G%]\x88b\xb57\x1d\xc0l\xf1 \xdcU\x81z;\x91\xc3\x86\xac/g\xca\xf7\x94\x1a=\x01\xb2\xb6|\xdd\x9d{\xbb\x871\x12K',
]
def test_udp_message_from_bytes():
udp_header_data = b"\x00\x01\x00\x01\x00\x01\x00F"
dlms_data = b"\xdb\x08/\x19\"\x91\x99\x16A\x03;0\x00\x00\x01\xe5\x02\\\xe9\xd2'\x1f\xd7\x8b\xe8\xc2\x04!\x1a\x91j\x9d\x7fX~\nz\x81L\xad\xea\x89\xe9Y?\x01\xf9.\xa8\xc0\x87\xb5\xbd\xfd\xef\xea\xb6\xbe\xcf(-\xfeI\xc0\x8f[\xe6\xdc\x84\x00"
udp_data = udp_header_data + dlms_data
message = DlmsUdpMessage.from_bytes(in_data=udp_data)
assert message.wrapper_header.version == 1
assert message.wrapper_header.source_wport == 1
assert message.wrapper_header.destination_wport == 1
assert message.wrapper_header.version == 1
assert message.wrapper_header.length == 70
assert message.data == dlms_data
def test_upd_message_to_bytes():
udp_header_data = b"\x00\x01\x00\x01\x00\x01\x00F"
dlms_data = b"\xdb\x08/\x19\"\x91\x99\x16A\x03;0\x00\x00\x01\xe5\x02\\\xe9\xd2'\x1f\xd7\x8b\xe8\xc2\x04!\x1a\x91j\x9d\x7fX~\nz\x81L\xad\xea\x89\xe9Y?\x01\xf9.\xa8\xc0\x87\xb5\xbd\xfd\xef\xea\xb6\xbe\xcf(-\xfeI\xc0\x8f[\xe6\xdc\x84\x00"
udp_data = udp_header_data + dlms_data
wrapper_header = WrapperHeader(
source_wport=1, destination_wport=1, length=len(dlms_data)
)
message = DlmsUdpMessage(data=dlms_data, wrapper_header=wrapper_header)
assert message.wrapper_header.version == 1
assert message.wrapper_header.source_wport == 1
assert message.wrapper_header.destination_wport == 1
assert message.wrapper_header.version == 1
assert message.wrapper_header.length == 70
assert message.data == dlms_data
assert message.to_bytes() == udp_data
# def test_udp_parsing():
# udp = UDPRequest(data_examples_encrypted_data_nofication[0])
#
# assert (udp) == 2
#
# # it is a general global ciphering APDU
#
# a = (b'\x00\x01\x00\x01\x00\x01\x00F' # UDP wrapper
# b'\xdb' # general global ciphering tag
# b'\x08/\x19"\x91\x99\x16A\x03' # system title
# b';0\x00\x00\x01\xf2' # security Control field = 0b00110000 No compression, unicast, encrypted authenticated. length = 59 bytes = OK
# b'\xeb\xae\xa2s\xd5.\xd6V\xc0\x97wM\x08=G%]\x88b\xb57\x1d\xc0l\xf1 \xdcU\x81z;\x91\xc3\x86\xac/g\xca\xf7\x94\x1a='
# b'\x01\xb2\xb6|\xdd\x9d{\xbb\x871\x12K') # auth tag)