-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdrive_processor.py
More file actions
157 lines (140 loc) · 6.33 KB
/
drive_processor.py
File metadata and controls
157 lines (140 loc) · 6.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import json
from datetime import datetime
from alert_utils import send_email_alert
from db_helpers import insert_phishing_alert, insert_drive_event
def _extract_param_value(param):
if not isinstance(param, dict):
return None
for key in ('value', 'stringValue', 'intValue', 'boolValue'):
if param.get(key) is not None:
return param[key]
if param.get('multiValue'):
return param['multiValue']
return None
def process_drive_event(item, CONFIG):
"""
Analyze Google Drive audit logs for potential phishing or impersonation attempts.
"""
DOMAIN = CONFIG['domain']
params = {}
for param in item.get('parameters', []):
name = param.get('name')
if name:
params[name] = _extract_param_value(param)
actor = item.get('actor', {}).get('email', 'unknown')
timestamp = datetime.strptime(item['id']['time'], '%Y-%m-%dT%H:%M:%S.%fZ')
event_name = item.get('name', '')
visibility = (
params.get('new_value')
or params.get('visibility_change')
or params.get('visibility')
or ''
)
visibility_change = params.get('visibility_change', '')
owner_domain = params.get('primary_owner') or params.get('owner_domain') or ''
owner_display_name = params.get('owner_display_name') or params.get('owner', '')
doc_id = params.get('doc_id', '')
title = params.get('doc_title', 'Untitled Document')
file_link = f"https://drive.google.com/open?id={doc_id}" if doc_id else "N/A"
is_phishing_risk = False
reasons = []
# Rule 1: Document shared with "anyone with the link" (especially from external)
# Check various forms of public sharing
public_sharing_indicators = CONFIG.get('phishing', {}).get('public_sharing_indicators', [
'anyoneWithLink', 'anyone_with_link', 'anyone', 'public',
'anyoneWithTheLink', 'anyone_with_the_link'
])
is_public_share = any(indicator.lower() in str(visibility).lower() for indicator in public_sharing_indicators)
is_external_owner = DOMAIN not in str(owner_domain).lower() if owner_domain else True
if is_public_share:
if is_external_owner:
is_phishing_risk = True
reasons.append(f"External user shared document with 'anyone with the link' visibility: {visibility}")
else:
# Even internal users sharing publicly could be suspicious
reasons.append(f"Document shared with 'anyone with the link' visibility: {visibility}")
# Rule 2: Impersonation attempt - especially superintendent or principal
display_lower = owner_display_name.lower() if owner_display_name else ''
# Check for impersonation keywords, especially superintendent and principal
impersonation_keywords = CONFIG.get('phishing', {}).get('impersonation_keywords', [
'superintendent', 'principal', 'superintendant', 'prinicipal'
])
leadership_keywords = CONFIG.get('phishing', {}).get('leadership_keywords', [
'finance', 'hr', 'human resources', 'chief', 'director', 'executive'
])
is_impersonation = False
if display_lower:
# High priority: superintendent or principal
if any(k in display_lower for k in impersonation_keywords):
is_impersonation = True
if is_external_owner:
is_phishing_risk = True
reasons.append(f"HIGH PRIORITY: External user impersonating leadership role: {owner_display_name}")
else:
# Even internal, flag if combined with public sharing
if is_public_share:
is_phishing_risk = True
reasons.append(f"Potential impersonation attempt (internal user): {owner_display_name}")
# Medium priority: other leadership roles
elif any(k in display_lower for k in leadership_keywords):
if is_external_owner and is_public_share:
is_phishing_risk = True
reasons.append(f"External user with leadership-sounding name: {owner_display_name}")
# Rule 3: Suspicious file extension
title_lower = title.lower() if title else ''
suspicious_extensions = CONFIG.get('phishing', {}).get('suspicious_extensions', [
'.exe', '.scr', '.bat', '.zip', '.js', '.vbs', '.cmd'
])
if any(ext in title_lower for ext in suspicious_extensions):
if is_external_owner or is_public_share:
is_phishing_risk = True
reasons.append(f"Suspicious file extension: {title}")
# Rule 4: Combined risk factors - public share + impersonation
if is_public_share and is_impersonation and is_external_owner:
is_phishing_risk = True
reasons.append("CRITICAL: Public sharing combined with impersonation attempt")
if CONFIG.get('phishing', {}).get('log_all_drive_events'):
insert_drive_event(
actor if actor != 'unknown' else None,
owner_domain,
owner_display_name,
doc_id,
title,
visibility,
event_name,
item
)
if is_phishing_risk:
reason_text = '; '.join(reasons)
subject = f"{CONFIG['alerts']['alert_subject_prefix']} PHISHING ALERT: Suspicious Drive Share to {actor}"
msg = (
f"A potential phishing or impersonation share was detected.\n\n"
f"Reason: {reason_text}\n\n"
f"User: {actor}\n"
f"File: {title}\n"
f"Owner Domain: {owner_domain}\n"
f"Owner Display Name: {owner_display_name}\n"
f"Link: {file_link}\n"
f"Visibility: {visibility}\n"
f"Time: {timestamp}\n"
f"Event Type: {event_name}\n"
)
if CONFIG.get('log_level', '').upper() == 'DEBUG':
print(f"[DEBUG] Phishing alert triggered: {reason_text}")
print(json.dumps(item, indent=2))
send_email_alert(subject, msg, CONFIG)
insert_phishing_alert(
actor,
owner_domain,
owner_display_name,
doc_id,
title,
file_link,
visibility,
visibility_change,
reason_text,
item,
True
)
elif CONFIG.get('phishing', {}).get('log_all_drive_events') and CONFIG.get('log_level', '').upper() == 'DEBUG':
print("[DEBUG] Drive event stored for review (no phishing alert triggered).")