20
20
__all__ = ['DPKT' ]
21
21
22
22
if TYPE_CHECKING :
23
- from typing import Iterator
23
+ from typing import Type , Union , Optional
24
24
25
25
from dpkt .dpkt import Packet as DPKTPacket
26
+ from dpkt .pcap import Reader as PCAPReader
27
+ from dpkt .pcapng import Reader as PCAPNGReader
26
28
27
29
from pcapkit .foundation .extraction import Extractor
28
- from pcapkit .protocols .misc .pcap .header import Header
30
+
31
+ Reader = Union [PCAPReader , PCAPNGReader ]
29
32
30
33
31
34
class DPKT (Engine ['DPKTPacket' ]):
@@ -36,12 +39,6 @@ class DPKT(Engine['DPKTPacket']):
36
39
37
40
"""
38
41
39
- if TYPE_CHECKING :
40
- #: Global header.
41
- _gbhdr : 'Header'
42
- #: Data link layer protocol.
43
- _dlink : 'Enum_LinkType'
44
-
45
42
##########################################################################
46
43
# Properties.
47
44
##########################################################################
@@ -64,7 +61,7 @@ def __init__(self, extractor: 'Extractor') -> 'None':
64
61
import dpkt # isort:skip
65
62
66
63
self ._expkg = dpkt
67
- self ._extmp = cast ('Iterator[tuple[float, DPKTPacket]] ' , None )
64
+ self ._extmp = cast ('Reader ' , None )
68
65
69
66
super ().__init__ (extractor )
70
67
@@ -112,34 +109,8 @@ def run(self) -> 'None':
112
109
reader = dpkt .pcapng .Reader (ext ._ifile )
113
110
else :
114
111
raise FormatError (f'unsupported file format: { ext .magic_number !r} ' )
115
- self ._dlink = Enum_LinkType .get (reader .datalink ())
116
-
117
- if self ._dlink == Enum_LinkType .ETHERNET :
118
- pkg = dpkt .ethernet .Ethernet
119
- elif self ._dlink .value == Enum_LinkType .IPV4 :
120
- pkg = dpkt .ip .IP
121
- elif self ._dlink .value == Enum_LinkType .IPV6 :
122
- pkg = dpkt .ip6 .IP6
123
- else :
124
- warn ('unrecognised link layer protocol; all analysis functions ignored' ,
125
- DPKTWarning , stacklevel = stacklevel ())
126
-
127
- class RawPacket (dpkt .dpkt .Packet ): # type: ignore[name-defined]
128
- """Raw packet."""
129
-
130
- def __len__ (ext ) -> 'int' :
131
- return len (ext .data )
132
-
133
- def __bytes__ (ext ) -> 'bytes' :
134
- return ext .data
135
-
136
- def unpack (ext , buf : 'bytes' ) -> 'None' :
137
- ext .data = buf
138
-
139
- pkg = RawPacket
140
112
141
113
# extract & analyse file
142
- self ._expkg = pkg
143
114
self ._extmp = reader
144
115
145
116
def read_frame (self ) -> 'DPKTPacket' :
@@ -156,9 +127,13 @@ def read_frame(self) -> 'DPKTPacket':
156
127
tcp_reassembly , tcp_traceflow )
157
128
ext = self ._extractor
158
129
130
+ reader = self ._extmp
131
+ linktype = Enum_LinkType .get (reader .datalink ())
132
+
159
133
# fetch DPKT packet
160
- timestamp , pkt = cast ('tuple[float, bytes]' , next (self ._extmp ))
161
- packet = self ._expkg (pkt ) # type: DPKTPacket
134
+ timestamp , pkt = cast ('tuple[float, bytes]' , next (reader ))
135
+ protocol = self ._get_protocol (linktype )
136
+ packet = protocol (pkt ) # type: DPKTPacket
162
137
163
138
# verbose output
164
139
ext ._frnum += 1
@@ -167,7 +142,7 @@ def read_frame(self) -> 'DPKTPacket':
167
142
# write plist
168
143
frnum = f'Frame { ext ._frnum } '
169
144
if not ext ._flag_q :
170
- info = packet2dict (packet , timestamp , data_link = self . _dlink )
145
+ info = packet2dict (packet , timestamp , data_link = linktype )
171
146
if ext ._flag_f :
172
147
ofile = ext ._ofile (f'{ ext ._ofnm } /{ frnum } .{ ext ._fext } ' )
173
148
ofile (info , name = frnum )
@@ -194,7 +169,7 @@ def read_frame(self) -> 'DPKTPacket':
194
169
# trace flows
195
170
if ext ._flag_t :
196
171
if ext ._tcp :
197
- data_tf_tcp = tcp_traceflow (packet , timestamp , data_link = self . _dlink , count = ext ._frnum )
172
+ data_tf_tcp = tcp_traceflow (packet , timestamp , data_link = linktype , count = ext ._frnum )
198
173
if data_tf_tcp is not None :
199
174
ext ._trace .tcp (data_tf_tcp )
200
175
@@ -206,3 +181,45 @@ def read_frame(self) -> 'DPKTPacket':
206
181
207
182
# return frame record
208
183
return packet
184
+
185
+ ##########################################################################
186
+ # Utilities.
187
+ ##########################################################################
188
+
189
+ def _get_protocol (self , linktype : 'Optional[Enum_LinkType]' = None ) -> 'Type[DPKTPacket]' :
190
+ """Returns the protocol for parsing the current packet.
191
+
192
+ Args:
193
+ linktype: Link type code.
194
+
195
+ """
196
+ dpkt = self ._expkg
197
+ reader = self ._extmp
198
+
199
+ if linktype is None :
200
+ linktype = Enum_LinkType .get (reader .datalink ())
201
+
202
+ if linktype == Enum_LinkType .ETHERNET :
203
+ pkg = dpkt .ethernet .Ethernet
204
+ elif linktype .value == Enum_LinkType .IPV4 :
205
+ pkg = dpkt .ip .IP
206
+ elif linktype .value == Enum_LinkType .IPV6 :
207
+ pkg = dpkt .ip6 .IP6
208
+ else :
209
+ warn ('unrecognised link layer protocol; all analysis functions ignored' ,
210
+ DPKTWarning , stacklevel = stacklevel ())
211
+
212
+ class RawPacket (dpkt .dpkt .Packet ): # type: ignore[name-defined]
213
+ """Raw packet."""
214
+
215
+ def __len__ (ext ) -> 'int' :
216
+ return len (ext .data )
217
+
218
+ def __bytes__ (ext ) -> 'bytes' :
219
+ return ext .data
220
+
221
+ def unpack (ext , buf : 'bytes' ) -> 'None' :
222
+ ext .data = buf
223
+
224
+ pkg = RawPacket
225
+ return pkg
0 commit comments