Skip to content

Commit 80b07e7

Browse files
committed
Bug fixes and added tests
1 parent bc682e8 commit 80b07e7

File tree

7 files changed

+214
-16
lines changed

7 files changed

+214
-16
lines changed

app.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,7 @@ def shutdown_session() -> None:
1313
"""
1414

1515
db_session.remove()
16+
17+
18+
if __name__ == "__main__":
19+
app.run(host="0.0.0.0", port=5000, debug=True, threaded=True)

ssh_manager_backend/app/models/access_control.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,15 @@ def grant_access(self, username: str, ip_addresses: List[str]) -> bool:
7777

7878
return True
7979

80-
def revoke_access(self, username: str, ip_addresses: List[str]) -> bool:
80+
def revoke_access(
81+
self, username: str, ip_addresses: List[str], revoke_all: bool = False
82+
) -> bool:
8183
"""
8284
Updates user access.
8385
8486
:param username:
8587
:param ip_addresses:
88+
:param revoke_all:.
8689
:return: booleans value for success/failure.
8790
"""
8891

@@ -91,15 +94,21 @@ def revoke_access(self, username: str, ip_addresses: List[str]) -> bool:
9194
AccessControl.username == username
9295
).first()
9396

94-
for ip in ip_addresses:
95-
try:
96-
acl_details.ip_addresses.remove(ip)
97-
except ValueError:
98-
continue
97+
if not revoke_all:
98+
for ip in ip_addresses:
99+
try:
100+
acl_details.ip_addresses.remove(ip)
101+
except ValueError:
102+
continue
103+
104+
self.session.query(AccessControl).filter(
105+
AccessControl.username == username
106+
).update({"ip_addresses": acl_details.ip_addresses})
107+
else:
108+
self.session.query(AccessControl).filter(
109+
AccessControl.username == username
110+
).update({"ip_addresses": []})
99111

100-
self.session.query(AccessControl).filter(
101-
AccessControl.username == username
102-
).update({"ip_addresses": acl_details.ip_addresses})
103112
self.session.commit()
104113
except AttributeError:
105114
return False

ssh_manager_backend/app/models/keys.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,30 @@ def exists(self, key_name: str) -> bool:
2121
return self.session.query(Key).filter(Key.name == key_name).first() is not None
2222

2323
def create(
24-
self, name: str, encrypted_key: bytes, key_hash: str, user: User
24+
self, name: str, encrypted_key: bytes, key_hash: str, user_id: int
2525
) -> bool:
2626
"""
2727
Creates a key in database.
2828
2929
:param name:
3030
:param encrypted_key:
3131
:param key_hash:
32-
:param user:
32+
:param user_id:
3333
:return: Boolean value indicating success/failure.
3434
"""
3535

3636
try:
3737
key: Key = Key(
38-
name=name, encrypted_key=encrypted_key, key_hash=key_hash, user=user
38+
name=name,
39+
encrypted_key=encrypted_key,
40+
key_hash=key_hash,
41+
user_id=user_id,
3942
)
4043

4144
self.session.add(key)
4245
self.session.commit()
4346
except SQLAlchemyError:
47+
self.session.rollback()
4448
return False
4549

4650
return True

ssh_manager_backend/app/models/keys_mapping.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,21 @@ def exists(self, ip_address: str) -> bool:
2525
is not None
2626
)
2727

28-
def create(self, ip_address: str, key_name: str, key: Key) -> bool:
28+
def create(self, ip_address: str, key_name: str) -> bool:
2929
"""
3030
Creates a key mapping in db.
3131
3232
:param ip_address:
3333
:param key_name:
34-
:param key: Key object
3534
:return: Boolean value indicating success/failure.
3635
"""
3736

3837
try:
39-
key_mapping = KeyMapping(key_name=key_name, ip_address=ip_address, key=key)
38+
key_mapping = KeyMapping(key_name=key_name, ip_address=ip_address)
4039
self.session.add(key_mapping)
4140
self.session.commit()
4241
except SQLAlchemyError:
42+
self.session.rollback()
4343
return False
4444

4545
return True

ssh_manager_backend/db/database.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from sqlalchemy.orm import scoped_session, sessionmaker
44

55
DB_URI = (
6-
"postgresql+psycopg2://postgres:vP28ObNJLhb5qFDe@35.222.241.198/ssh_manager_test"
6+
# "postgresql+psycopg2://postgres:vP28ObNJLhb5qFDe@35.222.241.198/ssh_manager_test"
7+
"postgresql+psycopg2://ssh_manager:pass@localhost/ssh_manager_dev"
78
)
89
engine = create_engine(DB_URI, echo=True)
910
db_session = scoped_session(
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import pytest
2+
3+
from ssh_manager_backend.app.models.keys import KeyModel
4+
from ssh_manager_backend.app.models.keys_mapping import KeyMappingModel
5+
from ssh_manager_backend.app.models.user import UserModel
6+
from tests.test_ssh_manager_backend import db_cleanup
7+
8+
9+
class TestKeyMappingModel:
10+
@pytest.fixture
11+
def cleanup(self):
12+
yield
13+
db_cleanup()
14+
15+
def test_create(self):
16+
user = UserModel()
17+
name: str = "test_user"
18+
username: str = "test_username"
19+
password: str = "test_password"
20+
admin: bool = False
21+
encrypted_dek: bytes = b"test_encrypted_dek"
22+
iv_for_dek: bytes = b"test_iv_for_dek"
23+
salt_for_dek: bytes = b"test_salt_for_dek"
24+
iv_for_kek: bytes = b"test_iv_for_kek"
25+
salt_for_kek: bytes = b"test_salt_for_kek"
26+
salt_for_password: bytes = b"test_salt_for_password"
27+
28+
key = KeyModel()
29+
key_name: str = "test_key"
30+
encrypted_key: bytes = b"encrypted_test_key"
31+
key_hash: str = "test_key_hash"
32+
33+
assert (
34+
key.create(
35+
name=key_name,
36+
encrypted_key=encrypted_key,
37+
key_hash=key_hash,
38+
user_id=12345,
39+
)
40+
is False
41+
)
42+
43+
assert (
44+
user.create(
45+
name=name,
46+
username=username,
47+
password=password,
48+
admin=admin,
49+
encrypted_dek=encrypted_dek,
50+
iv_for_dek=iv_for_dek,
51+
salt_for_dek=salt_for_dek,
52+
iv_for_kek=iv_for_kek,
53+
salt_for_kek=salt_for_kek,
54+
salt_for_password=salt_for_password,
55+
)
56+
is True
57+
)
58+
59+
user_id: int = user.get_user(username=username).id
60+
61+
assert (
62+
key.create(
63+
name=key_name,
64+
encrypted_key=encrypted_key,
65+
key_hash=key_hash,
66+
user_id=user_id,
67+
)
68+
is True
69+
)
70+
71+
key_mapping: KeyMappingModel = KeyMappingModel()
72+
ip_address: str = "1.1.1.1"
73+
74+
assert key_mapping.create(ip_address=ip_address, key_name=key_name) is True
75+
76+
assert (
77+
key_mapping.create(ip_address=ip_address, key_name="non_existent_key")
78+
is False
79+
)
80+
81+
def test_exists(self):
82+
key_mapping: KeyMappingModel = KeyMappingModel()
83+
ip_address: str = "1.1.1.1"
84+
85+
assert key_mapping.exists(ip_address=ip_address) is True
86+
87+
assert key_mapping.exists(ip_address="2.1.1.1") is False
88+
89+
def test_get_mapping(self, cleanup):
90+
key_mapping: KeyMappingModel = KeyMappingModel()
91+
key_name: str = "test_key"
92+
ip_address: str = "1.1.1.1"
93+
94+
assert key_mapping.get_mapping(ip_address=ip_address).key_name == key_name
95+
96+
assert key_mapping.get_mapping(ip_address="2.2.2.2") is None

tests/models/key_model_test.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import pytest
2+
3+
from ssh_manager_backend.app.models.keys import KeyModel
4+
from ssh_manager_backend.app.models.user import UserModel
5+
from tests.test_ssh_manager_backend import db_cleanup
6+
7+
8+
class TestKeyModel:
9+
@pytest.fixture
10+
def cleanup(self):
11+
yield
12+
db_cleanup()
13+
14+
def test_create(self):
15+
user = UserModel()
16+
name: str = "test_user"
17+
username: str = "test_username"
18+
password: str = "test_password"
19+
admin: bool = False
20+
encrypted_dek: bytes = b"test_encrypted_dek"
21+
iv_for_dek: bytes = b"test_iv_for_dek"
22+
salt_for_dek: bytes = b"test_salt_for_dek"
23+
iv_for_kek: bytes = b"test_iv_for_kek"
24+
salt_for_kek: bytes = b"test_salt_for_kek"
25+
salt_for_password: bytes = b"test_salt_for_password"
26+
27+
key = KeyModel()
28+
key_name: str = "test_key"
29+
encrypted_key: bytes = b"encrypted_test_key"
30+
key_hash: str = "test_key_hash"
31+
32+
assert (
33+
key.create(
34+
name=key_name,
35+
encrypted_key=encrypted_key,
36+
key_hash=key_hash,
37+
user_id=12345,
38+
)
39+
is False
40+
)
41+
42+
assert (
43+
user.create(
44+
name=name,
45+
username=username,
46+
password=password,
47+
admin=admin,
48+
encrypted_dek=encrypted_dek,
49+
iv_for_dek=iv_for_dek,
50+
salt_for_dek=salt_for_dek,
51+
iv_for_kek=iv_for_kek,
52+
salt_for_kek=salt_for_kek,
53+
salt_for_password=salt_for_password,
54+
)
55+
is True
56+
)
57+
58+
user_id = user.get_user(username=username).id
59+
60+
assert (
61+
key.create(
62+
name=key_name,
63+
encrypted_key=encrypted_key,
64+
key_hash=key_hash,
65+
user_id=user_id,
66+
)
67+
is True
68+
)
69+
70+
def test_exists(self):
71+
key = KeyModel()
72+
key_name: str = "test_key"
73+
74+
assert key.exists(key_name=key_name) is True
75+
76+
assert key.exists(key_name="non_existent_key") is False
77+
78+
def test_get_key(self, cleanup):
79+
key = KeyModel()
80+
key_name: str = "test_key"
81+
82+
assert key.get_key(key_name=key_name).name == key_name
83+
84+
assert key.get_key(key_name="non_existent_keyname") is None

0 commit comments

Comments
 (0)