auth_routes.py
from fastapi import APIRouter,status,Depends
from database import Session,engine
from schemas import SignUpModel,LoginModel
from models import User
from fastapi.exceptions import HTTPException
from werkzeug.security import generate_password_hash,check_password_hash
from fastapi_jwt_auth import AuthJWT
from fastapi.encoders import jsonable_encoder
# Router for the authentication endpoints; every path below is served under /auth.
auth_router = APIRouter(prefix="/auth", tags=["auth"])

# NOTE(review): one module-level session is shared by every request this module
# handles. Assuming Session is a SQLAlchemy session (confirm in database.py), it
# is not safe for concurrent use, and a failed transaction leaves the shared
# object unusable until it is rolled back. A per-request session supplied
# through a FastAPI dependency avoids both problems.
session = Session(bind=engine)
@auth_router.get("/")
async def hello(Authorize: AuthJWT = Depends()):
    """
    ## A sample hello world route
    This returns Hello world

    The greeting is only returned when the request passes `jwt_required()`;
    otherwise the route answers 401 with "Invalid Token".
    """
    # Any exception raised by jwt_required() is reported to the client as the
    # same generic 401, so the response does not reveal why verification failed.
    # NOTE(review): the broad catch also hides non-token failures (for example a
    # misconfigured secret) behind a 401; consider logging the cause server-side.
    try:
        Authorize.jwt_required()
    except Exception:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid Token",
        )

    greeting = {"message": "Hello World"}
    return greeting
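@auth_router.post(
    '/signup',
    status_code=status.HTTP_201_CREATED
)
async def signUp(user: SignUpModel):
    """
    ## Create a user account
    Reads `username`, `email`, `password`, `is_active` and `is_staff` from the
    request body, rejects the request with 400 when the email or the username
    is already taken, and otherwise stores the new user with a hashed password.
    """
    db_email = session.query(User).filter(User.email == user.email).first()
    if db_email is not None:
        # Must be raised: a *returned* HTTPException is serialized as an
        # ordinary body and sent with the route's 201 status.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User with the email already exists"
        )

    # Compare the username column with the submitted username (the original
    # compared it with the email, so duplicate usernames were never detected).
    db_username = (
        session.query(User).filter(User.username == user.username).first()
    )
    if db_username is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User with the username already exists"
        )

    # NOTE(review): the two lookups above are check-then-insert; concurrent
    # signups can still collide unless the table enforces unique constraints
    # on email and username -- confirm in models.py.
    new_user = User(
        username=user.username,
        email=user.email,
        # Only the werkzeug password hash is stored, never the plaintext.
        password=generate_password_hash(user.password),
        is_active=user.is_active,
        # NOTE(review): is_staff is copied straight from the unauthenticated
        # request body, so any caller can register a staff account. If staff
        # status grants privileges elsewhere, set it server-side (default
        # False) and change it only through an authorized endpoint.
        is_staff=user.is_staff
    )
    session.add(new_user)
    try:
        session.commit()
    except Exception:
        # The session is module-level and shared; without a rollback a failed
        # commit would break every later request that uses it.
        session.rollback()
        raise

    # NOTE(review): this serializes the model object itself. Check that the
    # response cannot include the password hash; an explicit response model
    # listing only public fields would make that guarantee visible.
    return jsonable_encoder(new_user)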
#login route
@auth_router.post("/login", status_code=200)
async def login(user: LoginModel, Authorize: AuthJWT = Depends()):
    """
    ## Log a user in
    Checks the submitted `username` and `password` against the stored user and
    returns an `access` and a `refresh` JWT whose subject is the username.
    Any credential failure answers 400 with "Invalid Username or Password".
    """
    account = session.query(User).filter(User.username == user.username).first()

    # An unknown username and a wrong password produce the same message, so the
    # response body does not reveal which usernames exist.
    # NOTE(review): the hash comparison is skipped when no user is found, which
    # leaves a response-time difference between the two cases, and no attempt
    # limiting is visible in this handler -- confirm throttling exists upstream.
    if not account or not check_password_hash(account.password, user.password):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid Username or Password",
        )

    subject = account.username
    tokens = {
        "access": Authorize.create_access_token(subject=subject),
        "refresh": Authorize.create_refresh_token(subject=subject),
    }
    return jsonable_encoder(tokens)
#refreshing tokens
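@auth_router.get('/refresh')
async def refresh_token(Authorize: AuthJWT = Depends()):
    """
    ## Issue a new access token
    Requires a valid refresh token and returns a fresh `access` token for the
    same subject. Answers 401 when the refresh token is missing or invalid.
    """
    # Any exception raised during verification is reported as the same generic
    # 401, so the response does not reveal why the token was rejected.
    try:
        Authorize.jwt_refresh_token_required()
    except Exception:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Please provide a valid refresh token"
        )

    # Read the subject through the library's public accessor rather than an
    # underscore-prefixed name that is not part of its documented interface.
    current_user = Authorize.get_jwt_subject()

    # NOTE(review): the subject is taken from the token alone; the user record
    # is not re-read here, so a deleted or deactivated account keeps obtaining
    # access tokens until its refresh token expires or is revoked.
    access_token = Authorize.create_access_token(subject=current_user)

    return jsonable_encoder({"access": access_token})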
#deleting account