# -*- coding: utf-8 -*-
#
# Implement anomalies check
from twisted.internet.defer import inlineCallbacks
from globaleaks import models
from globaleaks.handlers.admin.node import db_admin_serialize_node
from globaleaks.handlers.admin.notification import db_get_notification
from globaleaks.handlers.admin.user import db_get_admin_users
from globaleaks.orm import transact
from globaleaks.rest.cache import Cache
from globaleaks.state import State
from globaleaks.transactions import db_schedule_email
from globaleaks.utils.log import log
from globaleaks.utils.templating import Templating
from globaleaks.utils.utility import datetime_now, datetime_null, get_disk_space, is_expired
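

# Thresholds on recent event counters: when the count of an event type exceeds
# its threshold, the event type is counted as an anomaly
# (see Alarm.check_tenant_anomalies below).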
ANOMALY_MAP = {
    'started_submissions': 100,
    'completed_submissions': 20,
    'failed_submissions': 5,
    'failed_logins': 5,
    'successful_logins': 20,
}
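

# Builds the list of disk alarm conditions for the given free/total working
# directory space, comparing the free megabytes and the free percentage against
# the high and low thresholds configured on tenant 1.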
def get_disk_anomaly_conditions(free_workdir_bytes, total_workdir_bytes):
    free_disk_megabytes = free_workdir_bytes / (1024 * 1024)
    free_disk_percentage = free_workdir_bytes / (total_workdir_bytes / 100)

    def info_msg_0():
        return "free_disk_megabytes <= %d or free_disk_percentage <= %d" % \
               (State.tenant_cache[1].threshold_free_disk_megabytes_high,
                State.tenant_cache[1].threshold_free_disk_percentage_high)

    def info_msg_1():
        return "free_disk_megabytes <= %d or free_disk_percentage <= %d" % \
               (State.tenant_cache[1].threshold_free_disk_megabytes_low,
                State.tenant_cache[1].threshold_free_disk_percentage_low)

    # list of bad conditions ordered starting from the worst case scenario
    return [
        {
            'condition': free_disk_megabytes <= State.tenant_cache[1].threshold_free_disk_megabytes_high or
                         free_disk_percentage <= State.tenant_cache[1].threshold_free_disk_percentage_high,
            'info_msg': info_msg_0,
            'alarm_level': 2,
            'accept_submissions': False
        },
        {
            'condition': free_disk_megabytes <= State.tenant_cache[1].threshold_free_disk_megabytes_low or
                         free_disk_percentage <= State.tenant_cache[1].threshold_free_disk_percentage_low,
            'info_msg': info_msg_1,
            'alarm_level': 1,
            'accept_submissions': True
        }
    ]
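

# Schedules an anomaly alert email for every admin of the tenant, with the
# notification templates rendered in each admin's language.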
@transact
def generate_admin_alert_mail(session, tid, alert):
    for user_desc in db_get_admin_users(session, tid):
        user_language = user_desc['language']

        data = {
            'type': 'admin_anomaly',
            'node': db_admin_serialize_node(session, tid, user_language),
            'notification': db_get_notification(session, tid, user_language),
            'alert': alert,
            'user': user_desc,
        }

        subject, body = Templating().get_mail_subject_and_body(data)

        db_schedule_email(session, tid, user_desc['mail_address'], subject, body)
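

# Persists the anomalies queued in each tenant's AnomaliesQ to the database.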
@transact
def save_anomalies(session):
    for tid in State.tenant_state:
        for anomaly in State.tenant_state[tid].AnomaliesQ:
            a = models.Anomalies()
            a.tid = tid
            a.alarm = anomaly[2]
            a.date = anomaly[0]
            a.events = anomaly[1]
            session.add(a)
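

# Tracks the activity and disk space alarm levels of the node and notifies the
# administrators when anomalies are detected.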
class Alarm(object):
    def __init__(self, state):
        self.state = state
        self.last_alarm_email = datetime_null()
        self.event_matrix = {}

        self.measured_freespace = 0
        self.measured_totalspace = 0

        self.alarm_levels = {
            'disk_space': 0,
            'disk_message': None,
            'activity': 0
        }

    @inlineCallbacks
    def check_tenant_anomalies(self, tid):
        """
        This function updates the Alarm level of the tenant.
        """
        self.number_of_anomalies = 0

        self.event_matrix.clear()

        # count the recent events by type
        for event in State.tenant_state[tid].RecentEventQ:
            self.event_matrix.setdefault(event.event_type, 0)
            self.event_matrix[event.event_type] += 1

        # count how many event types exceed their anomaly threshold
        for event_name, threshold in ANOMALY_MAP.items():
            if event_name in self.event_matrix:
                if self.event_matrix[event_name] > threshold:
                    self.number_of_anomalies += 1

        previous_activity_sl = self.alarm_levels['activity']

        log_function = log.debug
        self.alarm_levels['activity'] = 0

        if self.number_of_anomalies == 1:
            log_function = log.info
            self.alarm_levels['activity'] = 1
        elif self.number_of_anomalies > 1:
            log_function = log.info
            self.alarm_levels['activity'] = 2

        # if there is at least one anomaly or the alarm level is raised, record it
        if self.number_of_anomalies >= 1 or self.alarm_levels['activity'] >= 1:
            State.tenant_state[tid].AnomaliesQ.append([datetime_now(), self.event_matrix, self.alarm_levels['activity']])

        if previous_activity_sl != self.alarm_levels['activity']:
            log_function("Alarm level changed from %d => %d" %
                         (previous_activity_sl,
                          self.alarm_levels['activity']))

        if State.tenant_cache[1].notification.disable_admin_notification_emails:
            return

        if not (self.alarm_levels['activity'] or self.alarm_levels['disk_space']):
            return

        # rate limit admin alert emails to one every two hours
        if not is_expired(self.last_alarm_email, minutes=120):
            return

        self.last_alarm_email = datetime_now()

        alert = {
            'alarm_levels': self.alarm_levels,
            'measured_freespace': self.measured_freespace,
            'measured_totalspace': self.measured_totalspace,
            'event_matrix': self.event_matrix
        }

        yield generate_admin_alert_mail(tid, alert)

    def check_disk_anomalies(self):
        """
        Checks the free disk space of the working directory against the
        configured thresholds and updates the disk alarm level accordingly.

        The working directory is evaluated both as a percentage of the total
        space and as an absolute amount of free megabytes; the "unusable node"
        threshold is reached when the remaining space is critically low and
        submissions are disabled.

        https://github.com/globaleaks/GlobaLeaks/issues/297
        https://github.com/globaleaks/GlobaLeaks/issues/872
        """
        self.measured_freespace, self.measured_totalspace = get_disk_space(self.state.settings.working_path)

        disk_space = 0
        disk_message = ""
        accept_submissions = True
        old_accept_submissions = State.accept_submissions

        for c in get_disk_anomaly_conditions(self.measured_freespace, self.measured_totalspace):
            if c['condition']:
                disk_space = c['alarm_level']

                info_msg = c['info_msg']()

                if disk_space == 2:
                    disk_message = "[FATAL] Disk anomaly, submissions disabled: %s" % info_msg
                else:  # == 1
                    disk_message = "[WARNING]: Disk anomaly: %s" % info_msg

                accept_submissions = c['accept_submissions']
                break

        # This check is temporary: it verifies that the alarm level switch can be
        # logged as part of the Anomalies via this function
        old_alarm_level = self.alarm_levels['disk_space']
        if old_alarm_level != disk_space:
            if disk_message:
                log.err(disk_message)
            else:
                log.err("Available disk space returned to normal levels")

        # the values are set here with single assignments in order to
        # minimize possible race conditions in resetting/setting them
        self.alarm_levels['disk_space'] = disk_space
        self.alarm_levels['disk_message'] = disk_message

        # if not in testing mode, apply the new accept_submissions value
        State.accept_submissions = accept_submissions if not self.state.settings.testing else True

        if old_accept_submissions != State.accept_submissions:
            log.info("Switching disk space availability from: %s to %s",
                     old_accept_submissions, accept_submissions)

            # the cache must be invalidated here because the accept_submissions
            # value served via /public has changed
            Cache.invalidate()
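

# Entry point of the anomaly checks: verifies the disk space of the node
# (tenant 1), evaluates the activity anomalies of every tenant and persists
# the collected anomalies.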
@inlineCallbacks
def check_anomalies():
    State.tenant_state[1].Alarm.check_disk_anomalies()

    for tid in State.tenant_state:
        yield State.tenant_state[tid].Alarm.check_tenant_anomalies(tid)

    yield save_anomalies()