forked from shmilylty/OneForAll
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiscdn.py
86 lines (71 loc) · 2.31 KB
/
iscdn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import json
import ipaddress
from config import settings
from common import utils
from common.database import Database
from config.log import logger
data_dir = settings.data_storage_dir
# from https://github.com/al0ne/Vxscan/blob/master/lib/iscdn.py
cdn_ip_cidr = utils.load_json(data_dir.joinpath('cdn_ip_cidr.json'))
cdn_asn_list = utils.load_json(data_dir.joinpath('cdn_asn_list.json'))
# from https://github.com/Qclover/CDNCheck/blob/master/checkCDN/cdn3_check.py
cdn_cname_keyword = utils.load_json(data_dir.joinpath('cdn_cname_keywords.json'))
cdn_header_key = utils.load_json(data_dir.joinpath('cdn_header_keys.json'))
def check_cname_keyword(cname):
if not cname:
return False
names = cname.lower().split(',')
for name in names:
for keyword in cdn_cname_keyword.keys():
if keyword in name:
return True
def check_header_key(header):
if isinstance(header, str):
header = json.loads(header)
if isinstance(header, dict):
header = set(map(lambda x: x.lower(), header.keys()))
for key in cdn_header_key:
if key in header:
return True
else:
return False
def check_cdn_cidr(ips):
if isinstance(ips, str):
ips = set(ips.split(','))
else:
return False
for ip in ips:
try:
ip = ipaddress.ip_address(ip)
except Exception as e:
logger.log('DEBUG', e.args)
return False
for cidr in cdn_ip_cidr:
if ip in ipaddress.ip_network(cidr):
return True
def check_cdn_asn(asn):
if isinstance(asn, str):
if asn in cdn_asn_list:
return True
return False
def do_check(data):
logger.log('DEBUG', f'Checking cdn')
for index, item in enumerate(data):
cname = item.get('cname')
if check_cname_keyword(cname):
data[index]['cdn'] = 1
continue
header = item.get('header')
if check_header_key(header):
data[index]['cdn'] = 1
continue
ip = item.get('ip')
if check_cdn_cidr(ip):
data[index]['cdn'] = 1
continue
asn = item.get('asn')
if check_cdn_asn(asn):
data[index]['cdn'] = 1
continue
data[index]['cdn'] = 0
return data