-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
instance_database.py
165 lines (141 loc) · 5.52 KB
/
instance_database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# -*- coding: UTF-8 -*-
"""
@author: hhyo
@license: Apache Licence
@file: instance_database.py
@time: 2019/09/19
"""
import MySQLdb
import simplejson as json
from django.contrib.auth.decorators import permission_required
from django.http import JsonResponse, HttpResponse
from django_redis import get_redis_connection
from common.utils.extend_json_encoder import ExtendJSONEncoder
from sql.engines import get_engine, ResultSet
from sql.models import Instance, InstanceDatabase, Users
from sql.utils.resource_group import user_instances
__author__ = "hhyo"
@permission_required("sql.menu_database", raise_exception=True)
def databases(request):
    """Return the list of databases of an instance as a JSON response.

    Reads two values from ``request.POST``:

    * ``instance_id`` -- id of the instance to inspect.
    * ``saved`` -- when equal to the string ``"true"``, only databases that
      are flagged as saved (i.e. recorded in ``InstanceDatabase``) are listed.

    Every row of the engine's database summary is overlaid with the matching
    ``InstanceDatabase`` record (``id``, ``db_name``, ``owner``,
    ``owner_display``, ``remark`` plus ``saved=True``) when one exists.

    Responses:

    * missing ``instance_id``: ``{"status": 0, "msg": "", "data": []}``
    * instance not among the user's mysql/mongo instances: status 1 with a
      message and an empty ``data`` list
    * engine reported an error: ``{"status": 1, "msg": <error>}``
    * success: ``{"status": 0, "msg": "ok", "rows": [...]}``
    """
    instance_id = request.POST.get("instance_id")
    # Whether to list only databases already recorded in the platform.
    saved = request.POST.get("saved") == "true"

    if not instance_id:
        return JsonResponse({"status": 0, "msg": "", "data": []})

    try:
        instance = user_instances(request.user, db_type=["mysql", "mongo"]).get(
            id=instance_id
        )
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例", "data": []})

    # Databases already recorded in the platform, keyed by database name.
    cnf_dbs = {}
    for db in InstanceDatabase.objects.filter(instance=instance).values(
        "id", "db_name", "owner", "owner_display", "remark"
    ):
        db["saved"] = True
        cnf_dbs[f"{db['db_name']}"] = db

    query_engine = get_engine(instance=instance)
    try:
        query_result = query_engine.get_all_databases_summary()
        if query_result.error:
            result = {"status": 1, "msg": query_result.error}
        else:
            # Overlay the platform record on top of the engine's row when the
            # database has been recorded; otherwise keep the engine row as is.
            rows = [
                dict(row, **cnf_dbs[row["db_name"]])
                if row["db_name"] in cnf_dbs
                else row
                for row in query_result.rows
            ]
            if saved:
                # .get() tolerates engine rows that carry no "saved" flag;
                # such rows are treated as not saved instead of raising.
                rows = [row for row in rows if row.get("saved")]
            result = {"status": 0, "msg": "ok", "rows": rows}
    finally:
        # Always release the connection, even if the query or the row
        # processing above raised.
        query_engine.close()

    return HttpResponse(
        json.dumps(result, cls=ExtendJSONEncoder, bigint_as_string=True),
        content_type="application/json",
    )
@permission_required("sql.menu_database", raise_exception=True)
def create(request):
    """Create a database on an instance and record it in the platform.

    Reads ``instance_id``, ``db_name``, ``owner`` and ``remark`` from
    ``request.POST``. ``db_name`` is required; ``owner`` must be the username
    of an existing ``Users`` row.

    For a mysql instance a ``create database`` statement is executed. For a
    mongo instance a collection named ``archery-<db_name>`` is created inside
    the new database. On success an ``InstanceDatabase`` row is created and
    every Redis key matching ``*insRes*`` is deleted.

    Returns a ``JsonResponse`` with ``status`` 0 on success, or ``status`` 1
    and a message when a parameter is missing, the instance is not among the
    user's mysql/mongo instances, the owner does not exist, or the engine
    reported an error.
    """
    instance_id = request.POST.get("instance_id", 0)
    db_name = request.POST.get("db_name")
    owner = request.POST.get("owner", "")
    remark = request.POST.get("remark", "")

    if not db_name:
        return JsonResponse(
            {"status": 1, "msg": "参数不完整,请确认后提交", "data": []}
        )

    try:
        instance = user_instances(request.user, db_type=["mysql", "mongo"]).get(
            id=instance_id
        )
    except Instance.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "你所在组未关联该实例", "data": []})

    try:
        owner_display = Users.objects.get(username=owner).display
    except Users.DoesNotExist:
        return JsonResponse({"status": 1, "msg": "负责人不存在", "data": []})

    engine = get_engine(instance=instance)
    try:
        if instance.db_type == "mysql":
            # db_name comes straight from the request, so it must not be
            # interpolated as raw SQL. String-literal escaping does not
            # protect an identifier; quote it with backticks instead and
            # double any embedded backtick, as MySQL requires.
            quoted_name = "`{}`".format(db_name.replace("`", "``"))
            exec_result = engine.execute(
                db_name="information_schema",
                sql=f"create database {quoted_name};",
            )
        elif instance.db_type == "mongo":
            exec_result = ResultSet()
            try:
                conn = engine.get_connection()
                db = conn[db_name]
                # A mongo database is only listed once it holds data, so
                # create a placeholder collection named archery-{db_name}.
                db.create_collection(name=f"archery-{db_name}")
            except Exception as e:
                exec_result.error = f"创建数据库失败, 错误信息:{str(e)}"
    finally:
        # Always release the connection, even if the engine call raised.
        engine.close()

    if exec_result.error:
        return JsonResponse({"status": 1, "msg": exec_result.error})

    # Record the new database in the platform.
    InstanceDatabase.objects.create(
        instance=instance,
        db_name=db_name,
        owner=owner,
        owner_display=owner_display,
        remark=remark,
    )

    # Drop the cached instance-resource entries.
    r = get_redis_connection("default")
    for key in r.scan_iter(match="*insRes*", count=2000):
        r.delete(key)

    return JsonResponse({"status": 0, "msg": "", "data": []})
@permission_required("sql.menu_database", raise_exception=True)
def edit(request):
    """Update the platform record of a database, registering it if absent.

    Reads ``instance_id``, ``db_name``, ``owner`` and ``remark`` from
    ``request.POST``. ``db_name`` is required; ``owner`` must be the username
    of an existing ``Users`` row. The ``InstanceDatabase`` row identified by
    the instance and ``db_name`` is updated, or created when it does not
    exist, with the given owner, owner display name and remark.

    Returns a ``JsonResponse`` with ``status`` 0 on success, or ``status`` 1
    and a message when ``db_name`` is missing, the instance is not among the
    user's mysql/mongo instances, or the owner does not exist.
    """
    form = request.POST
    instance_id = form.get("instance_id", 0)
    db_name = form.get("db_name")
    owner = form.get("owner", "")
    remark = form.get("remark", "")

    def failure(message):
        # Every error path answers with the same envelope.
        return JsonResponse({"status": 1, "msg": message, "data": []})

    if not db_name:
        return failure("参数不完整,请确认后提交")

    try:
        instance = user_instances(request.user, db_type=["mysql", "mongo"]).get(
            id=instance_id
        )
    except Instance.DoesNotExist:
        return failure("你所在组未关联该实例")

    try:
        owner_user = Users.objects.get(username=owner)
        owner_display = owner_user.display
    except Users.DoesNotExist:
        return failure("负责人不存在")

    # Update the existing record, or register a new one.
    record_fields = {
        "owner": owner,
        "owner_display": owner_display,
        "remark": remark,
    }
    InstanceDatabase.objects.update_or_create(
        instance=instance,
        db_name=db_name,
        defaults=record_fields,
    )
    return JsonResponse({"status": 0, "msg": "", "data": []})