-
Notifications
You must be signed in to change notification settings - Fork 52
Feature/login #195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/login #195
Changes from 23 commits
43b3f1b
d211377
2f0501a
a5a1a04
5d2c52e
eafbe37
56d0ca3
00d9e7d
c9c777e
bb2b52d
a32c9c4
deee638
7ca40e1
1bdbc54
eed52e2
b793e2c
46820e1
68e40fe
b7e7e60
e3c620a
b6b7441
d1849bb
8d7f135
84db349
d9ae477
d7465b5
9d735b1
4f5a127
4392974
b9ee9d3
94eb29a
97d8aff
8b54f67
b46d679
73f53c1
c15a451
c3cad9d
befd5f9
063489f
da12351
cc4fdae
0ae794a
7a47985
e7bc55a
8fbe36a
f203c38
bbb241b
b00439d
84a6fa7
5f3e3e1
5d47833
2488679
7d5395c
8f59086
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| DATABASE_CONNECTION_STRING = "sqlite:///calendar.db" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -137,6 +137,9 @@ dmypy.json | |
| # Pyre type checker | ||
| .pyre/ | ||
|
|
||
|
|
||
| # register stuff | ||
| run.txt | ||
| # VScode | ||
| .vscode/ | ||
| app/.vscode/ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| from typing import Optional, Union | ||
| from pydantic import BaseModel, validator, EmailStr, EmailError | ||
|
|
||
|
|
||
| EMPTY_FIELD_STRING = 'field is required' | ||
| MIN_FIELD_LENGTH = 3 | ||
| MAX_FIELD_LENGTH = 20 | ||
|
|
||
|
|
||
| def fields_not_empty(field: Optional[str]) -> Union[ValueError, str]: | ||
| '''Global function to validate fields are not empty.''' | ||
| if not field: | ||
| raise ValueError(EMPTY_FIELD_STRING) | ||
| return field | ||
|
|
||
|
|
||
| class UserBase(BaseModel): | ||
| ''' | ||
| Validating fields types | ||
| Returns a User object without sensitive information | ||
| ''' | ||
| username: str | ||
| email: str | ||
| full_name: str | ||
| description: Optional[str] = None | ||
|
|
||
| class Config: | ||
| orm_mode = True | ||
|
|
||
|
|
||
| class UserCreate(UserBase): | ||
| '''Validating fields types''' | ||
| password: str | ||
| confirm_password: str | ||
|
|
||
| '''Calling to field_not_empty validaion function for each required field.''' | ||
| _fields_not_empty_username = validator( | ||
| 'username', allow_reuse=True)(fields_not_empty) | ||
| _fields_not_empty_full_name = validator( | ||
| 'full_name', allow_reuse=True)(fields_not_empty) | ||
| _fields_not_empty_password = validator( | ||
| 'password', allow_reuse=True)(fields_not_empty) | ||
| _fields_not_empty_confirm_password = validator( | ||
| 'confirm_password', allow_reuse=True)(fields_not_empty) | ||
| _fields_not_empty_email = validator( | ||
| 'email', allow_reuse=True)(fields_not_empty) | ||
|
|
||
| @validator('confirm_password') | ||
| def passwords_match( | ||
| cls, confirm_password: str, | ||
| values: UserBase) -> Union[ValueError, str]: | ||
| '''Validating passwords fields identical.''' | ||
| if 'password' in values and confirm_password != values['password']: | ||
| raise ValueError("doesn't match to password") | ||
| return confirm_password | ||
|
|
||
| @validator('username') | ||
| def username_length(cls, username: str) -> Union[ValueError, str]: | ||
| '''Validating username length is legal''' | ||
| if not (MIN_FIELD_LENGTH < len(username) < MAX_FIELD_LENGTH): | ||
| raise ValueError("must contain between 3 to 20 charactars") | ||
| return username | ||
|
|
||
| @validator('password') | ||
| def password_length(cls, password: str) -> Union[ValueError, str]: | ||
| '''Validating username length is legal''' | ||
| if not (MIN_FIELD_LENGTH < len(password) < MAX_FIELD_LENGTH): | ||
| raise ValueError("must contain between 3 to 20 charactars") | ||
| return password | ||
|
|
||
| @validator('email') | ||
| def confirm_mail(cls, email: str) -> Union[ValueError, str]: | ||
| '''Validating email is valid mail address.''' | ||
| try: | ||
| EmailStr.validate(email) | ||
| return email | ||
| except EmailError: | ||
| raise ValueError("address is not valid") | ||
|
|
||
|
|
||
| class User(UserBase): | ||
| ''' | ||
| Validating fields types | ||
| Returns a User object without sensitive information | ||
| ''' | ||
| id: int | ||
| is_active: bool |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| from typing import Union | ||
|
||
| from starlette.requests import Request | ||
| from fastapi import Depends | ||
| from starlette.status import HTTP_401_UNAUTHORIZED | ||
| from app.internal.security.ouath2 import get_cookie, check_jwt_token, User | ||
|
|
||
|
|
||
| async def current_user_required( | ||
yammesicka marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| request: Request, jwt: str = Depends(get_cookie)) -> Union[User, bool]: | ||
| '''A dependency function protecting routes for only logged in user''' | ||
| user = await check_jwt_token(jwt, path=request.url.path) | ||
| if user: | ||
| return user | ||
|
|
||
|
|
||
| async def current_user(request: Request) -> Union[User, bool]: | ||
| ''' | ||
| A dependency function. | ||
| Returns logged in User object if exists. | ||
| None if not. | ||
| ''' | ||
| if 'Authorization' in request.cookies: | ||
| jwt = request.cookies['Authorization'] | ||
| else: | ||
| return None | ||
| user = await check_jwt_token(jwt, logged_in=True) | ||
| return user | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| from typing import Union | ||
| from passlib.context import CryptContext | ||
| from .schema import LoginUser | ||
| from app.database.models import User | ||
| from fastapi import APIRouter, Depends, HTTPException | ||
| from app.dependencies import templates | ||
| from app.database.database import SessionLocal | ||
| from fastapi.security import OAuth2PasswordBearer | ||
| from datetime import datetime, timedelta | ||
| import jwt | ||
| from starlette.status import HTTP_401_UNAUTHORIZED | ||
| from app.config import JWT_ALGORITHM, JWT_SECRET_KEY | ||
| from starlette.requests import Request | ||
|
|
||
| from starlette.responses import RedirectResponse | ||
|
|
||
|
|
||
| JWT_MIN_EXP = 3 | ||
kobyfogel marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| pwd_context = CryptContext(schemes=["bcrypt"]) | ||
| oauth_schema = OAuth2PasswordBearer(tokenUrl="/login") | ||
|
|
||
|
|
||
| async def get_db_user_by_username(username: str) -> User: | ||
| '''Checking database for user by username field''' | ||
| session = SessionLocal() | ||
kobyfogel marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return session.query(User).filter_by(username = username).first() | ||
|
|
||
|
|
||
| def get_hashed_password(password) -> str: | ||
yammesicka marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| '''Hashing user password''' | ||
| return pwd_context.hash(password) | ||
|
|
||
|
|
||
| def verify_password(plain_password, hashed_password) -> bool: | ||
|
||
| '''Verifying password and hashed password are equal''' | ||
| try: | ||
| return pwd_context.verify(plain_password, hashed_password) | ||
| except Exception as e: | ||
| return False | ||
|
|
||
|
|
||
| async def authenticate_user(user: LoginUser) -> Union[LoginUser, bool]: | ||
| '''Verifying user is in database and password is correct''' | ||
| db_user = await get_db_user_by_username(username = user.username) | ||
| if db_user: | ||
yammesicka marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if verify_password(user.hashed_password, db_user.password): | ||
| return LoginUser(username=user.username, hashed_password=db_user.password) | ||
| return False | ||
|
|
||
|
|
||
| def create_jwt_token(user: LoginUser) -> str: | ||
| '''Creating jwt-token out of user unique data''' | ||
| expiration = datetime.utcnow() + timedelta(minutes=JWT_MIN_EXP ) | ||
| jwt_payload = {"sub": user.username, "hashed_password": user.hashed_password, "exp": expiration} | ||
| jwt_token = jwt.encode(jwt_payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM) | ||
| return jwt_token | ||
|
|
||
|
|
||
| async def check_jwt_token( | ||
| token: str = Depends(oauth_schema), | ||
| logged_in: bool = False, path: bool = None) -> Union[User, bool]: | ||
| ''' | ||
| Check whether JWT token is correct. Returns User object if yes. | ||
| Returns None or raises HTTPException, | ||
| depanding which depandency activated this function. | ||
| ''' | ||
| try: | ||
| jwt_payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=JWT_ALGORITHM) | ||
| jwt_username = jwt_payload.get("sub") | ||
| jwt_hashed_password = jwt_payload.get("hashed_password") | ||
| jwt_expiration = jwt_payload.get("exp") | ||
| db_user = await get_db_user_by_username(username=jwt_username) | ||
|
||
| if db_user and db_user.password == jwt_hashed_password: | ||
| return db_user | ||
| else: | ||
| raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, headers=path, detail="Your token is incorrect. Please log in again") | ||
| except Exception as e: | ||
| if logged_in: | ||
| return None | ||
| if type(e).__name__ == 'ExpiredSignatureError': | ||
kobyfogel marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, headers=path, detail="Your token has expired. Please log in again") | ||
| if type(e).__name__ == 'DecodeError': | ||
| raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, headers=path, detail="Your token is incorrect. Please log in again") | ||
|
|
||
|
|
||
| async def get_cookie(request: Request) -> str: | ||
|
||
| ''' | ||
| Extracts jwt from HTTPONLY cookie, if exists. | ||
| Raises HTTPException if not. | ||
| ''' | ||
| if 'Authorization' in request.cookies: | ||
| return request.cookies['Authorization'] | ||
| raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, headers=request.url.path, detail="Please log in to enter this page") | ||
|
|
||
|
|
||
| async def my_exception_handler( | ||
yammesicka marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| request: Request, exc: HTTP_401_UNAUTHORIZED) -> RedirectResponse: | ||
| ''' | ||
| Whenever HTTP_401_UNAUTHORIZED is raised, | ||
| redirecting to login route, with original requested url, | ||
| and details for why original request failed. | ||
| ''' | ||
| paramas = f"?next={exc.headers}&message={exc.detail}" | ||
| url = f"/login{paramas}" | ||
yammesicka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| response = RedirectResponse(url=url) | ||
| response.delete_cookie('Authorization') | ||
| return response | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| from typing import List, Optional, Union | ||
| from fastapi import Depends, Form, Query | ||
| from pydantic import BaseModel, validator | ||
|
|
||
|
|
||
|
|
||
| class LoginUser(BaseModel): | ||
| ''' | ||
| Validating fields types | ||
| Returns a User object for signing in. | ||
| ''' | ||
| username: str | ||
| hashed_password: str | ||
|
|
||
| class Config: | ||
| orm_mode = True |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| from app.database import models, schemas | ||
| from passlib.context import CryptContext | ||
| from sqlalchemy.orm import Session | ||
| from app.internal.security.ouath2 import pwd_context | ||
|
|
||
| # pwd_context = CryptContext(schemes=["bcrypt"]) | ||
kobyfogel marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| def get_by_id(db: Session, user_id: int) -> models.User: | ||
kobyfogel marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| '''query database for a user by unique id''' | ||
| return db.query(models.User).filter(models.User.id == user_id).first() | ||
|
|
||
|
|
||
| def get_by_username(db: Session, username: str) -> models.User: | ||
|
||
| '''query database for a user by unique username''' | ||
| return db.query(models.User).filter( | ||
| models.User.username == username).first() | ||
|
|
||
|
|
||
| def get_by_mail(db: Session, email: str) -> models.User: | ||
| '''query database for a user by unique email''' | ||
| return db.query(models.User).filter(models.User.email == email).first() | ||
|
|
||
|
|
||
| def create(db: Session, user: schemas.UserCreate) -> models.User: | ||
| ''' | ||
| creating a new User object in the database, with hashed password | ||
| ''' | ||
| unhashed_password = user.password.encode('utf-8') | ||
| hashed_password = pwd_context.hash(unhashed_password) | ||
| user_details = { | ||
| 'username': user.username, | ||
| 'full_name': user.full_name, | ||
| 'email': user.email, | ||
| 'password': hashed_password, | ||
| 'description': user.description | ||
| } | ||
| db_user = models.User(**user_details) | ||
| db.add(db_user) | ||
| db.commit() | ||
| db.refresh(db_user) | ||
| return db_user | ||
|
|
||
|
|
||
| def delete_by_mail(db: Session, email: str) -> None: | ||
| '''deletes a user from database by unique email''' | ||
kobyfogel marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| db_user = get_by_mail(db=db, email=email) | ||
| db.delete(db_user) | ||
| db.commit() | ||
Uh oh!
There was an error while loading. Please reload this page.