This repository has been archived by the owner on Dec 14, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathuser.py
396 lines (355 loc) · 14.8 KB
/
user.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
# 用户相关
import json
import random
import string
import time as time
from sanic.response import redirect
from sanic.response import text
import db.mysql as db
import md5 as md5
# try:
import operationconfig as configs
from mail import mail as mails
# except:
# # import md5 as md5
# import db.mysql as db
# from mail import mail as mails
# import config as configs
get_session = db.get_session
User = db.table.User
mail = mails()
config = configs.information()
def registered_record(id=time.time(), # 设置主键
name=time.time(), # 用户名
Key=md5.get_md5(''.join(random.sample(string.ascii_letters + string.digits, 20))), # 密码
Registration_time=time.time(), # 注册时间
postbox=False, # 邮箱
nickname='空的', # 昵称
HeadPortrait='' # 头像(url)
):
'''
添加注册用户记录
->
'''
if (bool(id) and
bool(postbox)
):
with get_session() as s:
user = User()
user.id = id
user.name = name
user.Key = 1
user.Registration_time = Registration_time
user.postbox = postbox
user.nickname = nickname
user.created_at = time.time()
# user.HeadPortrait = HeadPortrait
# user.LastLogin = 0
s.add(user)
s.commit()
return id
else:
return False
def login_Password_authentication(id=False, name=False, postbox=False, Key=False):
'''
登录密码验证,支持:
id登录:id
用户名登录:name
邮箱登录:postbox
密码:Key
'''
with get_session() as s:
if (id and s.query(User).filter(User.id == id, User.Key == Key).first() != []):
return True
elif (name and s.query(User).filter(User.name == name, User.Key == Key).first() != []):
return True
elif (postbox and s.query(User).filter(User.postbox == postbox, User.Key == Key).first() != []):
return True
else:
return False
def get_user_data(id=False, name=False, postbox=False):
'''
获得用户数据:
可以传入id,name,postbox
返回用户数据,格式可以参照user表
'''
with get_session() as s:
try:
id_data = s.query(User).filter(User.id == id, User.deleted_at == None).first()
except:
id_data = None
try:
name_data = s.query(User).filter(User.name == name, User.deleted_at == None).first()
except:
name_data = None
try:
postbox_data = s.query(User).filter(User.postbox == postbox, User.deleted_at == None).first()
except:
postbox_data = None
if (id and id_data != None):
data = id_data
elif (name and name_data != None):
data = name_data
elif (postbox and postbox_data != None):
data = postbox_data
else:
return None
ret = data.single_to_dict()
return ret # 将结果转换成dict
def Change_user_data(id=False, name=False, postbox=False, data: dict = {}):
'''
更改用户数据
data: 要更改的数据
'''
with get_session() as s:
for k, v in data.items():
if (not (k == 'id' or k == 'postbox' or k == 'Registration_time' or k == 'name')):
if (id):
s.query(User).filter(User.id == id).update({k: v})
elif (name):
s.query(User).filter(User.name == name).update({k: v})
elif (postbox):
s.query(User).filter(User.postbox == postbox).update({k: v})
else:
return False
return True
def Traverse_other_data_with_the_same_key_value(id: str = ''):
'''
遍历其他数据的同一键值
return :dict({<user-id>:<其他数据>,...})
示例:当id='CoreConfiguration'时返回{'1686193681': {'administrators': True}}
'''
rep = {}
with get_session() as s:
for i in s.query(User).all():
i = i.dobule_to_dict()
if (i == None):
continue
else:
try:
data_is_idis = eval(i['DATA'])[id]
except:
continue
rep[i['id']] = data_is_idis
return rep
def QQ_online_status(QQID):
import requests
req = requests.get(
'http://www.webxml.com.cn/webservices/qqOnlineWebService.asmx/qqCheckOnline?qqCode={QQID}'.format(QQID=QQID))
req = req.text
# .replace('<string xmlns="http://WebXml.com.cn/">','').replace('</string>','').replace('<?xml version="1.0" encoding="utf-8"?>','').replace('\n','')
if ('Y' in req):
return True
elif ('N' in req):
return False
elif ('E' in req):
return None
raise Exception('请求webservicesAPI错误')
class api():
'''
session['login_status_id'] :用户登录状态,登录的id
'''
config = configs.Email_login() # 获取邮件配置
def apiDict(self):
return {
'login_SendEmainVerification_API': self.login_SendEmainVerification_API, # 登录
'login_Email_Verifier_API': self.login_Email_Verifier_API, # 验证验证码
'registered': self.registered, # 注册
'get_user_data': self.get_user_data, # 获取用户信息
'GetorChange_other_user_data': self.GetorChange_other_user_data, # 修改或查看用户数据
'login_status': self.login_status, # 获取登录状态
'HeadSculpture': self.HeadSculpture, # 获取头像地址
'GetNickname': self.GetNickname, # 获得昵称
'Change_user_data': self.Change_user_data, # 更改用户数据
}
def Change_user_data(self, get_or_post, EnableSession, rep, **para): # 更改用户数据
s = EnableSession() # 开启Session
try:
user_id = s.data['login_status_id'] # 获得登录状态
except:
return para['RepisOldVersion'](s, 'Not logged in')
try:
data = json.loads(get_or_post('data'))
except:
return para['RepisOldVersion'](s, 'Require JSON')
if (Change_user_data(id=user_id, data=data)):
return para['RepisOldVersion'](s, 'OK')
else:
return para['RepisOldVersion'](s, 'internal error')
def login_status(self, get_or_post, s, rep, **para):
'''获取登录状态'''
s = s()
if (s.get('login_status_id')):
return para['RepisOldVersion'](s, json.dumps({'state': str(True), 'id': s.get('login_status_id')}))
else:
return para['RepisOldVersion'](s, 'None')
def GetorChange_other_user_data(self, get_or_post, EnableSession, rep, **para):
'''
得到其他的用户数据:
方便lci其他平台得到保存在lci-org-user的数据
需要id:key
验证key正确之后
获取对应id:data
例子:
> id:key
< user_data
'''
from operation.other_data import GET_other_user_data_interior , Change_other_user_data_interior
s = EnableSession()
try:
user_id = s.data['login_status_id'] # 获得登录状态
except:
return para['RepisOldVersion'](s, 'Not logged in')
# 获得参数
key = get_or_post('key', False)
id = get_or_post('id', False)
v = get_or_post('v', False)
if (not key):
return para['RepisOldVersion'](s, 'Incomplete parameters (key)')
elif (not id):
return para['RepisOldVersion'](s, 'Incomplete parameters (id)')
else:
try:
OUDGK = configs.OtherUserDataGetsKey[id]
except:
return para['RepisOldVersion'](s, 'Incorrect(id)') # 判断id是否存在
if (OUDGK == key):
if (v): # 是否有数据,如果有数据就更改数据,没有就查看数据
return para['RepisOldVersion'](s,
Change_other_user_data_interior(user_id=user_id, id=id, v=v)) # 更改数据
else:
return para['RepisOldVersion'](s, GET_other_user_data_interior(user_id=user_id, id=id)) # 获得数据
elif (OUDGK != key):
return para['RepisOldVersion'](s, 'Incorrect(key)') # key is not
else:
return para['RepisOldVersion'](s, 'Strange reasons lead to failure')
def registered(self, get_or_post, s, rep, **para):
'''注册'''
# 接收参数:
s = s()
name = get_or_post('name', time.time()) # 用户名
Key = get_or_post('Key'.format(), True) # 密码
postbox = get_or_post('postbox', False) # 邮箱
nickname = get_or_post('nickname', True) # 昵称
if (get_user_data(postbox=postbox) != None): # 获得用户数据,如果不是None证明已经存在用户
return {'async': False,
'data': text('User already exists'),
'cookie': {'Session_key': ''},
'session_odj': s
}
id = registered_record(name=name,
Key=Key,
postbox=postbox,
nickname=nickname
) # 注册,提交到数据库,返回注册之后的id
if (id):
return para['RepisOldVersion'](s, 'OK,ID->' + str(id))
else:
return para['RepisOldVersion'](s, 'Incomplete parameters')
def login_SendEmainVerification_API(self, get_or_post, s, rep, **para): # 发送验证码并且储存验证码的API
'''
登录验证
'''
s = s()
if (s.get('login_status_id')): return {'async': False, 'data': text('Is logged in'),
'cookie': {'Session_key': ''}, 'session_odj': s}
def login_Verification(self, id=False, name=False, postbox=False): # 获取验证码(登录)并且发送
data = get_user_data(id, name, postbox)
Verification = random.randint(1, 5000000) # 获取验证码
if (data != None):
try:
mail.send(self.config.TEMPlate
.format(Verification=Verification,
nickname=data['nickname'],
name=data['name'],
websiteName=config.name,
SessionKey=s.key
),
self.config.Theme
.format(Verification=random.randint(1, 5000000),
nickname=data['nickname'],
name=data['name'],
websiteName=config.name,
SessionKey=s.key
),
{'nickname': data['name'], 'address': data['postbox']})
except:
return False # 邮件发送失败
else:
return None # 不存在此用户
return [Verification, data['id']] # 成功,返回验证码
# 接收参数
user_name = get_or_post('user_name', get_or_post('name', False))
user_id = get_or_post('user_id', get_or_post('id', False))
user_postbox = get_or_post('user_postbox', get_or_post('postbox', False))
if (not user_name and not user_postbox and not user_id):
return para['RepisOldVersion'](s, 'Parameter is not complete')
Verification = login_Verification(self, user_id, user_name, user_postbox) # 获取并发送验证码
s.Set('Verification', Verification[0])
s.Set('user_id', Verification[1])
return para['RepisOldVersion'](s, 'OK,ID->' + str(Verification[1]))
def login_Email_Verifier_API(self, get_or_post, s, rep, **para):
'''
验证验证码是否正确,并登录
'''
s = s()
# HTTP参数:Verification(验证码),用于登录验证
GetVerification = get_or_post('Verification') # 得到的验证码
# return rep(s,str(s.get('Verification')))
if (str(s.get('Verification')) == str(GetVerification)):
Change_user_data(id=s.data['user_id'], data={'LastLogin': time.time()}) # 更改用户数据
s.Set('login_status_id', s.data['user_id']) # 设置登录状态
s.delete(['user_id', 'Verification']) # 删除待验证用户id,删除验证码
# print(s.data)
return {'async': False,
'data': text('OK,ID->' + str(s.data['login_status_id'])),
'cookie': {'Session_key': ''},
'session_odj': s
}
else:
return {'async': False,
'data': text('Verification code error'),
'cookie': {'Session_key': ''},
'session_odj': s
}
def get_user_data(self, get_or_post, s, rep, **para):
s = s()
try:
ret = get_user_data(id=s.data['login_status_id'])
del ret['DATA']
del ret['deleted_at']
del ret['updated_at']
return para['RepisOldVersion'](s, str(json.dumps(ret)))
except:
return para['RepisOldVersion'](s, 'You may not be logged in')
def HeadSculpture(self, get_or_post, s, rep, **para): # 获取头像地址
s = s()
id = get_or_post('id', False)
name = get_or_post('name', False)
postbox = get_or_post('postbox', False)
if (id or name or postbox):
pass
elif (s.get('login_status_id') != None):
id = s.get('login_status_id')
else:
return para['RepisOldVersion'](s, 'Parameter is not complete')
data = get_user_data(id, name, postbox)
hash = md5.get_md5(data['postbox'])
return {'async': False,
'data': redirect("https://cravatar.cn/avatar/{hash}".format(hash=hash)),
'cookie': {'Session_key': ''},
'session_odj': s
}
def GetNickname(self, get_or_post, s, rep, **para):
s = s()
id = get_or_post('id', False)
name = get_or_post('name', False)
postbox = get_or_post('postbox', False)
if (id or name or postbox):
pass
elif (s.get('login_status_id') != None):
id = s.get('login_status_id')
else:
return para['RepisOldVersion'](s, 'Parameter is not complete')
data = get_user_data(id, name, postbox)
return para['RepisOldVersion'](s, data['nickname'])