Skip to content

Commit c4e25e4

Browse files
committed
Create access control model
1 parent eb092f4 commit c4e25e4

File tree

1 file changed

+89
-2
lines changed

1 file changed

+89
-2
lines changed
Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,89 @@
1-
class AccessControl:
2-
pass
1+
from typing import List
2+
3+
from sqlalchemy.exc import SQLAlchemyError
4+
5+
from ssh_manager_backend.db import AccessControl
6+
from ssh_manager_backend.db.database import db_session
7+
8+
9+
class AccessControlModel:
10+
def __init__(self):
11+
self.session = db_session()
12+
13+
def has_access(self, username: str, ip_address: str) -> bool:
14+
"""
15+
Checks whether a user has access to the provided the list of ip addresses.
16+
17+
:param username:
18+
:param ip_address:
19+
:return: boolean value stating whether user has access or not.
20+
"""
21+
22+
try:
23+
acl_details: AccessControl = self.session.query(AccessControl).filter(
24+
AccessControl.username == username
25+
).first()
26+
return ip_address in acl_details.ip_addresses
27+
except [AttributeError, SQLAlchemyError]:
28+
return False
29+
30+
def grant_access(self, username: str, ip_addresses: List[str]) -> bool:
31+
"""
32+
Updates user access.
33+
34+
:param username:
35+
:param ip_addresses:
36+
:return: booleans value for success/failure.
37+
"""
38+
39+
try:
40+
acl_details: AccessControl = self.session.query(AccessControl).filter(
41+
AccessControl.username == username
42+
).first()
43+
44+
acl_details.ip_addresses += ip_addresses
45+
acl_details.ip_addresses = list(set(acl_details.ip_addresses))
46+
self.session.commit()
47+
except [AttributeError, SQLAlchemyError]:
48+
return False
49+
50+
return True
51+
52+
def remove_access(self, username: str, ip_addresses: List[str]) -> bool:
53+
"""
54+
Updates user access.
55+
56+
:param username:
57+
:param ip_addresses:
58+
:return: booleans value for success/failure.
59+
"""
60+
61+
try:
62+
acl_details: AccessControl = self.session.query(AccessControl).filter(
63+
AccessControl.username == username
64+
).first()
65+
66+
for ip in ip_addresses:
67+
acl_details.ip_addresses.remove(ip)
68+
69+
self.session.commit()
70+
except [AttributeError, SQLAlchemyError]:
71+
return False
72+
73+
return True
74+
75+
def get_all_ips(self, username: str) -> List[str]:
76+
"""
77+
Gets list of all Ip addresses for the given user.
78+
79+
:param username:
80+
:return: list of ip addresses.
81+
"""
82+
83+
try:
84+
acl_details: AccessControl = self.session.query(AccessControl).filter(
85+
AccessControl.username == username
86+
).first()
87+
return acl_details.ip_addresses
88+
except [AttributeError, SQLAlchemyError]:
89+
return []

0 commit comments

Comments
 (0)