import json
import re
import time
from datetime import datetime
from hashlib import md5
from logging import Logger
from os.path import getmtime, getsize
from tempfile import TemporaryFile
from threading import Lock
from typing import List


class LogParser:
"""
Class for parsing information about attacks from log files
"""

    def __init__(self, file_log, rules, service_name=None, logger=None):  # type: (str, List[str], str, Logger) -> None
"""
Initialize the log parser
:param file_log: path to the file with logs
:param rules: list of string filters/rules
        :param service_name: optional name of the service. If not specified, found attacks are not assigned to any
        service
        :param logger: optional logger. If specified, LogParser emits its debug messages through it
"""
self.file_log = file_log
        self.rules = [rule if isinstance(rule, Rule) else Rule(rule, service_name) for rule in rules]
self.logger = logger
self._last_file_size = 0
self._last_file_modification_date = None
        self._attack_cache_file = TemporaryFile()  # all parsed attacks are cached here
self._attack_cache_file_lock = Lock()
        # if these last bytes are unchanged, we assume the file was not modified
        self._last_bytes = {'hash': None, 'len': 0}
self.force_rescan()
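
    # A minimal usage sketch; the path, rule, and service name below are
    # hypothetical examples, not values this module ships with:
    #
    #   parser = LogParser('/var/log/auth.log',
    #                      ['%D:M% %D:D% %TIME% %IP% attacked on user %USER%'],
    #                      service_name='ssh')
    #   attacks = parser.parse_attacks(max_age=3600)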

    def parse_attacks(self, max_age=None, skip_scanning=False):  # type: (float, bool) -> dict
"""
Parses the attacks from log file and returns them
        :param max_age: optional, in seconds. If an attack is older than this, it is ignored
        :param skip_scanning: if True, the newly read content is not analyzed for attacks
        :return: dictionary. Keys are attacking IPs; values are lists of dictionaries with data about every attack
"""
if self.logger is not None:
self.logger.debug('parsing attacks for %s' % self.file_log)
attacks = {}
curr_file_size = getsize(self.file_log)
curr_file_modification_time = getmtime(self.file_log)
if self._last_file_size == curr_file_size and curr_file_modification_time == self._last_file_modification_date:
if self.logger is not None:
self.logger.debug('nothing changed, nothing new to parse')
            # it seems the file has not changed, so skip analyzing it
return attacks
continue_in_scanning = True # when set to True, only new content in file is analyzed
if self._last_file_size > curr_file_size:
# when the current file is smaller, something has happened to it. We will rescan it to be sure
if self.logger is not None:
                self.logger.debug('file shrank since last scan, rescanning it')
continue_in_scanning = False
self.force_rescan()
if self._last_file_size == curr_file_size and self._last_file_modification_date != curr_file_modification_time:
# the file has the same size but was still modified, we better rescan it
if self.logger is not None:
                self.logger.debug('file is the same size but was modified, rescanning it')
continue_in_scanning = False
self.force_rescan()
with open(self.file_log, 'r') as f:
if continue_in_scanning and self._last_bytes['hash'] is not None:
                # check whether the last scanned bytes are still the same
f.seek(self._last_file_size - self._last_bytes['len'] - 5)
if md5(f.read(self._last_bytes['len']).encode('utf8')).hexdigest() != self._last_bytes['hash']:
# nope, last few bytes differ, something seems really odd about this file. Better rescan it
if self.logger is not None:
self.logger.debug('last few scanned bytes differ, rescan it')
self.force_rescan()
f.seek(self._last_file_size) # skip all already analyzed content
new_content = f.read()
            # remember a chunk from near the end of the file so the next scan can tell whether it is still the same
content_end = new_content[-256:-5]
self._last_bytes['hash'] = md5(content_end.encode('utf8')).hexdigest()
self._last_bytes['len'] = len(content_end)
log_lines = new_content.splitlines()
del new_content, content_end
if not skip_scanning:
for log_line in log_lines:
if self.logger is not None:
self.logger.debug('log line "%s"' % log_line.replace('\n', '\\n'))
for rule in self.rules:
variables = rule.get_variables(log_line)
if variables is not None:
if max_age is not None and time.time() - max_age > variables['TIMESTAMP']:
break
attacker_ip = variables['IP']
del variables['IP']
item = attacks.get(attacker_ip, [])
item.append(variables)
attacks[attacker_ip] = item
break
self._last_file_modification_date = curr_file_modification_time
self._last_file_size = curr_file_size
self._attack_cache_file_lock.acquire()
self._attack_cache_file.seek(0)
attacks.update(json.loads(self._attack_cache_file.read().decode('utf8')))
self._attack_cache_file.seek(0)
self._attack_cache_file.truncate(0)
self._attack_cache_file.write(json.dumps(attacks).encode('utf8'))
self._attack_cache_file.flush()
self._attack_cache_file_lock.release()
return attacks
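
    # The returned mapping looks roughly like this (values are illustrative):
    #
    #   {'203.0.113.7': [{'TIMESTAMP': 1600000000.0, 'USER': 'root', 'SERVICE': 'ssh'},
    #                    {'TIMESTAMP': 1600000060.0, 'USER': 'admin', 'SERVICE': 'ssh'}]}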

    def get_habitual_offenders(self, min_attack_attempts, attack_attempts_time, max_age=None, attacks=None,
                               first_load=False):
        # type: (int, int, float, dict, bool) -> dict
"""
        Finds IPs that performed at least the minimum number of attacks within the specified time range
        :param min_attack_attempts: minimum number of attacks within the time range required for an IP to be included
        :param attack_attempts_time: the time range, in seconds, in which all of the attacks must have occurred
        :param max_age: optional, in seconds. If an attack is older than this, it is ignored
        :param attacks: optional. If None, the value of self.parse_attacks(max_age) is used
        :param first_load: if True, the log file is only read, not scanned for attacks
        :return: dictionary. Keys are IPs that attacked at least min_attack_attempts times;
        values are lists of dictionaries with data about every attack recorded for that IP
"""
attacks = self.parse_attacks(max_age, first_load) if attacks is None else attacks
habitual_offenders = {}
for ip, attack_list in attacks.items():
for attack in attack_list:
attacks_in_time_range = []
for attack2 in attack_list:
attack_time_delta = attack2['TIMESTAMP'] - attack['TIMESTAMP']
if 0 <= attack_time_delta <= attack_attempts_time:
attacks_in_time_range.append(attack2)
if len(attacks_in_time_range) > min_attack_attempts:
break
if len(attacks_in_time_range) >= min_attack_attempts:
habitual_offenders[ip] = attack_list
return habitual_offenders
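
    # Illustrative call: flag IPs with at least 3 attacks inside any 60-second window:
    #
    #   offenders = parser.get_habitual_offenders(3, 60)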

    def force_rescan(self):  # type: () -> None
        """
        Resets progress info about the log file, forcing the next scan to start from the beginning
:return: None
"""
self._last_file_size = 0
self._last_file_modification_date = None
self._attack_cache_file_lock.acquire()
self._attack_cache_file.seek(0)
self._attack_cache_file.truncate(0)
self._attack_cache_file.write(json.dumps({}).encode('utf8'))
self._attack_cache_file.flush()
self._attack_cache_file_lock.release()


class Rule:
    """
    Rule or filter that can be tested against a line from a log file. If this rule/filter matches, it can parse
    variables from that line
    """

    def __init__(self, filter_string, service_name=None):  # type: (str, str or None) -> None
"""
Initializes this rule/filter
:param filter_string: string representation of this rule/filter with all variables stated as %VAR_NAME%
:param service_name: optional name of the service. If not specified then found attacks are not assigned to any
service
"""
self.__service_name = service_name
self.__rule_variables = re.findall("%.*?%", filter_string)
# Generate regex for rule detection
self.__rule_regex = filter_string
for reserved_char in list("\\+*?^$.[]{}()|/"): # escape reserved regex characters
self.__rule_regex = self.__rule_regex.replace(reserved_char, '\\' + reserved_char)
        for variable in self.__rule_variables:  # replace each variable placeholder with a lazy capture group
            self.__rule_regex = self.__rule_regex.replace(variable, '(.+?)')
        if self.__rule_regex.endswith('?)'):  # make the last group greedy so the final variable is captured whole
            self.__rule_regex = self.__rule_regex[:-2] + ')'
# Remove %'s from variable names
self.__rule_variables = [var[1:-1] for var in self.__rule_variables]
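
    # For example, the filter '%IP% attacked on user %USER%' compiles to the
    # regex '(.+?) attacked on user (.+)' with variables ['IP', 'USER'].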

    def test(self, log_line):  # type: (str) -> bool
        """
        Tests whether this rule/filter matches a line from a log file
:param log_line: line from a log file
:return: True if it fits, False if this rule cannot be applied to this line
"""
        return re.match(self.__rule_regex, log_line) is not None
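
    # e.g. Rule('%IP% hit %PATH%').test('203.0.113.7 hit /admin') returns True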

    def get_variables(self, log_line):  # type: (str) -> dict or None
"""
Parses variables from log line that fits this rule
:param log_line: line from a log file
:return: None if this rule cannot be applied to this line, otherwise returns a dictionary with parsed variables
from this line
"""
data = {}
# Parse all variables from log line
variable_search = re.match(self.__rule_regex, log_line)
if not variable_search: # this rule is not for this line
return None
# noinspection PyTypeChecker
for i, variable in enumerate(self.__rule_variables):
data[variable] = variable_search.group(i + 1).strip()
# attempt to parse raw IP
if 'IP' in data:
            ip = re.findall('[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}', data['IP'])
if len(ip) == 0:
ip = re.findall('([0-9a-f]{1,4}|::)((:{0,2}[0-9a-f]{0,4}){0,7})', data['IP'])
if len(ip) == 0:
data['IP-RAW'], data['IP'] = data['IP'], 'NO VALID IP FOUND'
if self.__service_name is not None:
data['SERVICE'] = self.__service_name
date_format = '%Y %b %d %H:%M:%S'
date_string = None
if 'D:M' in data and 'D:D' in data and 'TIME' in data:
date_string = '%s %s %s %s' % (datetime.now().strftime('%Y'), data['D:M'],
data['D:D'], data['TIME'])
elif 'D:M' in data and 'D:D' in data:
date_string = '%s %s %s 00:00:00' % (datetime.now().strftime('%Y'), data['D:M'], data['D:D'])
elif 'TIME' in data:
# noinspection PyTypeChecker
date_string = datetime.now().strftime('%Y %b %d') + ' ' + data['TIME']
data['TIMESTAMP'] = time.time() if date_string is None else \
time.mktime(datetime.strptime(date_string, date_format).timetuple())
return data
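
    # With the example filter '%IP% attacked on user %USER%' and the line
    # '203.0.113.7 attacked on user root', this returns roughly:
    #   {'IP': '203.0.113.7', 'USER': 'root', 'TIMESTAMP': <time of parsing>}
    # TIMESTAMP falls back to time.time() because that filter carries no date fields.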


# If launched directly, perform a quick proof of concept on the file debug.log
if __name__ == '__main__':
all_rules = ["%D:M% %D:D% %TIME% %IP% attacked on user %USER%"]
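    # Stand-alone Rule demonstration; the sample line below is illustrative only
    rule = Rule(all_rules[0], service_name='demo')
    sample_line = 'Jan 01 12:00:00 203.0.113.7 attacked on user root'
    print(rule.test(sample_line), rule.get_variables(sample_line))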
file = 'debug.log'
parser = LogParser(file, all_rules)
offenders = parser.get_habitual_offenders(3, 100000)
for off_ip, off_attacks in offenders.items():
print(off_ip + ':', off_attacks)