-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtiktok.py
49 lines (42 loc) · 1.67 KB
/
tiktok.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from typing import Dict
import aiohttp
from tiktokapipy.async_api import AsyncTikTokAPI
from tiktokapipy.models.user import user_link
class Tiktok:
    """Small helper around ``tiktokapipy`` for looking up a user's latest video.

    The outcome of each user-existence probe is cached on the instance in
    ``user_exists`` so a given username is only probed over HTTP once.
    """

    def __init__(self):
        # username -> True when the profile URL answered with HTTP 200.
        self.user_exists: Dict[str, bool] = {}

    async def get_recent_video_id(self, tiktok_username: str) -> dict:
        """
        Get the most recent TikTok video of a user.

        :param tiktok_username: str
            The TikTok username to check videos for. It is lower-cased and
            any "@" characters are removed before use.
        :returns: dict
            ``{normalised_username: video_id}``. The value is ``None`` when
            the user does not exist, has no videos, or the lookup failed
            (failures are printed, never raised).
        """
        tiktok_username = tiktok_username.lower().replace("@", "")
        try:
            # Probe existence first so the TikTok API client is never started
            # for a username that is already known to be missing.
            if not await self.get_user_exists(tiktok_username):
                raise UserWarning(f"user '{tiktok_username}' does not exist")
            async with AsyncTikTokAPI() as api:
                user = await api.user(tiktok_username)
                # Only the first yielded video is needed.
                # NOTE(review): assumes user.videos yields newest first - confirm.
                async for video in user.videos:
                    return {tiktok_username: video.id}
                # An exhausted ``async for`` simply ends (StopAsyncIteration is
                # consumed by the loop), so "no videos" falls through to the
                # shared ``None`` result below.
        except Exception as e:
            # Deliberate best-effort boundary: report and degrade to ``None``.
            print(f"TikTok Search Failed -> {e}")
        return {tiktok_username: None}

    async def get_user_exists(self, tiktok_username: str) -> bool:
        """
        Get if a user exists.

        :param tiktok_username: str
            The TikTok username to check exists
        :returns: bool
            Whether the profile URL for the username answered with HTTP 200.
            The answer is cached, so later calls for the same username do no
            network I/O.
        """
        if tiktok_username not in self.user_exists:
            link = user_link(tiktok_username)
            # Enter the session as a context manager so it (and its
            # connector) is closed once the probe is done.
            async with aiohttp.ClientSession() as session:
                async with session.get(url=link) as r:
                    self.user_exists[tiktok_username] = r.status == 200
        return self.user_exists[tiktok_username]