-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
audit_log.py
145 lines (122 loc) · 4.15 KB
/
audit_log.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# -*- coding: UTF-8 -*-
import logging
import datetime
import simplejson as json
from django.dispatch import receiver
from django.utils import timezone
from django.http import HttpResponse
from django.db.models import Q
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.signals import (
user_logged_in,
user_logged_out,
user_login_failed,
)
from .models import AuditEntry, Users
from common.utils.permission import superuser_required
from common.utils.extend_json_encoder import ExtendJSONEncoder
log = logging.getLogger("default")
@login_required
def audit_input(request):
"""用户提交的操作信息"""
result = {}
action = request.POST.get("action")
extra_info = request.POST.get("extra_info", "")
result["user_id"] = request.user.id
result["user_name"] = request.user.username
result["user_display"] = request.user.display
result["action"] = action
result["extra_info"] = extra_info
audit = AuditEntry(**result)
audit.save()
return HttpResponse(
json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type="application/json",
)
@permission_required("sql.audit_user", raise_exception=True)
def audit_log(request):
"""获取审计日志列表"""
limit = int(request.POST.get("limit", 0))
offset = int(request.POST.get("offset", 0))
limit = offset + limit
limit = limit if limit else None
search = request.POST.get("search", "")
action = request.POST.get("action", "")
start_date = request.POST.get("start_date")
end_date = request.POST.get("end_date")
filter_dict = dict()
if start_date and end_date:
end_date = datetime.datetime.strptime(
end_date, "%Y-%m-%d"
) + datetime.timedelta(days=1)
filter_dict["action_time__range"] = (start_date, end_date)
if action:
filter_dict["action"] = action
audit_log_obj = AuditEntry.objects.filter(**filter_dict)
if search:
audit_log_obj = audit_log_obj.filter(
Q(user_name__icontains=search)
| Q(action__icontains=search)
| Q(extra_info__icontains=search)
)
audit_log_count = audit_log_obj.count()
audit_log_list = audit_log_obj.order_by("-action_time")[offset:limit].values(
"user_id", "user_name", "user_display", "action", "extra_info", "action_time"
)
# QuerySet 序列化
rows = [row for row in audit_log_list]
result = {"total": audit_log_count, "rows": rows}
# 返回查询结果
return HttpResponse(
json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
content_type="application/json",
)
def get_client_ip(request):
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
else:
ip = request.META.get("REMOTE_ADDR")
return ip
@receiver(user_logged_in)
def user_logged_in_callback(sender, request, user, **kwargs):
ip = get_client_ip(request)
now = timezone.now()
AuditEntry.objects.create(
action="登入",
extra_info=ip,
user_id=user.id,
user_name=user.username,
user_display=user.display,
action_time=now,
)
@receiver(user_logged_out)
def user_logged_out_callback(sender, request, user, **kwargs):
ip = get_client_ip(request)
now = timezone.now()
AuditEntry.objects.create(
action="登出",
extra_info=ip,
user_id=user.id,
user_name=user.username,
user_display=user.display,
action_time=now,
)
@receiver(user_login_failed)
def user_login_failed_callback(sender, credentials, **kwargs):
now = timezone.now()
user_name = credentials.get("username", None)
user_obj = Users.objects.filter(username=user_name)[0:1]
user_count = user_obj.count()
user_id = 0
user_display = ""
if user_count > 0:
user_id = user_obj[0].id
user_display = user_obj[0].display
AuditEntry.objects.create(
action="登入失败",
user_id=user_id,
user_name=user_name,
user_display=user_display,
action_time=now,
)