I reworked this into a few versions with Cursor; the repo owner is welcome to borrow whatever parts are useful.
```python
# -*- coding: utf-8 -*-
"""
ICP-Checker.py
Date: 2023-10-01
Author: soapffz
Adapted from: https://github.com/wongzeon/ICP-Checker

Batch-queries ICP filing (备案) information for domains. The script first obtains the
required cookie and token, then queries each input domain.

Main features:
1. Batch queries: domains can be read from a file and queried one by one.
2. Filing details: for each domain it fetches the sponsor, domain, ICP licence number,
   site filing number, domain type, pre-approval item, access restriction flag and
   approval date.
3. Unsupported domains: domains that cannot be filed are reported with a notice.
4. Query interval: batch mode sleeps between requests (8 s by default) to avoid
   hammering the API.

Note: requires Python 3 and the third-party packages requests, opencv-python,
openpyxl and tqdm.
"""

import argparse
import base64
import hashlib
import logging
import os
import re
import sys
import time

import cv2
import openpyxl as xl
import requests
from openpyxl.styles import Alignment
from tqdm import tqdm

# Log plain messages to stdout
logging.basicConfig(level=logging.INFO, format="%(message)s", stream=sys.stdout)

os.environ["no_proxy"] = "*"

# Command-line arguments
arg_parser = argparse.ArgumentParser(
    description="Check ICP for a domain or a list of domains from a file."
)
arg_parser.add_argument(
    "input", help="The domain or the file containing a list of domains."
)
args = arg_parser.parse_args()

# Output directory for the Excel report
output_directory = "./outs/"
if not os.path.exists(output_directory):
    os.makedirs(output_directory)

# Shared session (currently unused; the helpers below call requests.* directly)
http_session = requests.Session()


class CustomException(Exception):
    pass


def send_request(
    url, method="get", headers=None, data=None, json=None, timeout=(3.06, 27)
):
    try:
        response = requests.request(
            method, url, headers=headers, data=data, json=json, timeout=timeout
        )
        return response
    except requests.RequestException as e:
        raise CustomException(f"请求失败: {e}")


def retrieve_cookies():
    """Fetch the __jsluid_s cookie from beian.miit.gov.cn (up to 3 attempts)."""
    cookie_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32"
    }
    err_num = 0
    while err_num < 3:
        response = send_request("https://beian.miit.gov.cn/", headers=cookie_headers)
        try:
            cookie = requests.utils.dict_from_cookiejar(response.cookies)["__jsluid_s"]
            return cookie
        except KeyError:
            err_num += 1
            time.sleep(3)
    raise CustomException("获取Cookie失败,请重试!")


def retrieve_token():
    """Request an auth token; returns -1 on failure."""
    timeStamp = round(time.time() * 1000)
    authSecret = "testtest" + str(timeStamp)
    authKey = hashlib.md5(authSecret.encode(encoding="UTF-8")).hexdigest()
    auth_data = {"authKey": authKey, "timeStamp": timeStamp}
    url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth"
    try:
        t_response = requests.post(
            url=url, data=auth_data, headers=base_header, timeout=(3.06, 27)
        ).json()
        token = t_response["params"]["bussiness"]
    except Exception:
        return -1
    return token


def retrieve_check_pic(token):
    """Fetch the slider-captcha images and compute the gap offset; returns -1 on failure."""
    url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImage"
    base_header["Accept"] = "application/json, text/plain, */*"
    base_header.update({"Content-Length": "0", "token": token})
    try:
        p_request = requests.post(
            url=url, data="", headers=base_header, timeout=(3.06, 27)
        ).json()
        p_uuid = p_request["params"]["uuid"]
        big_image = p_request["params"]["bigImage"]
        small_image = p_request["params"]["smallImage"]
    except Exception:
        return -1
    # Decode the images, write them to disk and locate the gap via template matching
    with open("bigImage.jpg", "wb") as f:
        f.write(base64.b64decode(big_image))
    with open("smallImage.jpg", "wb") as f:
        f.write(base64.b64decode(small_image))
    background_image = cv2.imread("bigImage.jpg", cv2.COLOR_GRAY2RGB)
    fill_image = cv2.imread("smallImage.jpg", cv2.COLOR_GRAY2RGB)
    position_match = cv2.matchTemplate(
        background_image, fill_image, cv2.TM_CCOEFF_NORMED
    )
    max_loc = cv2.minMaxLoc(position_match)[3][0]
    mouse_length = max_loc + 1
    os.remove("bigImage.jpg")
    os.remove("smallImage.jpg")
    check_data = {"key": p_uuid, "value": mouse_length}
    return check_data


def retrieve_sign(check_data, token):
    """Submit the captcha answer and obtain the request sign; returns -1 on failure."""
    check_url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage"
    base_header.update(
        {"Content-Length": "60", "token": token, "Content-Type": "application/json"}
    )
    try:
        pic_sign = requests.post(
            check_url, json=check_data, headers=base_header, timeout=(3.06, 27)
        ).json()
        sign = pic_sign["params"]
    except Exception:
        return -1
    return sign


def query_base(info):
    """Normalise the input and build the query payload."""
    # Filter out empty input and special characters; only - —《》. () are allowed
    # (used in domains and company names respectively)
    try:
        if info == "":
            raise ValueError("InputNone")
        info = re.sub("[^\\u4e00-\\u9fa5-A-Za-z0-9,-.()《》—()]", "", info)
        info = (
            info.replace(" ", "")
            .replace("https://www.", "")
            .replace("http://www.", "")
            .replace("http://", "")
        )
        input_zh = re.compile("[\u4e00-\u9fa5]")
        zh_match = input_zh.search(info)
        if zh_match:
            info_result = info
        else:
            # Check whether the TLD is one that can be filed (list synced 2022/01/06)
            input_url = re.compile(
                r"([^.]+)(?:\.(?:GOV\.cn|ORG\.cn|AC\.cn|MIL\.cn|NET\.cn|EDU\.cn|COM\.cn|BJ\.cn|TJ\.cn|SH\.cn|CQ\.cn|HE\.cn|SX\.cn|NM\.cn|LN\.cn|JL\.cn|HL\.cn|JS\.cn|ZJ\.cn|AH\.cn|FJ\.cn|JX\.cn|SD\.cn|HA\.cn|HB\.cn|HN\.cn|GD\.cn|GX\.cn|HI\.cn|SC\.cn|GZ\.cn|YN\.cn|XZ\.cn|SN\.cn|GS\.cn|QH\.cn|NX\.cn|XJ\.cn|TW\.cn|HK\.cn|MO\.cn|cn|REN|WANG|CITIC|TOP|SOHU|XIN|COM|NET|CLUB|XYZ|VIP|SITE|SHOP|INK|INFO|MOBI|RED|PRO|KIM|LTD|GROUP|BIZ|AUTO|LINK|WORK|LAW|BEER|STORE|TECH|FUN|ONLINE|ART|DESIGN|WIKI|LOVE|CENTER|VIDEO|SOCIAL|TEAM|SHOW|COOL|ZONE|WORLD|TODAY|CITY|CHAT|COMPANY|LIVE|FUND|GOLD|PLUS|GURU|RUN|PUB|EMAIL|LIFE|CO|FASHION|FIT|LUXE|YOGA|BAIDU|CLOUD|HOST|SPACE|PRESS|WEBSITE|ARCHI|ASIA|BIO|BLACK|BLUE|GREEN|LOTTO|ORGANIC|PET|PINK|POKER|PROMO|SKI|VOTE|VOTO|ICU|LA))",
                flags=re.IGNORECASE,
            )
            info_result = input_url.search(info)
            if info_result is None:
                if info.split(".")[0] == "":
                    raise ValueError("OnlyDomainInput")
                raise ValueError("ValidType")
            else:
                info_result = info_result.group()
        info_data = {
            "pageNum": "1",
            "pageSize": "40",
            "serviceType": 1,
            "unitName": info_result,
        }
        return info_data
    except ValueError as e:
        if str(e) in ("InputNone", "OnlyDomainInput"):
            logging.error(f"[-] 请正确输入域名: {info}")
        else:
            logging.error(f"[-] {info} 不支持备案")


def retrieve_beian_info(info_data, p_uuid, token, sign):
    """Query the filing information; on 401/429 re-acquire cookie/token/sign and retry."""
    global base_header
    domain_list = []
    info_url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition"
    base_header.update(
        {"Content-Length": "78", "uuid": p_uuid, "token": token, "sign": sign}
    )
    max_retries = 3
    for _ in range(max_retries):
        try:
            beian_info = requests.post(
                url=info_url, json=info_data, headers=base_header, timeout=(3.06, 27)
            ).json()
            if not beian_info["success"]:
                if beian_info["code"] in [401, 429]:
                    # On 401/429, regenerate the cookie and token before retrying
                    logging.info("[+] 正在获取Cookie,请稍等……")
                    cookie = retrieve_cookies()
                    base_header = {
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32",
                        "Origin": "https://beian.miit.gov.cn",
                        "Referer": "https://beian.miit.gov.cn/",
                        "Cookie": f"__jsluid_s={cookie}",
                    }
                    if cookie != -1:
                        token = retrieve_token()
                        if token == -1:
                            raise CustomException("获取Token失败")
                        logging.info("[+] Retrieving Token, please wait……")
                        if token != -1:
                            logging.info("[+] Token retrieved, querying, please wait……")
                            check_data = retrieve_check_pic(token)
                            if check_data != -1:
                                sign = retrieve_sign(check_data, token)
                                p_uuid = check_data["key"]
                                if sign != -1:
                                    base_header.update(
                                        {
                                            "Content-Length": "78",
                                            "uuid": p_uuid,
                                            "token": token,
                                            "sign": sign,
                                        }
                                    )
                                    continue
                logging.error(
                    f'[-] 请求错误: CODE {beian_info["code"]} MSG {beian_info["msg"]}'
                )
                return domain_list
            # Request succeeded: process the data and leave the retry loop
            # ... (data-processing code omitted in this excerpt)
            break
        except Exception as e:
            logging.error(f"[-] 意外错误: {e}")
            return domain_list
    return domain_list


def save_data(domain_list):
    """Print the final result and append it to the Excel report."""
    # Number of rows to write; an empty list means no filing record (or the lookup failed)
    total_row = len(domain_list)
    if total_row == 1:
        total_row = 0
    elif total_row == 0:
        logging.info("[!] 所查域名无备案")
        return
    logging.info(f"[+] 查询结果如下:\n\n{domain_list}")
    # Save the workbook under ./outs/
    file_path = os.path.join(output_directory, "备案信息.xlsx")
    # Append to an existing workbook, or create a new one and set up the title row,
    # column widths, frozen panes and alignment
    if os.path.exists(file_path):
        wb = xl.load_workbook(file_path)
        ws = wb["备案信息"]
        max_row = ws.max_row
        start = max_row + 1
        total_row = total_row + start
        after_title = 0
    else:
        wb = xl.Workbook()
        ws = wb.active
        ws.title = "备案信息"
        title_list = [
            "域名主办方",
            "域名",
            "备案许可证号",
            "网站备案号",
            "域名类型",
            "网站前置审批项",
            "是否限制接入",
            "审核通过日期",
        ]
        for i in range(0, 8):
            ws.cell(1, i + 1).value = title_list[i]
        col_width = {
            "A": 45,
            "B": 40,
            "C": 22,
            "D": 24,
            "E": 9,
            "F": 15,
            "G": 13,
            "H": 21,
        }
        for k, v in col_width.items():
            ws.column_dimensions[k].width = v
        ws.freeze_panes = "A2"
        start = 0
        after_title = 2
    # Write the query results
    for j in range(start, total_row + 1):
        for k in range(0, 8):
            try:
                ws.cell(j + after_title, k + 1).value = domain_list[j - start][k]
            except Exception:
                continue
    # Centre all cells
    for row in range(ws.max_row):
        for col in range(ws.max_column):
            ws.cell(row + 1, col + 1).alignment = Alignment(
                horizontal="center", vertical="center"
            )
    try:
        wb.save(file_path)
    except PermissionError:
        logging.error("[!] 备案信息登记表格已打开,无法写入文件。如需写入,请关闭文件后重新执行!")
        return -1
    logging.info(f"[+] 查询结果保存在:{file_path}")
    return "OK"


def main(input):
    try:
        query_count = 0  # query counter
        while True:  # retry until credentials are obtained and all queries finish
            if query_count % 20 == 0:
                # Regenerate the cookie and token every 20 queries
                logging.info("[+] 正在获取Cookie,请稍等……")
                cookie = retrieve_cookies()
                global base_header
                base_header = {
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32",
                    "Origin": "https://beian.miit.gov.cn",
                    "Referer": "https://beian.miit.gov.cn/",
                    "Cookie": f"__jsluid_s={cookie}",
                }
                if cookie != -1:
                    token = retrieve_token()
                    logging.info("[+] Retrieving Token, please wait……")
                    if token != -1:
                        logging.info("[+] Token retrieved, querying, please wait……")
                        check_data = retrieve_check_pic(token)
                        if check_data != -1:
                            sign = retrieve_sign(check_data, token)
                            p_uuid = check_data["key"]
                            if sign != -1:
                                # If the input is a file, run a batch check
                                if os.path.isfile(input):
                                    # Use the line count as the progress-bar total
                                    with open(input) as f:
                                        total = sum(1 for _ in f)
                                    with open(input) as f, tqdm(
                                        total=total, ncols=70, position=0, leave=True
                                    ) as pbar:
                                        for line in f:
                                            domain = line.strip()
                                            logging.info(f"\n[+] 正在查询 {domain} ……")
                                            info = query_base(domain)
                                            domain_list = retrieve_beian_info(
                                                info, p_uuid, token, sign
                                            )
                                            if domain_list:
                                                logging.info(
                                                    f"\n{domain} 备案信息为:\n{domain_list}"
                                                )
                                            else:
                                                logging.info(f"\n{domain} 不支持备案")
                                            save_data(domain_list)
                                            pbar.update()
                                            time.sleep(8)  # delay between queries
                                            query_count += 1
                                else:
                                    domain = input
                                    info = query_base(domain)
                                    domain_list = retrieve_beian_info(
                                        info, p_uuid, token, sign
                                    )
                                    save_data(domain_list)
                                    query_count += 1
                                # All queries done: leave the loop so the script exits
                                break
    except CustomException as e:
        logging.error(f"[-] {e}\n")


if __name__ == "__main__":
    main(args.input)
```
On a 401 the script regenerates the cookie and token, but once a 429 block kicks in, even an 8 or 10 second interval between queries still ends in a long ban.
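For what it's worth, a jittered exponential backoff usually survives that kind of rate limiting better than a fixed 8–10 s sleep. The sketch below is only an illustration and is not part of the script above; `wait_with_backoff` and the commented retry loop are hypothetical.

```python
# Hypothetical helper (not in the script above): jittered exponential backoff for 429s.
import random
import time


def wait_with_backoff(attempt, base=10.0, cap=300.0):
    """Sleep for base * 2**attempt seconds (capped at `cap`), plus a little random jitter."""
    delay = min(cap, base * (2 ** attempt)) + random.uniform(0, 3)
    time.sleep(delay)


# Sketch of how it could wrap the query in retrieve_beian_info:
# for attempt in range(5):
#     beian_info = requests.post(info_url, json=info_data, headers=base_header).json()
#     if beian_info.get("code") != 429:
#         break
#     wait_with_backoff(attempt)
```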
I also built a variant that adds a proxy pool, but requests sent through the proxy IPs keep hitting SSL errors.
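In case it is useful, SSL errors behind a proxy pool are often down to HTTP-only proxies that cannot tunnel TLS to the https:// endpoints. A minimal sketch of the wiring, assuming a pool of `host:port` strings, is below; `PROXY_POOL` and `post_via_proxy` are made-up names, and `verify=False` only silences certificate checks (at a security cost) rather than fixing a broken tunnel.

```python
# Hypothetical proxy rotation (not the proxy-pool version mentioned above).
import random

import requests

PROXY_POOL = ["127.0.0.1:7890"]  # placeholder "host:port" entries


def post_via_proxy(url, **kwargs):
    """POST through a randomly chosen proxy from the pool."""
    proxy = random.choice(PROXY_POOL)
    proxies = {"http": f"http://{proxy}", "https": f"http://{proxy}"}
    # HTTPS through an HTTP proxy uses a CONNECT tunnel; proxies that break the tunnel
    # are a common source of SSLError. verify=False skips certificate validation but
    # does not help when the proxy cannot tunnel TLS at all.
    return requests.post(url, proxies=proxies, timeout=(3.06, 27), verify=False, **kwargs)
```

An HTTPS-capable or SOCKS5 pool (via `requests[socks]` and `socks5h://` proxy URLs) is usually a cleaner fix than turning verification off.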
Thanks, this is nicely done 👍