133
133
from pcapkit .protocols .schema .internet .mh import CareofTestOption as Schema_CareofTestOption
134
134
from pcapkit .protocols .data .internet .mh import CareofTestOption as Data_CareofTestOption
135
135
136
+ from pcapkit .protocols .schema .internet .mh import UnknownMessage as Schema_UnknownMessage
137
+ from pcapkit .protocols .data .internet .mh import UnknownMessage as Data_UnknownMessage
138
+
139
+
136
140
if TYPE_CHECKING :
137
141
from datetime import datetime as dt_type
138
142
from enum import IntEnum as StdlibEnum
155
159
Extension = OrderedMultiDict [Enum_CGAExtension , Data_CGAExtension ]
156
160
157
161
PacketParser = Callable [[Schema_Packet , NamedArg (Schema_MH , 'header' )], Data_MH ]
158
- PacketConstructor = Callable [[Enum_Packet , DefaultArg (Optional [Data_MH ]),
162
+ PacketConstructor = Callable [[DefaultArg (Optional [Data_MH ]),
159
163
KwArg (Any )], Schema_Packet ]
160
164
161
165
OptionParser = Callable [[Schema_Option , NamedArg (Option , 'options' )], Data_Option ]
@@ -182,10 +186,10 @@ class MH(Internet[Data_MH, Schema_MH],
182
186
183
187
#: DefaultDict[Enum_Packet, str | tuple[PacketParser, PacketConstructor]]:
184
188
#: Message type to method mapping. Method names are expected to be referred
185
- #: to the class by ``_read_packet_ ${name}`` and/or ``_make_packet_ ${name}``,
189
+ #: to the class by ``_read_msg_ ${name}`` and/or ``_make_msg_ ${name}``,
186
190
#: and if such name not found, the value should then be a method that can
187
- #: parse the packet by itself.
188
- __packet__ = collections .defaultdict (
191
+ #: parse the message type by itself.
192
+ __message__ = collections .defaultdict (
189
193
lambda : 'unknown' ,
190
194
{
191
195
@@ -320,13 +324,14 @@ def read(self, length: 'Optional[int]' = None, *, version: 'Literal[4, 6]' = 4,
320
324
length = len (self )
321
325
schema = self .__header__
322
326
323
- mh = Data_MH (
324
- next = schema .next ,
325
- length = (schema .length + 1 ) * 8 ,
326
- type = schema .type ,
327
- chksum = schema .chksum ,
328
- data = schema .data ,
329
- )
327
+ name = self .__message__ [schema .type ]
328
+ if isinstance (name , str ):
329
+ meth_name = f'_read_msg_{ name } '
330
+ meth = cast ('PacketParser' ,
331
+ getattr (self , meth_name , self ._read_msg_unknown ))
332
+ else :
333
+ meth = name [0 ]
334
+ mh = meth (schema .data , header = schema )
330
335
331
336
if extension :
332
337
return mh
@@ -342,12 +347,23 @@ def make(self,
342
347
type_namespace : 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None , # pylint: disable=line-too-long
343
348
type_reversed : 'bool' = False ,
344
349
chksum : 'bytes' = b'' ,
345
- data : 'bytes' = b'\x00 \x00 ' , # minimum length
350
+ data : 'bytes | Data_MH | Schema_Packet | dict[str, Any] ' = b'\x00 \x00 ' , # minimum length
346
351
payload : 'Protocol | Schema | bytes' = b'' ,
347
352
** kwargs : 'Any' ) -> 'Schema_MH' :
348
353
"""Make (construct) packet data.
349
354
350
355
Args:
356
+ next: Next header type.
357
+ next_default: Default value for next header type field.
358
+ next_namespace: Namespace of next header type field.
359
+ next_reversed: Whether the bits of next header type field is reversed.
360
+ type: Mobility Header type.
361
+ type_default: Default value for Mobility Header type field.
362
+ type_namespace: Namespace of Mobility Header type field.
363
+ type_reversed: Whether the bits of Mobility Header type field is reversed.
364
+ chksum: Checksum.
365
+ data: Message data.
366
+ payload: Payload of next layer protocol.
351
367
**kwargs: Arbitrary keyword arguments.
352
368
353
369
Returns:
@@ -359,12 +375,32 @@ def make(self,
359
375
type_val = self ._make_index (type , type_default , namespace = type_namespace , # type: ignore[call-overload]
360
376
reversed = type_reversed , pack = False )
361
377
378
+ if isinstance (data , bytes ):
379
+ data_val = data # type: bytes | Schema_Packet
380
+ elif isinstance (data , (dict , Data_MH )):
381
+ name = self .__message__ [type_val ]
382
+ if isinstance (name , str ):
383
+ meth_name = f'_make_msg_{ name } '
384
+ meth = cast ('PacketConstructor' ,
385
+ getattr (self , meth_name , self ._make_msg_unknown ))
386
+ else :
387
+ meth = name [1 ]
388
+
389
+ if isinstance (data , dict ):
390
+ data_val = meth (** data )
391
+ else :
392
+ data_val = meth (data )
393
+ elif isinstance (data , Schema_Packet ):
394
+ data_val = data
395
+ else :
396
+ raise ProtocolError (f'MH: [Type { type_val } ] invalid format' )
397
+
362
398
return Schema_MH (
363
399
next = next_val ,
364
- length = math .ceil ((len (data ) + 6 ) / 8 ) - 1 ,
400
+ length = math .ceil ((len (data_val ) + 6 ) / 8 ) - 1 ,
365
401
type = type_val ,
366
402
chksum = chksum ,
367
- data = data ,
403
+ data = data_val ,
368
404
payload = payload ,
369
405
)
370
406
@@ -434,10 +470,34 @@ def _make_data(cls, data: 'Data_MH') -> 'dict[str, Any]': # type: ignore[overri
434
470
'next' : data .next ,
435
471
'type' : data .type ,
436
472
'chksum' : data .chksum ,
437
- 'data' : data . data ,
473
+ 'data' : data ,
438
474
'payload' : cls ._make_payload (data ),
439
475
}
440
476
477
+ def _read_msg_unknown (self , schema : 'Schema_UnknownMessage' , * ,
478
+ header : 'Schema_MH' ) -> 'Data_UnknownMessage' :
479
+ """Read unknown MH message type.
480
+
481
+ Args:
482
+ schema: Parsed message type schema.
483
+ header: Parsed MH header schema.
484
+
485
+ Returns:
486
+ Parsed message type data.
487
+
488
+ """
489
+ data = Data_UnknownMessage (
490
+ next = header .next ,
491
+ length = (header .length + 1 ) * 8 ,
492
+ type = header .type ,
493
+ chksum = header .chksum ,
494
+ data = schema .data ,
495
+ )
496
+ return data
497
+
498
+
499
+
500
+
441
501
def _read_mh_options (self , options_schema : 'list[Schema_Option]' ) -> 'Option' :
442
502
"""Read MH options.
443
503
@@ -1210,6 +1270,30 @@ def _read_ext_multiprefix(self, schema: 'Schema_MultiPrefixExtension', *,
1210
1270
1211
1271
1212
1272
1273
+
1274
+ def _make_msg_unknown (self , message : 'Optional[Data_UnknownMessage]' , * ,
1275
+ data : 'bytes' = b'' ,
1276
+ ** kwargs : 'Any' ) -> 'Schema_UnknownMessage' :
1277
+ """Make MH unknown message type.
1278
+
1279
+ Args:
1280
+ message: Message data model.
1281
+ data: Raw message data.
1282
+ **kwargs: Arbitrary keyword arguments.
1283
+
1284
+ Returns:
1285
+ Constructed message type.
1286
+
1287
+ """
1288
+ if message is not None :
1289
+ data = message .data
1290
+
1291
+ return Schema_UnknownMessage (
1292
+ data = data ,
1293
+ )
1294
+
1295
+
1296
+
1213
1297
def _make_mh_options (self , options : 'Option | list[Schema_Option | tuple[Enum_Option, dict[str, Any]] | bytes]' ) -> 'tuple[list[Schema_Option | bytes], int]' :
1214
1298
"""Make options for MH.
1215
1299
0 commit comments