import ctypes
import ipaddress
import subprocess
import sys
import threading
import time
from datetime import datetime

import requests
from requests.exceptions import ConnectionError, RequestException
def welcome():
    """Print the start-up banner (ASCII art plus the project's GitHub link) to stdout.

    The banner is a raw string so the backslashes in the art are emitted
    literally; it carries no runtime information.
    """
    print(r'''
__ ___ __ _ _ ____ ____ ____ __ __
/ |/ /____ _ / /(_)_____ (_)____ __ __ _____ / _// __ \ / __ \ ___ / /_ ___ _____ / /_ ____ _____
/ /|_/ // __ `// // // ___// // __ \ / / / // ___/ / / / /_/ / / / / // _ \ / __// _ \ / ___// __// __ \ / ___/
/ / / // /_/ // // // /__ / // /_/ // /_/ /(__ ) _/ / / ____/ / /_/ // __// /_ / __// /__ / /_ / /_/ // /
/_/ /_/ \__,_//_//_/ \___//_/ \____/ \__,_//____/ /___//_/ /_____/ \___/ \__/ \___/ \___/ \__/ \____//_/
-------------------------------------------------------------------------------------------------------------------
{GitHub:https://github.com/RogueCyberSecurityChannel}''')
def header(date, time):
    """Build the banner written at the top of each log session.

    Args:
        date: Pre-formatted date string (the caller uses '%B %d, %Y').
        time: Pre-formatted time string; note this parameter shadows the
            ``time`` module inside this function only.

    Returns:
        The multi-line header text, starting with a newline and framed by
        two rows of asterisks, with no trailing newline.
    """
    rule = "********************************************************"
    rows = [
        "",
        rule,
        "Malicious-Ip-Detector Log",
        "[GitHub:https://github.com/RogueCyberSecurityChannel]",
        f"Date: {date}",
        f"Time: {time}",
        rule,
    ]
    return "\n".join(rows)
def running_animation():
    """Spin a four-frame "|/-\\" indicator on the current console line forever.

    Intended to run on a daemon thread: it never returns and redraws the
    same line (carriage return, no newline) once per second.
    """
    frames = "|/-\\"
    tick = 0
    while True:
        sys.stdout.write("\r" + " [" + frames[tick % 4] + "] ")
        sys.stdout.flush()
        tick += 1
        time.sleep(1)
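def log_to_file(message, path='Malicious-IP_Detector_log.csv'):
    """Append *message* plus a CRLF terminator to the detector's log file.

    Args:
        message: Text to append; written via an f-string, so any object
            with a ``str`` form is accepted.
        path: Log file location, relative to the current working directory
            by default. Despite the ``.csv`` name the file holds free-form
            text blocks, not comma-separated rows.

    The file is opened with ``newline=''`` so the explicit ``\\r\\n`` is
    written verbatim: in default text mode on Windows the ``\\n`` would be
    translated again, producing ``\\r\\r\\n``. UTF-8 is forced so process
    names with non-ANSI characters cannot raise ``UnicodeEncodeError``.
    """
    with open(path, 'a', encoding='utf-8', newline='') as fd:
        fd.write(f'{message}\r\n')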
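def log_message_generator(match, raw_pid_info, path, hash):
    """Format one detection as a multi-line record and append it to the log.

    Args:
        match: The blocklisted IP the host is connected to.
        raw_pid_info: Lines of ``tasklist`` output for the owning process, or
            ``None``/empty when that lookup failed (joined with newlines).
        path: Executable path reported for the process (may be ``None``).
        hash: SHA256 digest string for that executable (may be ``None``).
            Shadows the builtin; kept for keyword-compatibility with callers.

    Side effects: calls :func:`log_to_file`, which appends to the log file.
    """
    # Tolerate a failed process lookup (the helper returns None on error)
    # instead of raising inside the alert path.
    pid_block = "\n".join(raw_pid_info or [])
    now = datetime.now()
    log_time = now.strftime("%I:%M:%S %p")
    log_date = now.strftime('%B %d, %Y')
    log_message = (
        f'Date: {log_date}'
        f'\nTime: {log_time}'
        f'\nIP: {match}\n'
        f'\nFilepath: {path}'
        f'\nSHA256 Hash: {hash}'
        f'\nProcess information: {pid_block}\n\n'
    )
    log_to_file(log_message)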
def pop_up(message):
    """Show a modal Windows message box titled "Malicious-IP-Detector Alert".

    Uses ``ctypes.windll`` directly, so this only works on Windows. The call
    blocks until the operator dismisses the box.

    Args:
        message: Body text of the dialog.
    """
    mb_iconerror = 16  # MB_ICONERROR: stop-sign icon, no extra buttons
    user32 = ctypes.windll.user32
    user32.MessageBoxW(0, message, "Malicious-IP-Detector Alert", mb_iconerror)
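def web_scrape_and_process(url, timeout=30):
    """Download a text resource and return its body on HTTP 200.

    Args:
        url: Resource to fetch (the caller passes the IPsum feed URL).
        timeout: Seconds to wait for connect/read. The original call had no
            timeout, so a stalled connection would block start-up forever.

    Returns:
        The response text on status 200, otherwise ``None``.

    Raises:
        requests.RequestException (incl. ConnectionError, Timeout) on
        transport failure; callers already catch these.

    Security note: transport is TLS-verified by requests' defaults, but the
    feed content itself is trusted as-is — there is no signature or checksum
    on the blocklist.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code == 200:
        return response.text
    return None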
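def banned_ip_parser(data):
    """Extract the IP column from an IPsum-style feed.

    Args:
        data: Raw feed text; each data line starts with an IP address
            followed by whitespace-separated extra columns. ``None`` or an
            empty string yields ``[]``.

    Returns:
        List of IP address strings, in feed order.

    The previous implementation dropped a fixed number of leading lines to
    skip the feed's comment header. If the upstream header grew or shrank,
    real entries were silently lost or comment fragments leaked into the
    blocklist. Comment lines (``#``) are now skipped explicitly and every
    token is validated with :mod:`ipaddress`, so only literal addresses can
    end up in the match set.
    """
    if not data:
        return []
    banned_ips = []
    for line in data.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith('#'):
            continue
        token = stripped.split()[0]
        try:
            ipaddress.ip_address(token)
        except ValueError:
            continue
        banned_ips.append(token)
    return banned_ips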
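def netstat(command):
    """Run a netstat-style command and return the IPv4 foreign hosts with their PIDs.

    Args:
        command: Either an argument list (run with ``shell=False``) or the
            legacy command string (still run through the shell for
            backward compatibility).

    Returns:
        ``(foreign_ips, pids)`` — two parallel lists of strings. On a failed
        or missing command both lists are empty; the previous version
        returned ``None`` here, which made the caller crash on unpacking.

    Parsing rules (``netstat -ano`` layout): only rows with exactly five
    columns are used, which keeps TCP rows and drops UDP rows (no State
    column) and the header. IPv6 foreign addresses (``[...]``) are skipped —
    the feed compared against is IPv4. Loopback and unspecified hosts are
    excluded by exact address semantics rather than substring match, so a
    peer such as ``10.0.0.0`` is no longer dropped because it contains
    ``0.0.0.0``.
    """
    try:
        result = subprocess.run(
            command,
            shell=isinstance(command, str),
            check=True,
            capture_output=True,
            text=True,
        )
    except (subprocess.CalledProcessError, OSError) as exc:
        print(f" [-] Error executing netstat command: {exc}")
        return [], []
    foreign_ips = []
    pids = []
    for line in result.stdout.splitlines():
        fields = line.split()
        if len(fields) != 5:
            continue
        foreign, pid = fields[2], fields[4]
        # Split host from port on the last ':' so an IPv6 literal can't be
        # mistaken for host:port.
        host, sep, _port = foreign.rpartition(":")
        if not sep or host.startswith("["):
            continue
        try:
            addr = ipaddress.ip_address(host)
        except ValueError:
            continue
        if addr.is_loopback or addr.is_unspecified:
            continue
        foreign_ips.append(host)
        pids.append(pid)
    return foreign_ips, pids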
def find_matches(list_1, list_2):
    """Return the elements common to both iterables as a list.

    Duplicates are collapsed and ordering is that of set iteration, exactly
    as the set-intersection it wraps.
    """
    common = set(list_1) & set(list_2)
    return list(common)
def get_timestamp():
    """Return the current local date and time as two ' [*] '-prefixed console lines."""
    layout = " [*] %B %d, %Y\n [*] %I:%M:%S %p"
    return datetime.now().strftime(layout)
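def pid_info_printer(command):
    """Run a process-information command and return its stdout as lines.

    Args:
        command: Argument list (preferred, ``shell=False``) or the legacy
            command string (run through the shell).

    Returns:
        List of output lines; ``[]`` when the command fails or cannot be
        started. The earlier version fell through to ``None`` on error,
        which broke the caller's ``for line in pid_info`` loop.
    """
    try:
        result = subprocess.run(
            command,
            shell=isinstance(command, str),
            check=True,
            capture_output=True,
            text=True,
        )
    except (subprocess.CalledProcessError, OSError) as exc:
        print(f" [-] Error executing PID information command: {exc}")
        return []
    return result.stdout.splitlines()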
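def _drive_path_start(line):
    """Return the index where the first ``X:\\`` drive-letter path begins in *line*, or None."""
    for idx in range(len(line) - 2):
        if line[idx].isalpha() and line[idx + 1:idx + 3] == ":\\":
            return idx
    return None


def path_finder(command):
    """Run a command (the caller uses ``wmic ... get ExecutablePath``) and pull out file paths.

    Args:
        command: Argument list (preferred, ``shell=False``) or the legacy
            command string (run through the shell).

    Returns:
        One path per output line that contains a Windows drive-letter path,
        with trailing whitespace removed; ``[]`` on command failure. The
        previous version only recognised ``C:``, kept wmic's column padding
        at the end of the path, and returned ``None`` on error.
    """
    try:
        result = subprocess.run(
            command,
            shell=isinstance(command, str),
            check=True,
            capture_output=True,
            text=True,
        )
    except (subprocess.CalledProcessError, OSError) as exc:
        print(f" [-] Could not find filepath: {exc}")
        return []
    paths = []
    for line in result.stdout.splitlines():
        start = _drive_path_start(line)
        if start is not None:
            paths.append(line[start:].rstrip())
    return paths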
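def hash_host_malware(command):
    """Run a hashing command (the caller uses ``certutil -hashfile``) and return the digest.

    Args:
        command: Argument list (preferred, ``shell=False``) or the legacy
            command string (run through the shell).

    Returns:
        The digest taken from the second output line, with any internal
        spaces removed (older certutil builds print the hash as space-
        separated byte pairs); ``None`` when the command fails or prints
        fewer than two lines. The earlier version raised ``IndexError`` on
        short output.
    """
    try:
        result = subprocess.run(
            command,
            shell=isinstance(command, str),
            check=True,
            capture_output=True,
            text=True,
        )
    except (subprocess.CalledProcessError, OSError) as exc:
        print(f" [-] Could not hash file: {exc}")
        return None
    output_lines = result.stdout.splitlines()
    if len(output_lines) < 2:
        return None
    return output_lines[1].replace(" ", "")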
def lists_to_dict(keys, values):
    """Pair *keys* with *values* position-wise into a dict.

    Stops at the shorter of the two; a repeated key keeps its last value.
    """
    return {key: value for key, value in zip(keys, values)}
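_FEED_URL = 'https://raw.githubusercontent.com/stamparm/ipsum/master/ipsum.txt'
_POLL_INTERVAL_SECONDS = 30


def _load_blocklist():
    """Download and parse the IPsum feed, exiting the process on failure.

    Returns the list of blocklisted IP strings. Transport errors print a
    hint and exit 1, Ctrl-C exits 0, and an empty or non-200 download is
    treated as a failure instead of silently running with no blocklist.
    """
    try:
        time.sleep(1)
        data = web_scrape_and_process(_FEED_URL)
        if not data:
            print(' [-] The blocklist download returned no data; cannot continue.\n')
            sys.exit(1)
        return banned_ip_parser(data)
    except (ConnectionError, RequestException):
        time.sleep(1)
        print(' [-] An error occurred while trying to establish a secure connection. Please check your internet connection and try again later.\n')
        time.sleep(30)
        sys.exit(1)
    except KeyboardInterrupt:
        sys.exit(0)
    except Exception as e:
        print(str(e))
        sys.exit(1)


def _group_pids_by_ip(host_ips, host_pids):
    """Map each foreign IP to every PID holding a connection to it.

    ``dict(zip(...))`` kept only the last PID per IP, so a second process
    talking to the same blocklisted address went unreported.
    """
    pids_by_ip = {}
    for ip, pid in zip(host_ips, host_pids):
        owners = pids_by_ip.setdefault(ip, [])
        if pid not in owners:
            owners.append(pid)
    return pids_by_ip


def _report_match(ip, pid):
    """Alert, print and log one (IP, PID) detection.

    Every lookup below runs a subprocess with data read from the process
    table, and the executable path belongs to a process already flagged as
    suspect. Commands are therefore passed as argument lists (no shell), and
    the PID is checked to be all digits before it is used at all.
    """
    pop_up_thread = threading.Thread(
        target=pop_up,
        args=(f"[+] Connection to know malicious IP detected!\n[+] IP: {ip} ",),
    )
    pop_up_thread.start()
    print(" \n [!] ACTIVE CONNECTION TO KNOWN MALICIOUS IP DETECTED")
    print(get_timestamp())
    print(f' [-] IP: {ip} PID: {pid}')
    print(' [+] Event logged to "Malicious-IP_Detector_log.csv"')
    if not str(pid).isdigit():
        print(f' [-] Skipping process lookup: unexpected PID value {pid!r}')
        log_message_generator(ip, [], None, None)
        pop_up_thread.join()
        return
    print(' [*] Process information: ')
    pid_info = pid_info_printer(['tasklist', '/FI', f'PID eq {pid}', '/V']) or []
    for line in pid_info:
        print(' ' + line)
    path_info = path_finder(['wmic', 'process', 'where', f'ProcessId={pid}', 'get', 'ExecutablePath']) or []
    if not path_info:
        # Still record the hit; previously nothing was logged when the
        # path lookup came back empty even though "Event logged" was printed.
        print('\n Filepath: <not found>')
        log_message_generator(ip, pid_info, None, None)
    for path in path_info:
        print('\n Filepath: ' + path)
        # Never splice `path` into a shell string: it is attacker-influenced.
        detection_hash = hash_host_malware(['certutil', '-hashfile', path, 'SHA256'])
        print(" SHA256 Hash: " + str(detection_hash) + '\n')
        log_message_generator(ip, pid_info, path, detection_hash)
    # The message box is modal; wait for the operator before the next hit.
    pop_up_thread.join()


def _poll_once(blocklist, announced):
    """Run one netstat sweep and report every blocklist hit.

    Returns the updated *announced* flag (True once the idle banner and
    spinner thread have been started) so the caller keeps that state local
    instead of in a module global.

    NOTE: there is no de-duplication across sweeps — a connection that stays
    up is re-alerted (and re-logged) every poll, and each alert blocks on
    its modal dialog.
    """
    host_ips, host_pids = netstat('netstat -ano') or ([], [])
    pids_by_ip = _group_pids_by_ip(host_ips, host_pids)
    matches = find_matches(blocklist, host_ips)
    if not matches:
        if not announced:
            print("\n [+] No active malicious IP connections detected")
            threading.Thread(target=running_animation, daemon=True).start()
            announced = True
        return announced
    for ip in matches:
        for pid in pids_by_ip.get(ip, []):
            _report_match(ip, pid)
    return announced


def main():
    """Entry point: print the banner, fetch the blocklist once, then poll netstat forever.

    Each poll is wrapped so an error is printed and the loop still waits
    the full interval; the previous loop re-entered immediately after an
    exception raised before its sleep, spinning and spamming the console.
    Ctrl-C exits cleanly from anywhere in the loop.
    """
    welcome()
    now = datetime.now()
    log_to_file(header(now.strftime('%B %d, %Y'), now.strftime("%I:%M:%S %p")))
    blocklist = _load_blocklist()
    announced = False
    while True:
        try:
            try:
                announced = _poll_once(blocklist, announced)
            except Exception as e:
                print(str(e))
            time.sleep(_POLL_INTERVAL_SECONDS)
        except KeyboardInterrupt:
            sys.exit(0)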
if __name__ == "__main__":
    # `flag` is module state read and written by main() via `global flag`;
    # it flips to True once the "no malicious connections" banner has been
    # printed and the spinner thread started.
    flag = False
    main()