Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 315 additions & 1 deletion dnslib/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -1761,14 +1761,328 @@ def __repr__(self):
return "%d %s \"%s\"" % (self.flags, self.tag, self.value)


class HTTPS(RD):
"""
HTTPS record.

>>> HTTPS.fromZone(["1", "cloudflare.com."])
1 cloudflare.com.
>>> HTTPS.fromZone(["1", ".", "mandatory=key65444,echconfig"])
1 . mandatory=key65444,echconfig
>>> HTTPS.fromZone(["1", ".", "alpn=h3,h3-29,h2"])
1 . alpn=h3,h3-29,h2
>>> HTTPS.fromZone(["1", ".", "no-default-alpn"])
1 . no-default-alpn
>>> HTTPS.fromZone(["1", ".", "port=443"])
1 . port=443
>>> HTTPS.fromZone(["1", ".", "ipv4hint=104.16.132.229,104.16.133.229"])
1 . ipv4hint=104.16.132.229,104.16.133.229
>>> HTTPS.fromZone(["1", ".", "echconfig=Z2FyYmFnZQ=="])
1 . echconfig=Z2FyYmFnZQ==
>>> HTTPS.fromZone(["1", ".", "ipv6hint=2606:4700::6810:84e5,2606:4700::6810:85e5"])
1 . ipv6hint=2606:4700::6810:84e5,2606:4700::6810:85e5
>>> HTTPS.fromZone(["1", ".", "key9999=X"])
1 . key9999=X
>>> pcap = binascii.unhexlify(b"0001000001000c0268330568332d323902683200040008681084e5681085e500060020260647000000000000000000681084e5260647000000000000000000681085e5")
>>> obj = HTTPS.parse(Buffer(pcap), len(pcap))
>>> obj
1 . alpn=h3,h3-29,h2 ipv4hint=104.16.132.229,104.16.133.229 ipv6hint=2606:4700::6810:84e5,2606:4700::6810:85e5
>>> b = Buffer()
>>> obj.pack(b)
>>> b.data == pcap
True
>>> pcap = binascii.unhexlify(b"00010000040004c0a80126")
>>> obj = HTTPS.parse(Buffer(pcap), len(pcap))
>>> obj
1 . ipv4hint=192.168.1.38
>>> b = Buffer()
>>> obj.pack(b)
>>> b.data == pcap
True
"""

attrs = ('priority', 'target', 'params')

def __init__(self, priority, target, params):
self.priority = priority
self.target = target
self.params = params

@classmethod
def parse(cls,buffer,length):
try:
priority, = buffer.unpack("!H")
target = []
while True:
n, = buffer.unpack("B")
if n == 0:
break
seg = bytearray(buffer.get(n))
target.append(seg)
params = []
while buffer.remaining() > 0:
k, = buffer.unpack("!H")
n, = buffer.unpack("!H")
v = bytearray(buffer.get(n))
params.append((k, v))
return cls(priority, target, params)
except (BufferError,BimapError) as e:
raise DNSError("Error unpacking HTTPS: " + str(e) + buffer.data[buffer.offset:].hex())

def pack(self,buffer):
buffer.pack("!H", self.priority)
for seg in self.target:
buffer.pack("B", len(seg))
buffer.append(seg)
buffer.pack("B", 0)
for k, v in self.params:
buffer.pack("!H", k)
buffer.pack("!H", len(v))
buffer.append(v)

@classmethod
def zf_parse_valuelist(cls, s):
"""
>>> HTTPS.zf_parse_valuelist(bytearray(b'"part1,part2\\\\,part3"'))
[bytearray(b'part1'), bytearray(b'part2,part3')]
>>> HTTPS.zf_parse_valuelist(bytearray(b'part1,part2\\\\044part3'))
[bytearray(b'part1'), bytearray(b'part2,part3')]
"""
quot = 0x22
slash = 0x5c
comma = 0x2c
if len(s) == 0:
return []
if s[0] == quot:
if len(s) < 2 or s[-1] != quot:
raise DNSError("Error decoding HTTPS SvcParamKey value list: unmatched \"")
s = s[1:-1]
if len(s) == 0:
return []
esc = False
i = 0
ret = [bytearray()]
while i < len(s):
c = s[i]
if esc:
esc = False
if c >= 0x30 and c <= 0x32: #0 1 2
ret[-1].append(int(s[i:i+3]))
i += 3
else:
ret[-1].append(c)
i += 1
else:
if c == slash:
esc = True
i += 1
elif c == comma:
ret.append(bytearray())
i += 1
else:
ret[-1].append(c)
i += 1
if esc:
raise DNSError("Error decoding HTTPS SvcParamKey value list: hanging slash")
return ret

@classmethod
def zf_parse_charstr(cls, s):
"""
>>> HTTPS.zf_parse_charstr(bytearray(b'"part1,part2\\\\,part3"'))
bytearray(b'part1,part2,part3')
>>> HTTPS.zf_parse_charstr(bytearray(b'part1,part2\\\\044part3'))
bytearray(b'part1,part2,part3')
"""
quot = 0x22
slash = 0x5c
if len(s) == 0:
return bytearray()
if s[0] == quot:
if len(s) < 2 or s[-1] != quot:
raise DNSError("Error decoding HTTPS SvcParamKey charstring: unmatched \"")
s = s[1:-1]
esc = False
i = 0
ret = bytearray()
while i < len(s):
c = s[i]
if esc:
esc = False
if c >= 0x30 and c <= 0x32: #0 1 2
ret.append(int(s[i:i+3]))
i += 3
else:
ret.append(c)
i += 1
else:
if c == slash:
esc = True
i += 1
else:
ret.append(c)
i += 1
if esc:
raise DNSError("Error decoding HTTPS SvcParamKey charstring: hanging slash")
return ret

@classmethod
def zf_tobytes(cls, s):
''' for py2-3 compatibility '''
return bytearray(s.encode("ASCII"))

@classmethod
def zf_tostr(cls, b):
return b.decode("ASCII")

paramkeys = [
(0, b"mandatory"),
(1, b"alpn"),
(2, b"no-default-alpn"),
(3, b"port"),
(4, b"ipv4hint"),
(5, b"echconfig"),
(6, b"ipv6hint")
]

@classmethod
def zf_parse_key(cls, k):
if k.startswith(b"key"):
return int(k[3:])
for i, n in cls.paramkeys:
if k == n:
return i
raise DNSError("Error reading HTTPS from zone: unrecognized SvcParamKey")

@classmethod
def zf_parse_param(cls, k, v):
b = Buffer()
i = cls.zf_parse_key(k)
if i == 0: #mandatory
for s in cls.zf_parse_valuelist(v):
si = cls.zf_parse_key(s)
b.pack("!H", si)
elif i == 1: #alpn
for s in cls.zf_parse_valuelist(v):
b.pack("B", len(s))
b.append(s)
elif i == 2: #no alpn
if v:
raise DNSError("Error encoding HTTPS SvcParamKey: no-default-alpn should not have a value")
elif i == 3: #port
b.pack("!H", int(v))
elif i == 4: #ipv4
for ip in cls.zf_parse_valuelist(v):
b.pack("!4B", *[int(x) for x in ip.split(b".")])
elif i == 5: #ech
s = cls.zf_parse_charstr(v)
b.data = binascii.a2b_base64(s)
elif i == 6: #ipv6
for ip in cls.zf_parse_valuelist(v):
oc = _parse_ipv6(cls.zf_tostr(ip))
b.pack("!16B", *oc)
else:
b.data = v
return (i, b.data)

@classmethod
def fromZone(cls,rd,origin=None):
pri = int(rd[0])
targ = [] if rd[1] == "." else cls.zf_tobytes(rd[1]).split(b".")[:-1]
params = []
for kv in [cls.zf_tobytes(v) for v in rd[2:]]:
eq = kv.find(b"=")
if eq < 0:
k = kv
v = bytearray()
else:
k = kv[:eq]
v = kv[eq+1:]
params.append(cls.zf_parse_param(k, v))
return cls(pri, targ, params)

@classmethod
def zf_is_special(cls, c):
return not (c == 0x21 or \
c >= 0x23 and c<=0x27 or \
c >= 0x2A and c<=0x3A or \
c >= 0x3C and c<=0x5B or \
c >= 0x5D and c<=0x7E)

@classmethod
def zf_escape_charstr(cls, s, escape_commas=False):
ret = bytearray()
for c in s:
if cls.zf_is_special(c) or escape_commas and c == 0x2c:
ret.extend(b"\\")
ret.extend(b"%.3d" % c)
else:
ret.append(c)
return cls.zf_tostr(ret)

@classmethod
def zf_format_valuelist(cls, lst):
return ",".join(cls.zf_escape_charstr(s, True) for s in lst)

@classmethod
def zf_format_key(cls, k):
for i, n in cls.paramkeys:
if k == i:
return cls.zf_tostr(n)
return "key" + str(k)

@classmethod
def zf_format_param(cls, i, v):
b = Buffer(v)
k = cls.zf_format_key(i)
if i == 0: #mandatory
ret = []
while b.remaining() > 0:
ki, = b.unpack("!H")
ret.append(cls.zf_format_key(ki))
ret = ",".join(ret)
elif i == 1: #alpn
ret = []
while b.remaining() > 0:
n, = b.unpack("B")
ret.append(bytearray(b.get(n)))
ret = cls.zf_format_valuelist(ret)
elif i == 2: #no-alpn
if b.remaining() > 0:
raise DNSError("Error decoding HTTPS SvcParamKey: no-default-alpn should not have a value")
ret = ""
elif i == 3: #port
ret = str(b.unpack("!H")[0])
elif i == 4: #ipv4
ret = []
while b.remaining() > 0:
ip = "%d.%d.%d.%d" % b.unpack("!4B")
ret.append(ip)
ret = ",".join(ret)
elif i == 5: #ech
ret = cls.zf_tostr(binascii.b2a_base64(v).rstrip())
elif i == 6: #ipv6
ret = []
while b.remaining() > 0:
ip = b.unpack("!16B")
ret.append(_format_ipv6(ip))
ret = ",".join(ret)
else:
ret = cls.zf_tostr(v)
return k + ("=" + ret if ret else "")

def __repr__(self):
pri = str(self.priority)
targ = ".".join([self.zf_tostr(t) for t in self.target]) + "."
return " ".join([pri, targ] + [self.zf_format_param(k, v) for k,v in self.params])

# Map from RD type to class (used to pack/unpack records)
# If you add a new RD class you must add to RDMAP

RDMAP = { 'CNAME':CNAME, 'A':A, 'AAAA':AAAA, 'TXT':TXT, 'MX':MX,
'PTR':PTR, 'SOA':SOA, 'NS':NS, 'NAPTR': NAPTR, 'SRV':SRV,
'DNSKEY':DNSKEY, 'RRSIG':RRSIG, 'NSEC':NSEC, 'CAA':CAA
'DNSKEY':DNSKEY, 'RRSIG':RRSIG, 'NSEC':NSEC, 'CAA':CAA,
'HTTPS': HTTPS
}

##
Expand Down