Skip to content

Commit 7c3a827

Browse files
committed
working on MH protocol impl (message types)
1 parent a33af0d commit 7c3a827

File tree

6 files changed

+265
-35
lines changed

6 files changed

+265
-35
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@
125125
"hidf",
126126
"hlen",
127127
"hlmt",
128+
"hoti",
128129
"hsid",
129130
"htet",
130131
"httpv",

pcapkit/protocols/data/internet/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@
196196
from pcapkit.protocols.data.internet.mh import CareofTestInitOption as MH_CareofTestInitOption
197197
from pcapkit.protocols.data.internet.mh import CareofTestOption as MH_CareofTestOption
198198
from pcapkit.protocols.data.internet.mh import UnknownMessage as MH_UnknownMessage
199+
from pcapkit.protocols.data.internet.mh import BindingRefreshRequestMessage as MH_BindingRefreshRequestMessage
200+
from pcapkit.protocols.data.internet.mh import HomeTestInitMessage as MH_HomeTestInitMessage
199201

200202
__all__ = [
201203
# Authentication Header

pcapkit/protocols/data/internet/mh.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363

6464
__all__ = [
6565
'MH',
66-
'UnknownMessage',
66+
'UnknownMessage', 'BindingRefreshRequestMessage',
6767

6868
'Option',
6969
'UnassignedOption', 'PadOption', 'BindRefreshAdviceOption', 'AlternateCareofAddressOption',
@@ -103,6 +103,36 @@ class UnknownMessage(MH):
103103
def __init__(self, next: 'TransType', length: 'int', type: 'Packet', chksum: 'bytes', data: 'bytes') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,redefined-builtin,line-too-long
104104

105105

106+
@info_final
107+
class BindingRefreshRequestMessage(MH):
108+
"""Data model for MH Binding Refresh Request (BRR) message type."""
109+
110+
#: Mobility options.
111+
options: 'OrderedMultiDict[Enum_Option, Option]'
112+
113+
if TYPE_CHECKING:
114+
def __init__(self, next: 'TransType', length: 'int', type: 'Packet', chksum: 'bytes',
115+
options: 'OrderedMultiDict[Enum_Option, Option]') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,redefined-builtin,line-too-long
116+
117+
118+
@info_final
119+
class HomeTestInitMessage(MH):
120+
"""Data modelf for MH Home Test Init (HoTI) message type."""
121+
122+
#: Home init cookie.
123+
cookie: 'bytes'
124+
#: Mobility options.
125+
options: 'OrderedMultiDict[Enum_Option, Option]'
126+
127+
if TYPE_CHECKING:
128+
def __init__(self, next: 'TransType', length: 'int', type: 'Packet', chksum: 'bytes',
129+
cookie: 'bytes', options: 'OrderedMultiDict[Enum_Option, Option]') -> 'None': ...
130+
131+
132+
133+
134+
135+
106136
class Option(Data):
107137
"""Data model for MH options."""
108138

pcapkit/protocols/internet/mh.py

Lines changed: 143 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@
136136
from pcapkit.utilities.exceptions import ProtocolError, UnsupportedCall
137137
from pcapkit.utilities.warnings import ProtocolWarning, warn
138138

139+
from pcapkit.protocols.schema.internet.mh import BindingRefreshRequestMessage as Schema_BindingRefreshRequestMessage
140+
from pcapkit.protocols.data.internet.mh import BindingRefreshRequestMessage as Data_BindingRefreshRequestMessage
141+
from pcapkit.protocols.schema.internet.mh import HomeTestInitMessage as Schema_HomeTestInitMessage
142+
from pcapkit.protocols.data.internet.mh import HomeTestInitMessage as Data_HomeTestInitMessage
143+
139144
if TYPE_CHECKING:
140145
from datetime import datetime as dt_type
141146
from enum import IntEnum as StdlibEnum
@@ -191,7 +196,8 @@ class MH(Internet[Data_MH, Schema_MH],
191196
__message__ = collections.defaultdict(
192197
lambda: 'unknown',
193198
{
194-
199+
Enum_Packet.Binding_Refresh_Request: 'brr',
200+
Enum_Packet.Home_Test_Init: 'hoti',
195201
},
196202
) # type: DefaultDict[Enum_Packet | int, str | tuple[PacketParser, PacketConstructor]]
197203

@@ -494,6 +500,82 @@ def _read_msg_unknown(self, schema: 'Schema_UnknownMessage', *,
494500
)
495501
return data
496502

503+
def _read_msg_brr(self, schema: 'Schema_BindingRefreshRequestMessage', *,
504+
header: 'Schema_MH') -> 'Data_BindingRefreshRequestMessage':
505+
"""Read MH binding refresh request (BRR) message type.
506+
507+
Structure of MH Binding Refresh Request Message [:rfc:`6275`]:
508+
509+
.. code-block:: text
510+
511+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
512+
| Reserved |
513+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
514+
| |
515+
. .
516+
. Mobility Options .
517+
. .
518+
| |
519+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
520+
521+
Args:
522+
schema: Parsed message type schema.
523+
header: Parsed MH header schema.
524+
525+
Returns:
526+
Parsed message type data.
527+
528+
"""
529+
data = Data_BindingRefreshRequestMessage(
530+
next=header.next,
531+
length=(header.length + 1) * 8,
532+
type=header.type,
533+
chksum=header.chksum,
534+
options=self._read_mh_options(schema.options)
535+
)
536+
return data
537+
538+
def _read_msg_hoti(self, schema: 'Schema_HomeTestInitMessage', *,
539+
header: 'Schema_MH') -> 'Data_HomeTestInitMessage':
540+
"""Read MH home test initiation (HoTI) message type.
541+
542+
Structure of MH Home Test Initiation Message [:rfc:`6275`]:
543+
544+
.. code-block:: text
545+
546+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
547+
| Reserved |
548+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
549+
| |
550+
+ Home Init Cookie +
551+
| |
552+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
553+
| |
554+
. .
555+
. Mobility Options .
556+
. .
557+
| |
558+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
559+
560+
Args:
561+
schema: Parsed message type schema.
562+
header: Parsed MH header schema.
563+
564+
Returns:
565+
Parsed message type data.
566+
567+
"""
568+
data = Data_HomeTestInitMessage(
569+
next=header.next,
570+
length=(header.length + 1) * 8,
571+
type=header.type,
572+
chksum=header.chksum,
573+
cookie=schema.cookie,
574+
options=self._read_mh_options(schema.options)
575+
)
576+
return data
577+
578+
497579

498580

499581

@@ -1158,6 +1240,8 @@ def _read_opt_ct(self, schema: 'Schema_CareofTestOption', *,
11581240

11591241

11601242

1243+
1244+
11611245
def _read_cga_extensions(self, extensions_schema: 'list[Schema_CGAExtension]') -> 'Extension':
11621246
"""Read CGA extensions.
11631247
@@ -1270,7 +1354,6 @@ def _read_ext_multiprefix(self, schema: 'Schema_MultiPrefixExtension', *,
12701354

12711355

12721356

1273-
12741357
def _make_msg_unknown(self, message: 'Optional[Data_UnknownMessage]', *,
12751358
data: 'bytes' = b'',
12761359
**kwargs: 'Any') -> 'Schema_UnknownMessage':
@@ -1292,30 +1375,80 @@ def _make_msg_unknown(self, message: 'Optional[Data_UnknownMessage]', *,
12921375
data=data,
12931376
)
12941377

1378+
def _make_msg_brr(self, message: 'Optional[Data_BindingRefreshRequestMessage]', *,
1379+
options: 'Optional[Option | list[Schema_Option | tuple[Enum_Option, dict[str, Any]] | bytes]]' = None,
1380+
**kwargs: 'Any') -> 'Schema_BindingRefreshRequestMessage':
1381+
"""Make MH binding refresh request (BRR) message type.
1382+
1383+
Args:
1384+
message: Message data model.
1385+
options: Mobility options.
1386+
**kwargs: Arbitrary keyword arguments.
1387+
1388+
Returns:
1389+
Constructed message type.
1390+
1391+
"""
1392+
if message is not None:
1393+
options = message.options
1394+
else:
1395+
options = options or []
1396+
1397+
return Schema_BindingRefreshRequestMessage(
1398+
options=self._make_mh_options(options),
1399+
)
1400+
1401+
def _make_msg_hoti(self, message: 'Optional[Data_HomeTestInitMessage]', *,
1402+
cookie: 'bytes' = b'\x00\x00\x00\x00\x00\x00\x00\x00',
1403+
options: 'Optional[Option | list[Schema_Option | tuple[Enum_Option, dict[str, Any]] | bytes]]' = None,
1404+
**kwargs: 'Any') -> 'Schema_HomeTestInitMessage':
1405+
"""Make MH home test init (HoTI) message type.
1406+
1407+
Args:
1408+
message: Message data model.
1409+
cookie: Home test cookie.
1410+
options: Mobility options.
1411+
**kwargs: Arbitrary keyword arguments.
1412+
1413+
Returns:
1414+
Constructed message type.
1415+
1416+
"""
1417+
if message is not None:
1418+
cookie = message.cookie
1419+
options = message.options
1420+
else:
1421+
options = options or []
1422+
1423+
return Schema_HomeTestInitMessage(
1424+
cookie=cookie,
1425+
options=self._make_mh_options(options),
1426+
)
1427+
12951428

12961429

1297-
def _make_mh_options(self, options: 'Option | list[Schema_Option | tuple[Enum_Option, dict[str, Any]] | bytes]') -> 'tuple[list[Schema_Option | bytes], int]':
1430+
1431+
1432+
1433+
def _make_mh_options(self, options: 'Option | list[Schema_Option | tuple[Enum_Option, dict[str, Any]] | bytes]') -> 'list[Schema_Option | bytes]':
12981434
"""Make options for MH.
12991435
13001436
Args:
13011437
options: MH options.
13021438
13031439
Returns:
1304-
Tuple of options and total length of options.
1440+
Mobility options list.
13051441
13061442
"""
1307-
total_length = 0
13081443
if isinstance(options, list):
13091444
options_list = [] # type: list[Schema_Option | bytes]
13101445
for schema in options:
13111446
if isinstance(schema, bytes):
13121447
code = Enum_Option.get(int.from_bytes(schema[0:1], 'big', signed=False))
13131448

13141449
data = schema # type: Schema_Option | bytes
1315-
data_len = len(data)
13161450
elif isinstance(schema, Schema):
13171451
data = schema
1318-
data_len = len(schema.pack())
13191452
else:
13201453
code, args = cast('tuple[Enum_Option, dict[str, Any]]', schema)
13211454
name = self.__option__[code]
@@ -1325,13 +1458,10 @@ def _make_mh_options(self, options: 'Option | list[Schema_Option | tuple[Enum_Op
13251458
getattr(self, meth_name, self._make_opt_none))
13261459
else:
13271460
meth = name[1]
1328-
13291461
data = meth(code, **args)
1330-
data_len = len(data.pack())
13311462

13321463
options_list.append(data)
1333-
total_length += data_len
1334-
return options_list, total_length
1464+
return options_list
13351465

13361466
options_list = []
13371467
for code, option in options.items(multi=True):
@@ -1344,11 +1474,8 @@ def _make_mh_options(self, options: 'Option | list[Schema_Option | tuple[Enum_Op
13441474
meth = name[1]
13451475

13461476
data = meth(code, option)
1347-
data_len = len(data.pack())
1348-
13491477
options_list.append(data)
1350-
total_length += data_len
1351-
return options_list, total_length
1478+
return options_list
13521479

13531480
def _make_opt_none(self, type: 'Enum_Option', option: 'Optional[Data_UnassignedOption]' = None, *,
13541481
data: 'bytes' = b'',
@@ -1853,6 +1980,7 @@ def _make_opt_ct(self, type: 'Enum_Option', option: 'Optional[Data_CareofTestOpt
18531980

18541981

18551982

1983+
18561984
def _make_cga_extensions(self, extensions: 'Extension | list[Schema_CGAExtension | tuple[Enum_CGAExtension, dict[str, Any]] | bytes]') -> 'tuple[list[Schema_CGAExtension | bytes], int]':
18571985
"""Make CGA extensions for MH.
18581986

pcapkit/protocols/schema/internet/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@
194194
from pcapkit.protocols.schema.internet.mh import CareofTestInitOption as MH_CareofTestInitOption
195195
from pcapkit.protocols.schema.internet.mh import CareofTestOption as MH_CareofTestOption
196196
from pcapkit.protocols.schema.internet.mh import UnknownMessage as MH_UnknownMessage
197+
from pcapkit.protocols.schema.internet.mh import BindingRefreshRequestMessage as MH_BindingRefreshRequestMessage
198+
from pcapkit.protocols.schema.internet.mh import HomeTestInitMessage as MH_HomeTestInitMessage
197199

198200
__all__ = [
199201
# Authentication Header

0 commit comments

Comments
 (0)