Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 26 additions & 183 deletions scapy/contrib/icmp_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,186 +2,29 @@
# This file is part of Scapy
# See https://scapy.net/ for more information

# scapy.contrib.description = ICMP Extensions
# scapy.contrib.status = loads

import struct

import scapy
from scapy.compat import chb
from scapy.packet import Packet, bind_layers
from scapy.fields import BitField, ByteField, ConditionalField, \
FieldLenField, IPField, IntField, PacketListField, ShortField, \
StrLenField
from scapy.layers.inet import IP, ICMP, checksum
from scapy.layers.inet6 import IP6Field
from scapy.error import warning
from scapy.contrib.mpls import MPLS
from scapy.config import conf


class ICMPExtensionObject(Packet):
name = 'ICMP Extension Object'
fields_desc = [ShortField('len', None),
ByteField('classnum', 0),
ByteField('classtype', 0)]

def post_build(self, p, pay):
if self.len is None:
tmp_len = len(p) + len(pay)
p = struct.pack('!H', tmp_len) + p[2:]
return p + pay


class ICMPExtensionHeader(Packet):
name = 'ICMP Extension Header (RFC4884)'
fields_desc = [BitField('version', 2, 4),
BitField('reserved', 0, 12),
BitField('chksum', None, 16)]

_min_ieo_len = len(ICMPExtensionObject())

def post_build(self, p, pay):
if self.chksum is None:
ck = checksum(p)
p = p[:2] + chb(ck >> 8) + chb(ck & 0xff) + p[4:]
return p + pay

def guess_payload_class(self, payload):
if len(payload) < self._min_ieo_len:
return Packet.guess_payload_class(self, payload)

# Look at fields of the generic ICMPExtensionObject to determine which
# bound extension type to use.
ieo = ICMPExtensionObject(payload)
if ieo.len < self._min_ieo_len:
return Packet.guess_payload_class(self, payload)

for fval, cls in self.payload_guess:
if all(hasattr(ieo, k) and v == ieo.getfieldval(k)
for k, v in fval.items()):
return cls
return ICMPExtensionObject


def ICMPExtension_post_dissection(self, pkt):
# RFC4884 section 5.2 says if the ICMP packet length
# is >144 then ICMP extensions start at byte 137.

lastlayer = pkt.lastlayer()
if not isinstance(lastlayer, conf.padding_layer):
return

if IP in pkt:
if (ICMP in pkt and
pkt[ICMP].type in [3, 11, 12] and
pkt.len > 144):
bytes = pkt[ICMP].build()[136:]
else:
return
elif scapy.layers.inet6.IPv6 in pkt:
if ((scapy.layers.inet6.ICMPv6TimeExceeded in pkt or
scapy.layers.inet6.ICMPv6DestUnreach in pkt) and
pkt.plen > 144):
bytes = pkt[scapy.layers.inet6.ICMPv6TimeExceeded].build()[136:]
else:
return
else:
return

# validate checksum
ieh = ICMPExtensionHeader(bytes)
if checksum(ieh.build()):
return # failed

lastlayer.load = lastlayer.load[:-len(ieh)]
lastlayer.add_payload(ieh)


class ICMPExtensionMPLS(ICMPExtensionObject):
name = 'ICMP Extension Object - MPLS (RFC4950)'

fields_desc = [ShortField('len', None),
ByteField('classnum', 1),
ByteField('classtype', 1),
PacketListField('stack', [], MPLS,
length_from=lambda pkt: pkt.len - 4)]


class ICMPExtensionInterfaceInformation(ICMPExtensionObject):
name = 'ICMP Extension Object - Interface Information Object (RFC5837)'

fields_desc = [ShortField('len', None),
ByteField('classnum', 2),
BitField('interface_role', 0, 2),
BitField('reserved', 0, 2),
BitField('has_ifindex', 0, 1),
BitField('has_ipaddr', 0, 1),
BitField('has_ifname', 0, 1),
BitField('has_mtu', 0, 1),

ConditionalField(
IntField('ifindex', None),
lambda pkt: pkt.has_ifindex == 1),

ConditionalField(
ShortField('afi', None),
lambda pkt: pkt.has_ipaddr == 1),
ConditionalField(
ShortField('reserved2', 0),
lambda pkt: pkt.has_ipaddr == 1),
ConditionalField(
IPField('ip4', None),
lambda pkt: pkt.afi == 1),
ConditionalField(
IP6Field('ip6', None),
lambda pkt: pkt.afi == 2),

ConditionalField(
FieldLenField('ifname_len', None, fmt='B',
length_of='ifname'),
lambda pkt: pkt.has_ifname == 1),
ConditionalField(
StrLenField('ifname', None,
length_from=lambda pkt: pkt.ifname_len),
lambda pkt: pkt.has_ifname == 1),

ConditionalField(
IntField('mtu', None),
lambda pkt: pkt.has_mtu == 1)]

def self_build(self, **kwargs):
if self.afi is None:
if self.ip4 is not None:
self.afi = 1
elif self.ip6 is not None:
self.afi = 2

if self.has_ifindex and self.ifindex is None:
warning('has_ifindex set but ifindex is not set.')
if self.has_ipaddr and self.afi is None:
warning('has_ipaddr set but afi is not set.')
if self.has_ipaddr and self.ip4 is None and self.ip6 is None:
warning('has_ipaddr set but ip4 or ip6 is not set.')
if self.has_ifname and self.ifname is None:
warning('has_ifname set but ifname is not set.')
if self.has_mtu and self.mtu is None:
warning('has_mtu set but mtu is not set.')

return ICMPExtensionObject.self_build(self, **kwargs)


# Add the post_dissection() method to the existing ICMPv4 and
# ICMPv6 error messages
scapy.layers.inet.ICMPerror.post_dissection = ICMPExtension_post_dissection
scapy.layers.inet.TCPerror.post_dissection = ICMPExtension_post_dissection
scapy.layers.inet.UDPerror.post_dissection = ICMPExtension_post_dissection

scapy.layers.inet6.ICMPv6DestUnreach.post_dissection = ICMPExtension_post_dissection # noqa: E501
scapy.layers.inet6.ICMPv6TimeExceeded.post_dissection = ICMPExtension_post_dissection # noqa: E501


# ICMPExtensionHeader looks at fields from the upper layer object when
# determining which upper layer to use.
bind_layers(ICMPExtensionHeader, ICMPExtensionMPLS, classnum=1, classtype=1)
bind_layers(ICMPExtensionHeader, ICMPExtensionInterfaceInformation, classnum=2)
# scapy.contrib.description = ICMP Extensions (deprecated)
# scapy.contrib.status = deprecated

__all__ = [
"ICMPExtensionObject",
"ICMPExtensionHeader",
"ICMPExtensionInterfaceInformation",
"ICMPExtensionMPLS",
]

import warnings

from scapy.layers.inet import (
ICMPExtension_Object as ICMPExtensionObject,
ICMPExtension_Header as ICMPExtensionHeader,
ICMPExtension_InterfaceInformation as ICMPExtensionInterfaceInformation,
)
from scapy.contrib.mpls import (
ICMPExtension_MPLS as ICMPExtensionMPLS,
)

warnings.warn(
"scapy.contrib.icmp_extensions is deprecated. Behavior has changed ! "
"Use scapy.layers.inet",
DeprecationWarning
)
39 changes: 33 additions & 6 deletions scapy/contrib/mpls.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,24 @@
# scapy.contrib.status = loads

from scapy.packet import Packet, bind_layers, Padding
from scapy.fields import BitField, ByteField, ShortField
from scapy.layers.inet import IP, UDP
from scapy.contrib.bier import BIER
from scapy.fields import (
BitField,
ByteField,
ByteEnumField,
PacketListField,
ShortField,
)

from scapy.layers.inet import (
_ICMP_classnums,
ICMPExtension_Object,
IP,
UDP,
)
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether, GRE
from scapy.compat import orb

from scapy.contrib.bier import BIER


class EoMCW(Packet):
Expand All @@ -37,21 +49,36 @@ def guess_payload_class(self, payload):
if len(payload) >= 1:
if not self.s:
return MPLS
ip_version = (orb(payload[0]) >> 4) & 0xF
ip_version = (payload[0] >> 4) & 0xF
if ip_version == 4:
return IP
elif ip_version == 5:
return BIER
elif ip_version == 6:
return IPv6
else:
if orb(payload[0]) == 0 and orb(payload[1]) == 0:
if payload[0] == 0 and payload[1] == 0:
return EoMCW
else:
return Ether
return Padding


# ICMP Extension

class ICMPExtension_MPLS(ICMPExtension_Object):
name = "ICMP Extension Object - MPLS (RFC4950)"

fields_desc = [
ShortField("len", None),
ByteEnumField("classnum", 1, _ICMP_classnums),
ByteField("classtype", 1),
PacketListField("stack", [], MPLS, length_from=lambda pkt: pkt.len - 4),
]


# Bindings

bind_layers(Ether, MPLS, type=0x8847)
bind_layers(IP, MPLS, proto=137)
bind_layers(IPv6, MPLS, nh=137)
Expand Down
2 changes: 2 additions & 0 deletions scapy/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -1893,6 +1893,8 @@ def i2repr(self,
def getfield(self, pkt, s):
# type: (Packet, bytes) -> Tuple[bytes, bytes]
len_pkt = self.length_from(pkt)
if len_pkt == 0:
return s, b""
return s[len_pkt:], self.m2i(pkt, s[:len_pkt])

def addfield(self, pkt, s, val):
Expand Down
Loading