Skip to content

Commit 5456696

Browse files
committed
revised schema's packet context usage
* added packet param to SchemaField * use packet arg in SchemaField.un/pack as __packet__ dict key * added docs for OptionField's __option_padding__ dict key * added snaplen for PCAPNG engine extraction process (for ISB) * revised usage of callback functions in schemas * bugfix in PCAPNG schema for PayloadField usage
1 parent e8e715b commit 5456696

File tree

13 files changed

+172
-63
lines changed

13 files changed

+172
-63
lines changed

pcapkit/corekit/fields/collections.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,12 @@ def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> 'list
227227
Returns:
228228
Unpacked field value.
229229
230+
Important:
231+
If the option list ended before the specified size limit,
232+
inject ``__option_padding__`` as the remaining length to
233+
the ``packet`` argument such that the next fields can be
234+
aware of such informations.
235+
230236
"""
231237
length = self.length
232238
if isinstance(buffer, bytes):

pcapkit/corekit/fields/misc.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ class SchemaField(_Field[_TS]):
462462
an integer value and accept the current packet as its only argument.
463463
schema: Field schema.
464464
default: Default value for field.
465+
packet: Optional packet data for unpacking and/or packing purposes.
465466
callback: Callback function to process field value, which should accept
466467
the current field and the current packet as its arguments.
467468
@@ -480,10 +481,15 @@ def schema(self) -> 'Type[_TS]':
480481
def __init__(self, length: 'int | Callable[[dict[str, Any]], int]' = lambda _: -1,
481482
schema: 'Optional[Type[_TS]]' = None,
482483
default: '_TS | NoValueType | bytes' = NoValue,
484+
packet: 'Optional[dict[str, Any]]' = None,
483485
callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None':
484486
self._name = '<schema>'
485487
self._callback = callback
486488

489+
if packet is None:
490+
packet = {}
491+
self._packet = packet
492+
487493
if schema is None:
488494
raise FieldError('Schema field must have a schema.')
489495
self._schema = schema
@@ -529,6 +535,11 @@ def pack(self, value: 'Optional[_TS | bytes]', packet: 'dict[str, Any]') -> 'byt
529535
Returns:
530536
Packed field value.
531537
538+
Notes:
539+
We will use ``packet`` as a ``__packet__`` key in the packet context
540+
passed to the underlying :class:`~pcapkit.protocols.schema.schema.Schema`
541+
for packing purposes.
542+
532543
"""
533544
if value is None:
534545
if self._default is NoValue:
@@ -537,7 +548,11 @@ def pack(self, value: 'Optional[_TS | bytes]', packet: 'dict[str, Any]') -> 'byt
537548

538549
if isinstance(value, bytes):
539550
return value
540-
return value.pack(packet)
551+
552+
packet.update(self._packet)
553+
return value.pack({
554+
'__packet__': packet,
555+
})
541556

542557
def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TS':
543558
"""Unpack field value from :obj:`bytes`.
@@ -549,12 +564,21 @@ def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TS'
549564
Returns:
550565
Unpacked field value.
551566
567+
Notes:
568+
We will use ``packet`` as a ``__packet__`` key in the packet context
569+
passed to the underlying :class:`~pcapkit.protocols.schema.schema.Schema`
570+
for unpacking purposes.
571+
552572
"""
553573
if isinstance(buffer, bytes):
554574
file = io.BytesIO(buffer) # type: IO[bytes]
555575
else:
556576
file = buffer
557-
return cast('_TS', self._schema.unpack(file, self.length, packet)) # type: ignore[call-arg,misc]
577+
578+
packet.update(self._packet)
579+
return cast('_TS', self._schema.unpack(file, self.length, { # type: ignore[call-arg,misc]
580+
'__packet__': packet,
581+
}))
558582

559583

560584
class ForwardMatchField(_Field[_TC]):

pcapkit/foundation/engines/pcapng.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ def read_frame(self) -> 'P_PCAPNG':
156156
while True:
157157
# read next block
158158
block = P_PCAPNG(ext._ifile, num=ext._frnum+1, ctx=self._ctx,
159-
layer=ext._exlyr, protocol=ext._exptl)
159+
layer=ext._exlyr, protocol=ext._exptl,
160+
__packet__={
161+
'snaplen': self._get_snaplen(),
162+
})
160163

161164
# check block type
162165
if block.info.type == Enum_BlockType.Section_Header_Block:
@@ -277,3 +280,17 @@ def _write_file(self, block: 'Data_PCAPNG', *, name: 'str') -> 'None':
277280
ext._ofile(self._ctx.to_dict(), name=name)
278281
ofile = ext._ofile
279282
ext._offmt = ofile.kind
283+
284+
def _get_snaplen(self) -> 'int':
285+
"""Get snapshot length from the current context.
286+
287+
This method is used for providing the snapshot length to the ``__packet__``
288+
argument when parsing a Simple Packet Block (SPB).
289+
290+
Notes:
291+
If there is no interface, return ``0xFFFFFFFFFFFFFFFF``.
292+
293+
"""
294+
if self._ctx.interfaces:
295+
return self._ctx.interfaces[0].snaplen
296+
return 0xFFFFFFFFFFFFFFFF

pcapkit/protocols/schema/internet/hip.py

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -458,34 +458,36 @@ def pre_unpack(cls, packet: 'dict[str, Any]') -> 'None':
458458
packet: packet data
459459
460460
"""
461-
if 'options' not in packet:
462-
return
463-
464-
cipher_list = cast('list[Data_HIPCipherParameter]',
465-
packet['options'].getlist(Enum_Parameter.HIP_CIPHER))
466-
if cipher_list:
467-
warn(f'HIP: [ParamNo {Enum_Parameter.ENCRYPTED}] '
468-
'missing HIP_CIPHER parameter', ProtocolWarning)
469-
# raise ProtocolError(f'HIPv{version}: [ParamNo {schema.type}] invalid format')
470-
471-
cipher_id = Enum_Cipher(0xffff)
472-
else:
473-
cipher_ids = [] # type: list[Enum_Cipher]
474-
for cipher in cipher_list:
475-
cipher_ids.extend(cipher.cipher_id)
476-
477-
encrypted_list = cast('list[Data_EncryptedParameter]',
478-
packet['options'].getlist(Enum_Parameter.ENCRYPTED))
479-
encrypted_index = len(encrypted_list)
480-
481-
if encrypted_index >= len(cipher_ids):
461+
if 'options' in packet:
462+
cipher_list = cast('list[Data_HIPCipherParameter]',
463+
packet['options'].getlist(Enum_Parameter.HIP_CIPHER))
464+
if cipher_list:
482465
warn(f'HIP: [ParamNo {Enum_Parameter.ENCRYPTED}] '
483-
'too many ENCRYPTED parameters', ProtocolWarning)
466+
'missing HIP_CIPHER parameter', ProtocolWarning)
484467
# raise ProtocolError(f'HIPv{version}: [ParamNo {schema.type}] invalid format')
485468

486-
cipher_id = Enum_Cipher(0xfffe)
469+
cipher_id = Enum_Cipher(0xffff)
487470
else:
488-
cipher_id = cipher_ids[encrypted_index]
471+
cipher_ids = [] # type: list[Enum_Cipher]
472+
for cipher in cipher_list:
473+
cipher_ids.extend(cipher.cipher_id)
474+
475+
encrypted_list = cast('list[Data_EncryptedParameter]',
476+
packet['options'].getlist(Enum_Parameter.ENCRYPTED))
477+
encrypted_index = len(encrypted_list)
478+
479+
if encrypted_index >= len(cipher_ids):
480+
warn(f'HIP: [ParamNo {Enum_Parameter.ENCRYPTED}] '
481+
'too many ENCRYPTED parameters', ProtocolWarning)
482+
# raise ProtocolError(f'HIPv{version}: [ParamNo {schema.type}] invalid format')
483+
484+
cipher_id = Enum_Cipher(0xfffe)
485+
else:
486+
cipher_id = cipher_ids[encrypted_index]
487+
else:
488+
warn(f'HIP: [ParamNo {Enum_Parameter.ENCRYPTED}] '
489+
'missing HIP_CIPHER parameter', ProtocolWarning)
490+
cipher_id = Enum_Cipher(0xffff)
489491

490492
packet['__cipher__'] = cipher_id
491493

pcapkit/protocols/schema/internet/hopopt.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838

3939
if TYPE_CHECKING:
4040
from ipaddress import IPv4Address, IPv6Address
41-
from typing import Any, Optional
41+
from typing import Any, Optional, NoReturn
4242

4343
from pcapkit.corekit.fields.field import _Field as Field
4444
from pcapkit.protocols.protocol import Protocol
@@ -331,7 +331,7 @@ class _SMFDPDOption(Schema):
331331
selector=smf_dpd_data_selector,
332332
)
333333

334-
def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
334+
def post_process(self, packet: 'dict[str, Any]') -> 'SMFDPDOption':
335335
"""Revise ``schema`` data after unpacking process.
336336
337337
Args:
@@ -342,7 +342,6 @@ def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
342342
343343
"""
344344
ret = self.data
345-
ret.mode = Enum_SMFDPDMode.get(self.test['mode'])
346345
return ret
347346

348347

@@ -357,9 +356,6 @@ class SMFDPDOption(Option):
357356
class SMFIdentificationBasedDPDOption(SMFDPDOption):
358357
"""Header schema for HOPOPT SMF identification-based DPD options."""
359358

360-
test: 'SMFDPDTestFlag' = ForwardMatchField(BitField(length=1, namespace={
361-
'mode': (0, 1),
362-
}))
363359
#: TaggerID information.
364360
info: 'TaggerIDInfo' = BitField(length=1, namespace={
365361
'mode': (0, 1),
@@ -377,6 +373,20 @@ class SMFIdentificationBasedDPDOption(SMFDPDOption):
377373
1 if pkt['info']['type'] == 0 else (pkt['info']['len'] + 2)
378374
))
379375

376+
def post_process(self, packet: 'dict[str, Any]') -> 'SMFIdentificationBasedDPDOption':
377+
"""Revise ``schema`` data after unpacking process.
378+
379+
Args:
380+
packet: Unpacked data.
381+
382+
Returns:
383+
Revised schema.
384+
385+
"""
386+
ret = super().post_process(packet) # type: SMFIdentificationBasedDPDOption
387+
ret.mode = Enum_SMFDPDMode.I_DPD
388+
return ret
389+
380390
if TYPE_CHECKING:
381391
def __init__(self, type: 'Enum_Option', len: 'int', info: 'TaggerIDInfo',
382392
tid: 'Optional[bytes]', id: 'bytes') -> 'None': ...
@@ -388,6 +398,20 @@ class SMFHashBasedDPDOption(SMFDPDOption):
388398
#: Hash assist value (HAV).
389399
hav: 'bytes' = BytesField(length=lambda pkt: pkt['len'])
390400

401+
def post_process(self, packet: 'dict[str, Any]') -> 'SMFIdentificationBasedDPDOption':
402+
"""Revise ``schema`` data after unpacking process.
403+
404+
Args:
405+
packet: Unpacked data.
406+
407+
Returns:
408+
Revised schema.
409+
410+
"""
411+
ret = super().post_process(packet) # type: SMFIdentificationBasedDPDOption
412+
ret.mode = Enum_SMFDPDMode.H_DPD
413+
return ret
414+
391415
if TYPE_CHECKING:
392416
def __init__(self, type: 'Enum_Option', len: 'int', hav: 'bytes') -> 'None': ...
393417

@@ -426,7 +450,7 @@ class _QuickStartOption(Schema):
426450
selector=quick_start_data_selector,
427451
)
428452

429-
def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
453+
def post_process(self, packet: 'dict[str, Any]') -> 'QuickStartOption':
430454
"""Revise ``schema`` data after unpacking process.
431455
432456
Args:

pcapkit/protocols/schema/internet/ipv4.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ class _QSOption(Schema):
440440
selector=quick_start_data_selector,
441441
)
442442

443-
def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
443+
def post_process(self, packet: 'dict[str, Any]') -> 'QSOption':
444444
"""Revise ``schema`` data after unpacking process.
445445
446446
Args:

pcapkit/protocols/schema/internet/ipv6_opts.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ class _SMFDPDOption(Schema):
331331
selector=smf_dpd_data_selector,
332332
)
333333

334-
def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
334+
def post_process(self, packet: 'dict[str, Any]') -> 'SMFDPDOption':
335335
"""Revise ``schema`` data after unpacking process.
336336
337337
Args:
@@ -342,7 +342,6 @@ def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
342342
343343
"""
344344
ret = self.data
345-
ret.mode = Enum_SMFDPDMode.get(self.test['mode'])
346345
return ret
347346

348347

@@ -377,6 +376,20 @@ class SMFIdentificationBasedDPDOption(SMFDPDOption):
377376
1 if pkt['info']['type'] == 0 else (pkt['info']['len'] + 2)
378377
))
379378

379+
def post_process(self, packet: 'dict[str, Any]') -> 'SMFIdentificationBasedDPDOption':
380+
"""Revise ``schema`` data after unpacking process.
381+
382+
Args:
383+
packet: Unpacked data.
384+
385+
Returns:
386+
Revised schema.
387+
388+
"""
389+
ret = super().post_process(packet) # type: SMFIdentificationBasedDPDOption
390+
ret.mode = Enum_SMFDPDMode.I_DPD
391+
return ret
392+
380393
if TYPE_CHECKING:
381394
def __init__(self, type: 'Enum_Option', len: 'int', info: 'TaggerIDInfo',
382395
tid: 'Optional[bytes]', id: 'bytes') -> 'None': ...
@@ -388,6 +401,20 @@ class SMFHashBasedDPDOption(SMFDPDOption):
388401
#: Hash assist value (HAV).
389402
hav: 'bytes' = BytesField(length=lambda pkt: pkt['len'])
390403

404+
def post_process(self, packet: 'dict[str, Any]') -> 'SMFIdentificationBasedDPDOption':
405+
"""Revise ``schema`` data after unpacking process.
406+
407+
Args:
408+
packet: Unpacked data.
409+
410+
Returns:
411+
Revised schema.
412+
413+
"""
414+
ret = super().post_process(packet) # type: SMFIdentificationBasedDPDOption
415+
ret.mode = Enum_SMFDPDMode.H_DPD
416+
return ret
417+
391418
if TYPE_CHECKING:
392419
def __init__(self, type: 'Enum_Option', len: 'int', hav: 'bytes') -> 'None': ...
393420

@@ -426,7 +453,7 @@ class _QuickStartOption(Schema):
426453
selector=quick_start_data_selector,
427454
)
428455

429-
def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
456+
def post_process(self, packet: 'dict[str, Any]') -> 'QuickStartOption':
430457
"""Revise ``schema`` data after unpacking process.
431458
432459
Args:

pcapkit/protocols/schema/internet/ipv6_route.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class UnknownType(RoutingType):
9898
"""Header schema for IPv6-Route unknown type routing data."""
9999

100100
#: Type-specific data.
101-
data: 'bytes' = BytesField(length=lambda pkt: pkt['length'] * 8 - 4)
101+
data: 'bytes' = BytesField(length=lambda pkt: pkt['__length__'])
102102

103103
if TYPE_CHECKING:
104104
def __init__(self, data: 'bytes') -> 'None': ...
@@ -111,7 +111,7 @@ class SourceRoute(RoutingType):
111111
reserved: 'bytes' = PaddingField(length=4)
112112
#: Addresses.
113113
ip: 'list[IPv6Address]' = ListField(
114-
length=lambda pkt: pkt['length'] * 8,
114+
length=lambda pkt: pkt['__length__'],
115115
item_type=IPv6AddressField(),
116116
)
117117

@@ -144,7 +144,7 @@ class RPL(RoutingType):
144144
})
145145
#: Addresses.
146146
addresses: 'bytes' = ListField(
147-
length=lambda pkt: pkt['length'] * 8 - pkt['pad']['pad_len'],
147+
length=lambda pkt: pkt['__length__'] - pkt['pad']['pad_len'],
148148
)
149149
#: Padding.
150150
padding: 'bytes' = PaddingField(length=lambda pkt: pkt['pad']['pad_len'])

0 commit comments

Comments
 (0)