170
170
from pcapkit .protocols .schema .misc .pcapng import ZigBeeAPSKey as Schema_ZigBeeAPSKey
171
171
from pcapkit .protocols .schema .misc .pcapng import ZigBeeNWKKey as Schema_ZigBeeNWKKey
172
172
from pcapkit .utilities .compat import StrEnum
173
- from pcapkit .utilities .exceptions import EndianError , FileError , ProtocolError , UnsupportedCall
174
- from pcapkit .utilities .warnings import RegistryWarning , warn
173
+ from pcapkit .utilities .exceptions import EndianError , FileError , ProtocolError , UnsupportedCall , stacklevel
174
+ from pcapkit .utilities .warnings import AttributeWarning , RegistryWarning , warn
175
175
176
176
__all__ = ['PCAPNG' ]
177
177
178
178
if TYPE_CHECKING :
179
- from datetime import timedelta , timezone
179
+ from datetime import timedelta , timezone , datetime as dt_type
180
180
from decimal import Decimal
181
181
from enum import IntEnum as StdlibEnum
182
182
from ipaddress import IPv4Address , IPv4Interface , IPv6Address , IPv6Interface
211
211
KwArg (Any )], Schema_NameResolutionRecord ]
212
212
213
213
# check Python version
214
+ py37 = ((version_info := sys .version_info ).major >= 3 and version_info .minor >= 7 )
214
215
py38 = ((version_info := sys .version_info ).major >= 3 and version_info .minor >= 8 )
215
216
216
217
# Ethernet address pattern
@@ -665,7 +666,10 @@ def nanosecond(self) -> 'bool':
665
666
def ts_resolution (self ) -> 'int' :
666
667
"""Timestamp resolution of the current block, in units per second."""
667
668
if self ._ctx is None :
668
- raise UnsupportedCall (f"'{ self .__class__ .__name__ } ' object has no attribute 'ts_resolution'" )
669
+ #raise UnsupportedCall(f"'{self.__class__.__name__}' object has no attribute 'ts_resolution'")
670
+ warn (f"'{ self .__class__ .__name__ } ' object has no attribute 'ts_resolution'" ,
671
+ AttributeWarning , stacklevel = stacklevel ())
672
+ return 1_000_000
669
673
670
674
info = cast ('Packet' , self ._info )
671
675
options = self ._ctx .interfaces [info .interface_id ].options
@@ -679,7 +683,10 @@ def ts_resolution(self) -> 'int':
679
683
def ts_offset (self ) -> 'int' :
680
684
"""Timestamp offset of the current block, in seconds."""
681
685
if self ._ctx is None :
682
- raise UnsupportedCall (f"'{ self .__class__ .__name__ } ' object has no attribute 'ts_offset'" )
686
+ #raise UnsupportedCall(f"'{self.__class__.__name__}' object has no attribute 'ts_offset'")
687
+ warn (f"'{ self .__class__ .__name__ } ' object has no attribute 'ts_offset'" ,
688
+ AttributeWarning , stacklevel = stacklevel ())
689
+ return 0
683
690
684
691
info = cast ('Packet' , self ._info )
685
692
options = self ._ctx .interfaces [info .interface_id ].options
@@ -690,17 +697,20 @@ def ts_offset(self) -> 'int':
690
697
return tsoffset .offset
691
698
692
699
@property
693
- def ts_timezone (self ) -> 'Optional[ timezone] ' :
700
+ def ts_timezone (self ) -> 'timezone' :
694
701
"""Timezone of the current block."""
695
702
if self ._ctx is None :
696
- raise UnsupportedCall (f"'{ self .__class__ .__name__ } ' object has no attribute 'ts_timezone'" )
703
+ #raise UnsupportedCall(f"'{self.__class__.__name__}' object has no attribute 'ts_timezone'")
704
+ warn (f"'{ self .__class__ .__name__ } ' object has no attribute 'ts_timezone'" ,
705
+ AttributeWarning , stacklevel = stacklevel ())
706
+ return self ._get_timezone ()
697
707
698
708
info = cast ('Packet' , self ._info )
699
709
options = self ._ctx .interfaces [info .interface_id ].options
700
710
tzone = cast ('Optional[Data_IF_TZoneOption]' ,
701
711
options .get (Enum_OptionType .if_tzone ))
702
712
if tzone is None :
703
- return None
713
+ return self . _get_timezone ()
704
714
return tzone .timezone
705
715
706
716
@property
@@ -1074,6 +1084,40 @@ def _make_data(cls, data: 'Data_PCAPNG') -> 'dict[str, Any]': # type: ignore[ov
1074
1084
'block' : data ,
1075
1085
}
1076
1086
1087
+ @staticmethod
1088
+ def _get_timezone () -> 'timezone' :
1089
+ """Get local timezone."""
1090
+ tzinfo = datetime .datetime .now (datetime .timezone .utc ).astimezone ().tzinfo
1091
+ if tzinfo is None :
1092
+ return datetime .timezone .utc
1093
+ return cast ('timezone' , tzinfo )
1094
+
1095
+ def _make_timestamp (self , timestamp : 'Optional[float | Decimal | dt_type | int]' = None ) -> 'tuple[int, int]' :
1096
+ """Make timestamp.
1097
+
1098
+ Args:
1099
+ timestamp: Timestamp in seconds.
1100
+
1101
+ Returns:
1102
+ Tuple of timestamp in higher and lower 32-bit integer value
1103
+ based on the given offset and timezone conversion.
1104
+
1105
+ """
1106
+ if timestamp is None :
1107
+ if py37 and self .nanosecond :
1108
+ timestamp = decimal .Decimal (time .time_ns ()) / 1_000_000_000
1109
+ else :
1110
+ timestamp = decimal .Decimal (time .time ())
1111
+ else :
1112
+ if isinstance (timestamp , datetime .datetime ):
1113
+ timestamp = timestamp .timestamp ()
1114
+ timestamp = decimal .Decimal (timestamp )
1115
+
1116
+ with decimal .localcontext (prec = 64 ):
1117
+ ts_info = int ((timestamp - self .ts_offset - \
1118
+ decimal .Decimal (self .ts_timezone .utcoffset (None ).total_seconds ())) * self .ts_resolution )
1119
+ return (ts_info >> 32 ) & 0xFFFF_FFFF , ts_info & 0xFFFF_FFFF
1120
+
1077
1121
def _read_mac_addr (self , addr : 'bytes' ) -> 'str' :
1078
1122
"""Read MAC address.
1079
1123
@@ -2112,12 +2156,14 @@ def _read_option_isb_starttime(self, schema: 'Schema_ISB_StartTimeOption', *,
2112
2156
2113
2157
timestamp_raw = schema .timestamp_high << 32 | schema .timestamp_low
2114
2158
with decimal .localcontext (prec = 64 ):
2115
- timestamp_epoch = decimal .Decimal (timestamp_raw ) / self .ts_resolution + self .ts_offset
2159
+ timestamp_epoch = decimal .Decimal (timestamp_raw ) / self .ts_resolution + self .ts_offset + \
2160
+ decimal .Decimal (self .ts_timezone .utcoffset (None ).total_seconds ())
2161
+ ts_ratio = timestamp_epoch .as_integer_ratio ()
2116
2162
2117
2163
option = Data_ISB_StartTimeOption (
2118
2164
type = schema .type ,
2119
2165
length = schema .length ,
2120
- timestamp = datetime .datetime .fromtimestamp (float ( timestamp_epoch ) , tz = self .ts_timezone ),
2166
+ timestamp = datetime .datetime .fromtimestamp (ts_ratio [ 0 ] / ts_ratio [ 1 ] , tz = self .ts_timezone ),
2121
2167
timestamp_epoch = timestamp_epoch ,
2122
2168
)
2123
2169
return option
@@ -2145,12 +2191,14 @@ def _read_option_isb_endtime(self, schema: 'Schema_ISB_EndTimeOption', *,
2145
2191
2146
2192
timestamp_raw = schema .timestamp_high << 32 | schema .timestamp_low
2147
2193
with decimal .localcontext (prec = 64 ):
2148
- timestamp_epoch = decimal .Decimal (timestamp_raw ) / self .ts_resolution + self .ts_offset
2194
+ timestamp_epoch = decimal .Decimal (timestamp_raw ) / self .ts_resolution + self .ts_offset + \
2195
+ decimal .Decimal (self .ts_timezone .utcoffset (None ).total_seconds ())
2196
+ ts_ratio = timestamp_epoch .as_integer_ratio ()
2149
2197
2150
2198
option = Data_ISB_EndTimeOption (
2151
2199
type = schema .type ,
2152
2200
length = schema .length ,
2153
- timestamp = datetime .datetime .fromtimestamp (float ( timestamp_epoch ) , tz = self .ts_timezone ),
2201
+ timestamp = datetime .datetime .fromtimestamp (ts_ratio [ 0 ] / ts_ratio [ 1 ] , tz = self .ts_timezone ),
2154
2202
timestamp_epoch = timestamp_epoch ,
2155
2203
)
2156
2204
return option
@@ -3477,7 +3525,226 @@ def _make_option_ns_dnsipv6(self, type: 'Enum_OptionType', option: 'Optional[Dat
3477
3525
ip = ip ,
3478
3526
)
3479
3527
3528
+ def _make_option_isb_starttime (self , type : 'Enum_OptionType' , option : 'Optional[Data_ISB_StartTimeOption]' = None , * ,
3529
+ timestamp : 'Optional[int | float | dt_type | Decimal]' = 0 ,
3530
+ ** kwargs : 'Any' ) -> 'Schema_ISB_StartTimeOption' :
3531
+ """Make PCAP-NG ``isb_starttime`` option.
3532
+
3533
+ Args:
3534
+ type: Option type.
3535
+ option: Option data model.
3536
+ ip: DNS server IPv6 address.
3537
+ **kwargs: Arbitrary keyword arguments.
3538
+
3539
+ Returns:
3540
+ Constructed option schema.
3541
+
3542
+ """
3543
+ if self ._type != Enum_BlockType .Interface_Statistics_Block :
3544
+ raise ProtocolError (f'PCAP-NG: [isb_starttime] option must be in Interface Statistics Block, '
3545
+ f'but found in { self ._type } block.' )
3546
+ if self ._opt [type ] > 0 :
3547
+ raise ProtocolError (f'PCAP-NG: [isb_starttime] option must be only one, '
3548
+ f'but { self ._opt [type ] + 1 } found.' )
3549
+
3550
+ if option is not None :
3551
+ timestamp = option .timestamp_epoch
3552
+ ts_high , ts_low = self ._make_timestamp (timestamp )
3553
+
3554
+ return Schema_ISB_StartTimeOption (
3555
+ type = type ,
3556
+ length = 8 ,
3557
+ timestamp_high = ts_high ,
3558
+ timestamp_low = ts_low ,
3559
+ )
3560
+
3561
+ def _make_option_isb_endtime (self , type : 'Enum_OptionType' , option : 'Optional[Data_ISB_EndTimeOption]' = None , * ,
3562
+ timestamp : 'Optional[int | float | dt_type | Decimal]' = 0 ,
3563
+ ** kwargs : 'Any' ) -> 'Schema_ISB_EndTimeOption' :
3564
+ """Make PCAP-NG ``isb_endtime`` option.
3565
+
3566
+ Args:
3567
+ type: Option type.
3568
+ option: Option data model.
3569
+ ip: DNS server IPv6 address.
3570
+ **kwargs: Arbitrary keyword arguments.
3571
+
3572
+ Returns:
3573
+ Constructed option schema.
3574
+
3575
+ """
3576
+ if self ._type != Enum_BlockType .Interface_Statistics_Block :
3577
+ raise ProtocolError (f'PCAP-NG: [isb_endtime] option must be in Interface Statistics Block, '
3578
+ f'but found in { self ._type } block.' )
3579
+ if self ._opt [type ] > 0 :
3580
+ raise ProtocolError (f'PCAP-NG: [isb_endtime] option must be only one, '
3581
+ f'but { self ._opt [type ] + 1 } found.' )
3582
+
3583
+ if option is not None :
3584
+ timestamp = option .timestamp_epoch
3585
+ ts_high , ts_low = self ._make_timestamp (timestamp )
3586
+
3587
+ return Schema_ISB_EndTimeOption (
3588
+ type = type ,
3589
+ length = 8 ,
3590
+ timestamp_high = ts_high ,
3591
+ timestamp_low = ts_low ,
3592
+ )
3593
+
3594
+ def _make_option_isb_ifrecv (self , type : 'Enum_OptionType' , option : 'Optional[Data_ISB_IFRecvOption]' = None , * ,
3595
+ packets : 'int' = 0 ,
3596
+ ** kwargs : 'Any' ) -> 'Schema_ISB_IFRecvOption' :
3597
+ """Make PCAP-NG ``isb_ifrecv`` option.
3598
+
3599
+ Args:
3600
+ type: Option type.
3601
+ option: Option data model.
3602
+ packets: Number of received packets.
3603
+ **kwargs: Arbitrary keyword arguments.
3604
+
3605
+ Returns:
3606
+ Constructed option schema.
3607
+
3608
+ """
3609
+ if self ._type != Enum_BlockType .Interface_Statistics_Block :
3610
+ raise ProtocolError (f'PCAP-NG: [isb_ifrecv] option must be in Interface Statistics Block, '
3611
+ f'but found in { self ._type } block.' )
3612
+ if self ._opt [type ] > 0 :
3613
+ raise ProtocolError (f'PCAP-NG: [isb_ifrecv] option must be only one, '
3614
+ f'but { self ._opt [type ] + 1 } found.' )
3615
+
3616
+ if option is not None :
3617
+ packets = option .packets
3618
+
3619
+ return Schema_ISB_IFRecvOption (
3620
+ type = type ,
3621
+ length = 8 ,
3622
+ packets = packets ,
3623
+ )
3624
+
3625
+ def _make_option_isb_ifdrop (self , type : 'Enum_OptionType' , option : 'Optional[Data_ISB_IFDropOption]' = None , * ,
3626
+ packets : 'int' = 0 ,
3627
+ ** kwargs : 'Any' ) -> 'Schema_ISB_IFDropOption' :
3628
+ """Make PCAP-NG ``isb_ifdrop`` option.
3480
3629
3630
+ Args:
3631
+ type: Option type.
3632
+ option: Option data model.
3633
+ packets: Number of dropped packets.
3634
+ **kwargs: Arbitrary keyword arguments.
3635
+
3636
+ Returns:
3637
+ Constructed option schema.
3638
+
3639
+ """
3640
+ if self ._type != Enum_BlockType .Interface_Statistics_Block :
3641
+ raise ProtocolError (f'PCAP-NG: [isb_ifdrop] option must be in Interface Statistics Block, '
3642
+ f'but found in { self ._type } block.' )
3643
+ if self ._opt [type ] > 0 :
3644
+ raise ProtocolError (f'PCAP-NG: [isb_ifdrop] option must be only one, '
3645
+ f'but { self ._opt [type ] + 1 } found.' )
3646
+
3647
+ if option is not None :
3648
+ packets = option .packets
3649
+
3650
+ return Schema_ISB_IFDropOption (
3651
+ type = type ,
3652
+ length = 8 ,
3653
+ packets = packets ,
3654
+ )
3655
+
3656
+ def _make_option_isb_filteraccept (self , type : 'Enum_OptionType' , option : 'Optional[Data_ISB_FilterAcceptOption]' = None , * ,
3657
+ packets : 'int' = 0 ,
3658
+ ** kwargs : 'Any' ) -> 'Schema_ISB_FilterAcceptOption' :
3659
+ """Make PCAP-NG ``isb_filteraccept`` option.
3660
+
3661
+ Args:
3662
+ type: Option type.
3663
+ option: Option data model.
3664
+ packets: Number of packets accepted by the filter.
3665
+ **kwargs: Arbitrary keyword arguments.
3666
+
3667
+ Returns:
3668
+ Constructed option schema.
3669
+
3670
+ """
3671
+ if self ._type != Enum_BlockType .Interface_Statistics_Block :
3672
+ raise ProtocolError (f'PCAP-NG: [isb_filteraccept] option must be in Interface Statistics Block, '
3673
+ f'but found in { self ._type } block.' )
3674
+ if self ._opt [type ] > 0 :
3675
+ raise ProtocolError (f'PCAP-NG: [isb_filteraccept] option must be only one, '
3676
+ f'but { self ._opt [type ] + 1 } found.' )
3677
+
3678
+ if option is not None :
3679
+ packets = option .packets
3680
+
3681
+ return Schema_ISB_FilterAcceptOption (
3682
+ type = type ,
3683
+ length = 8 ,
3684
+ packets = packets ,
3685
+ )
3686
+
3687
+ def _make_option_isb_osdrop (self , type : 'Enum_OptionType' , option : 'Optional[Data_ISB_OSDropOption]' = None , * ,
3688
+ packets : 'int' = 0 ,
3689
+ ** kwargs : 'Any' ) -> 'Schema_ISB_OSDropOption' :
3690
+ """Make PCAP-NG ``isb_osdrop`` option.
3691
+
3692
+ Args:
3693
+ type: Option type.
3694
+ option: Option data model.
3695
+ packets: Number of packets dropped by the OS.
3696
+ **kwargs: Arbitrary keyword arguments.
3697
+
3698
+ Returns:
3699
+ Constructed option schema.
3700
+
3701
+ """
3702
+ if self ._type != Enum_BlockType .Interface_Statistics_Block :
3703
+ raise ProtocolError (f'PCAP-NG: [isb_osdrop] option must be in Interface Statistics Block, '
3704
+ f'but found in { self ._type } block.' )
3705
+ if self ._opt [type ] > 0 :
3706
+ raise ProtocolError (f'PCAP-NG: [isb_osdrop] option must be only one, '
3707
+ f'but { self ._opt [type ] + 1 } found.' )
3708
+
3709
+ if option is not None :
3710
+ packets = option .packets
3711
+
3712
+ return Schema_ISB_OSDropOption (
3713
+ type = type ,
3714
+ length = 8 ,
3715
+ packets = packets ,
3716
+ )
3717
+
3718
+ def _make_option_isb_usrdeliv (self , type : 'Enum_OptionType' , option : 'Optional[Data_ISB_UsrDelivOption]' = None , * ,
3719
+ packets : 'int' = 0 ,
3720
+ ** kwargs : 'Any' ) -> 'Schema_ISB_UsrDelivOption' :
3721
+ """Make PCAP-NG ``isb_usrdeliv`` option.
3722
+
3723
+ Args:
3724
+ type: Option type.
3725
+ option: Option data model.
3726
+ packets: Number of dropped packets.
3727
+ **kwargs: Arbitrary keyword arguments.
3728
+
3729
+ Returns:
3730
+ Constructed option schema.
3731
+
3732
+ """
3733
+ if self ._type != Enum_BlockType .Interface_Statistics_Block :
3734
+ raise ProtocolError (f'PCAP-NG: [isb_usrdeliv] option must be in Interface Statistics Block, '
3735
+ f'but found in { self ._type } block.' )
3736
+ if self ._opt [type ] > 0 :
3737
+ raise ProtocolError (f'PCAP-NG: [isb_usrdeliv] option must be only one, '
3738
+ f'but { self ._opt [type ] + 1 } found.' )
3739
+
3740
+ if option is not None :
3741
+ packets = option .packets
3742
+
3743
+ return Schema_ISB_UsrDelivOption (
3744
+ type = type ,
3745
+ length = 8 ,
3746
+ packets = packets ,
3747
+ )
3481
3748
3482
3749
def _make_option_pack_flags (self , type : 'Enum_OptionType' , option : 'Optional[Data_PACK_FlagsOption]' = None , * ,
3483
3750
direction : 'PacketDirection | StdlibEnum | AenumEnum | str | int' = PacketDirection .UNKNOWN ,
0 commit comments