forked from XaltriX/VJ-File-Store
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathusers_api.py
92 lines (68 loc) · 2.62 KB
/
users_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# © Telegram : @KingVJ01 , GitHub : @VJBots
# Don't Remove Credit Tg - @VJ_Botz
# Subscribe YouTube Channel For Amazing Bot https://youtube.com/@Tech_VJ
# Ask Doubt on telegram @KingVJ01
import requests
import json
from motor.motor_asyncio import AsyncIOMotorClient
from config import CLONE_DB_URI, DB_NAME
# Don't Remove Credit Tg - @VJ_Botz
# Subscribe YouTube Channel For Amazing Bot https://youtube.com/@Tech_VJ
# Ask Doubt on telegram @KingVJ01
client = AsyncIOMotorClient(CLONE_DB_URI)
db = client[DB_NAME]
col = db["users"]
# Don't Remove Credit Tg - @VJ_Botz
# Subscribe YouTube Channel For Amazing Bot https://youtube.com/@Tech_VJ
# Ask Doubt on telegram @KingVJ01
async def get_short_link(user, link):
api_key = user["shortener_api"]
base_site = user["base_site"]
print(user)
response = requests.get(f"https://{base_site}/api?api={api_key}&url={link}")
data = response.json()
if data["status"] == "success" or rget.status_code == 200:
return data["shortenedUrl"]
# Don't Remove Credit Tg - @VJ_Botz
# Subscribe YouTube Channel For Amazing Bot https://youtube.com/@Tech_VJ
# Ask Doubt on telegram @KingVJ01
async def get_user(user_id):
user_id = int(user_id)
user = await col.find_one({"user_id": user_id})
if not user:
res = {
"user_id": user_id,
"shortener_api": None,
"base_site": None,
}
await col.insert_one(res)
user = await col.find_one({"user_id": user_id})
return user
# Don't Remove Credit Tg - @VJ_Botz
# Subscribe YouTube Channel For Amazing Bot https://youtube.com/@Tech_VJ
# Ask Doubt on telegram @KingVJ01
async def update_user_info(user_id, value:dict):
user_id = int(user_id)
myquery = {"user_id": user_id}
newvalues = { "$set": value }
await col.update_one(myquery, newvalues)
# Don't Remove Credit Tg - @VJ_Botz
# Subscribe YouTube Channel For Amazing Bot https://youtube.com/@Tech_VJ
# Ask Doubt on telegram @KingVJ01
async def total_users_count():
count = await col.count_documents({})
return count
# Don't Remove Credit Tg - @VJ_Botz
# Subscribe YouTube Channel For Amazing Bot https://youtube.com/@Tech_VJ
# Ask Doubt on telegram @KingVJ01
async def get_all_users():
all_users = col.find({})
return all_users
# Don't Remove Credit Tg - @VJ_Botz
# Subscribe YouTube Channel For Amazing Bot https://youtube.com/@Tech_VJ
# Ask Doubt on telegram @KingVJ01
async def delete_user(user_id):
await col.delete_one({'user_id': int(user_id)})
# Don't Remove Credit Tg - @VJ_Botz
# Subscribe YouTube Channel For Amazing Bot https://youtube.com/@Tech_VJ
# Ask Doubt on telegram @KingVJ01