-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathtest_user_sync.py
More file actions
270 lines (218 loc) · 11 KB
/
test_user_sync.py
File metadata and controls
270 lines (218 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
from typing import List
from pytest_mock import MockerFixture
from sqlalchemy.orm import Session
from api.models import AccessRequest, AccessRequestStatus, OktaGroup, OktaUser, OktaUserGroupMember, RoleGroup
from api.extensions import Db
from api.operations import CreateAccessRequest
from api.services import okta
from api.services.okta_service import User, UserSchema
from api.syncer import sync_users
from tests.factories import UserFactory, UserSchemaFactory
def test_user_sync_no_changes(db: Db, mocker: MockerFixture) -> None:
    """Syncing users that already match the seeded rows leaves the compared fields unchanged."""
    initial_users_in_okta = UserFactory.create_batch(3)
    initial_db_users = seed_db(db, initial_users_in_okta)

    new_db_users = run_sync(db, mocker, initial_users_in_okta)

    # Walk the Okta users directly; each one must map to an equal row both
    # before and after the sync.
    for okta_user in initial_users_in_okta:
        assert okta_users_are_equal(
            get_user_by_id(initial_db_users, okta_user.id),
            get_user_by_id(new_db_users, okta_user.id),
        )
def test_user_sync_updates_fields(db: Db, mocker: MockerFixture) -> None:
    """A changed Okta profile login shows up as the synced user's email."""
    initial_users_in_okta = UserFactory.create_batch(1)
    okta_user = initial_users_in_okta[0]
    seed_db(db, initial_users_in_okta)

    # Change the Okta-side record after seeding so the sync has a difference to apply.
    okta_user.profile.login = "changed"
    new_db_users = run_sync(db, mocker, initial_users_in_okta)

    synced_user = get_user_by_id(new_db_users, okta_user.id)
    assert synced_user.email == "changed"
def test_user_sync_updates_deleted_user(
    db: Db, mocker: MockerFixture, okta_group: OktaGroup, role_group: RoleGroup
) -> None:
    """Syncing a user that Okta reports as DEPROVISIONED marks them deleted and ends their access.

    After the sync this checks that the user's ``deleted_at`` equals the Okta
    ``status_changed`` value, the access request created for the user is
    rejected, all four group memberships/ownerships have ``ended_at`` set, and
    each Okta removal call was made exactly once.
    """
    initial_users_in_okta = UserFactory.create_batch(1)
    okta_user = initial_users_in_okta[0]
    user_id = okta_user.id
    initial_db_users = seed_db(db, initial_users_in_okta)

    # Add a managed group membership and ownership to the user to be deleted
    db.session.add(okta_group)
    db.session.commit()

    managed_membership = OktaUserGroupMember(
        user_id=user_id, group_id=okta_group.id, ended_at=None, is_owner=False
    )
    db.session.add(managed_membership)
    managed_ownership = OktaUserGroupMember(
        user_id=user_id, group_id=okta_group.id, ended_at=None, is_owner=True
    )
    db.session.add(managed_ownership)
    db.session.commit()

    # Add an unmanaged group membership and ownership to the user to be deleted
    role_group.is_managed = False
    db.session.add(role_group)
    db.session.commit()

    unmanaged_membership = OktaUserGroupMember(
        user_id=user_id, group_id=role_group.id, ended_at=None, is_owner=False
    )
    db.session.add(unmanaged_membership)
    unmanaged_ownership = OktaUserGroupMember(
        user_id=user_id, group_id=role_group.id, ended_at=None, is_owner=True
    )
    db.session.add(unmanaged_ownership)
    db.session.commit()

    # Create AccessRequest to test it gets rejected
    access_request = CreateAccessRequest(
        requester_user=initial_db_users[0],
        requested_group=okta_group,
        request_ownership=False,
        request_reason="test reason",
    ).execute()

    # Make Okta report the user as deprovisioned at a known timestamp.
    okta_user.status = "DEPROVISIONED"
    okta_user.status_changed = "2022-06-02 11:54:51.724560"

    delete_membership_spy = mocker.patch.object(okta, "async_remove_user_from_group")
    delete_ownership_spy = mocker.patch.object(okta, "async_remove_owner_from_group")

    new_db_users = run_sync(db, mocker, initial_users_in_okta)

    deleted_user = get_user_by_id(new_db_users, user_id)
    assert str(deleted_user.deleted_at) == okta_user.status_changed
    assert db.session.get(OktaUser, user_id).deleted_at is not None

    assert access_request is not None
    assert db.session.get(AccessRequest, access_request.id).status == AccessRequestStatus.REJECTED

    assert db.session.get(OktaUserGroupMember, managed_membership.id).ended_at is not None
    assert db.session.get(OktaUserGroupMember, managed_ownership.id).ended_at is not None
    assert db.session.get(OktaUserGroupMember, unmanaged_membership.id).ended_at is not None
    assert db.session.get(OktaUserGroupMember, unmanaged_ownership.id).ended_at is not None

    # Should only be called once each for the managed group
    assert delete_membership_spy.call_count == 1
    assert delete_ownership_spy.call_count == 1
def test_user_sync_deletes_disappearing_user(
    db: Db, mocker: MockerFixture, okta_group: OktaGroup, role_group: RoleGroup
) -> None:
    """Syncing against an empty Okta user list marks the seeded user deleted and ends their access.

    After the sync this checks that the user row is kept but has ``deleted_at``
    set, the access request created for the user is rejected, all four group
    memberships/ownerships have ``ended_at`` set, and no Okta removal call was
    made.
    """
    initial_users_in_okta = UserFactory.create_batch(1)
    user_id = initial_users_in_okta[0].id
    initial_db_users = seed_db(db, initial_users_in_okta)

    # Add a managed group membership and ownership to the user to be deleted
    db.session.add(okta_group)
    db.session.commit()

    managed_membership = OktaUserGroupMember(
        user_id=user_id, group_id=okta_group.id, ended_at=None, is_owner=False
    )
    db.session.add(managed_membership)
    managed_ownership = OktaUserGroupMember(
        user_id=user_id, group_id=okta_group.id, ended_at=None, is_owner=True
    )
    db.session.add(managed_ownership)
    db.session.commit()

    # Add an unmanaged group membership and ownership to the user to be deleted
    role_group.is_managed = False
    db.session.add(role_group)
    db.session.commit()

    unmanaged_membership = OktaUserGroupMember(
        user_id=user_id, group_id=role_group.id, ended_at=None, is_owner=False
    )
    db.session.add(unmanaged_membership)
    unmanaged_ownership = OktaUserGroupMember(
        user_id=user_id, group_id=role_group.id, ended_at=None, is_owner=True
    )
    db.session.add(unmanaged_ownership)
    db.session.commit()

    # Create AccessRequest to test it gets rejected
    access_request = CreateAccessRequest(
        requester_user=initial_db_users[0],
        requested_group=okta_group,
        request_ownership=False,
        request_reason="test reason",
    ).execute()

    delete_membership_spy = mocker.patch.object(okta, "async_remove_user_from_group")
    delete_ownership_spy = mocker.patch.object(okta, "async_remove_owner_from_group")

    # Sanity check: the user is not deleted before the sync runs.
    assert get_user_by_id(initial_db_users, user_id).deleted_at is None

    # Sync against an empty Okta user list, i.e. the user has disappeared.
    new_db_users = run_sync(db, mocker, [])

    assert len(initial_db_users) == len(new_db_users)
    assert get_user_by_id(new_db_users, user_id).deleted_at is not None
    assert db.session.get(OktaUser, user_id).deleted_at is not None

    assert access_request is not None
    assert db.session.get(AccessRequest, access_request.id).status == AccessRequestStatus.REJECTED

    assert db.session.get(OktaUserGroupMember, managed_membership.id).ended_at is not None
    assert db.session.get(OktaUserGroupMember, managed_ownership.id).ended_at is not None
    assert db.session.get(OktaUserGroupMember, unmanaged_membership.id).ended_at is not None
    assert db.session.get(OktaUserGroupMember, unmanaged_ownership.id).ended_at is not None

    # Should never be called as the user no longer exists in Okta
    assert delete_membership_spy.call_count == 0
    assert delete_ownership_spy.call_count == 0
def test_user_sync_ends_memberships_for_previously_deleted_user(
    db: Db, mocker: MockerFixture, okta_group: OktaGroup, role_group: RoleGroup
) -> None:
    """A user already marked deleted locally still has lingering memberships ended by the sync.

    The user row gets ``deleted_at`` set before the sync, while four active
    memberships/ownerships still exist. After syncing the DEPROVISIONED Okta
    user this checks that ``deleted_at`` equals the Okta ``status_changed``
    value, every membership/ownership has ``ended_at`` set, and each Okta
    removal call was made exactly once.
    """
    initial_users_in_okta = UserFactory.create_batch(1)
    okta_user = initial_users_in_okta[0]
    user_id = okta_user.id
    seed_db(db, initial_users_in_okta)

    # Mark the user as deleted in Access
    db.session.get(OktaUser, user_id).deleted_at = db.func.now()

    # Add a managed group membership and ownership to the user to be deleted
    db.session.add(okta_group)
    db.session.commit()

    managed_membership = OktaUserGroupMember(
        user_id=user_id, group_id=okta_group.id, ended_at=None, is_owner=False
    )
    db.session.add(managed_membership)
    managed_ownership = OktaUserGroupMember(
        user_id=user_id, group_id=okta_group.id, ended_at=None, is_owner=True
    )
    db.session.add(managed_ownership)
    db.session.commit()

    # Add an unmanaged group membership and ownership to the user to be deleted
    role_group.is_managed = False
    db.session.add(role_group)
    db.session.commit()

    unmanaged_membership = OktaUserGroupMember(
        user_id=user_id, group_id=role_group.id, ended_at=None, is_owner=False
    )
    db.session.add(unmanaged_membership)
    unmanaged_ownership = OktaUserGroupMember(
        user_id=user_id, group_id=role_group.id, ended_at=None, is_owner=True
    )
    db.session.add(unmanaged_ownership)
    db.session.commit()

    # Make Okta report the user as deprovisioned at a known timestamp.
    okta_user.status = "DEPROVISIONED"
    okta_user.status_changed = "2022-06-02 11:54:51.724560"

    delete_membership_spy = mocker.patch.object(okta, "async_remove_user_from_group")
    delete_ownership_spy = mocker.patch.object(okta, "async_remove_owner_from_group")

    new_db_users = run_sync(db, mocker, initial_users_in_okta)

    deleted_user = get_user_by_id(new_db_users, user_id)
    assert str(deleted_user.deleted_at) == okta_user.status_changed
    assert db.session.get(OktaUser, user_id).deleted_at is not None

    assert db.session.get(OktaUserGroupMember, managed_membership.id).ended_at is not None
    assert db.session.get(OktaUserGroupMember, managed_ownership.id).ended_at is not None
    assert db.session.get(OktaUserGroupMember, unmanaged_membership.id).ended_at is not None
    assert db.session.get(OktaUserGroupMember, unmanaged_ownership.id).ended_at is not None

    # Should only be called once each for the managed group
    assert delete_membership_spy.call_count == 1
    assert delete_ownership_spy.call_count == 1
def seed_db(db: Db, users: List[User]) -> List[OktaUser]:
    """Persist one record per given Okta user and return every ``OktaUser`` row.

    Each entry of ``users`` is wrapped in ``User`` and applied to a fresh
    ``OktaUser`` via ``update_okta_user``; the results are added and committed
    in a dedicated session bound to ``db.engine``.
    """
    with Session(db.engine) as session:
        # Build all records first, then stage them in a single call.
        seeded_rows = [User(okta_user).update_okta_user(OktaUser(), {}) for okta_user in users]
        session.add_all(seeded_rows)
        session.commit()
        all_db_users = session.query(OktaUser).all()
    return all_db_users
def run_sync(db: Db, mocker: MockerFixture, okta_users: List[User]) -> List[OktaUser]:
    """Run ``sync_users`` against a mocked Okta service and return every ``OktaUser`` row.

    ``okta.list_users`` is patched to return ``okta_users`` (each wrapped in
    ``User``) and ``okta.get_user_schema`` to return a factory-built schema
    wrapped in ``UserSchema``. The rows are read back through a dedicated
    session bound to ``db.engine``.
    """
    schema = UserSchemaFactory.create()
    wrapped_users = [User(okta_user) for okta_user in okta_users]
    wrapped_schema = UserSchema(schema)

    # The patches do not depend on the session, so install them up front.
    mocker.patch.object(okta, "list_users", return_value=wrapped_users)
    mocker.patch.object(okta, "get_user_schema", return_value=wrapped_schema)

    with Session(db.engine) as session:
        sync_users()
        synced_rows = session.query(OktaUser).all()
    return synced_rows
def get_user_by_id(user_list: List[OktaUser], user_id: str) -> OktaUser:
    """Return the first user in ``user_list`` whose ``id`` equals ``user_id``.

    Fails with an ``AssertionError`` when no user matches.
    """
    match = next((candidate for candidate in user_list if candidate.id == user_id), None)
    assert match is not None
    return match
def okta_users_are_equal(left: OktaUser, right: OktaUser) -> bool:
    """Compare two ``OktaUser`` objects field by field.

    Kept as a test helper so the model itself does not need an equality
    implementation. Returns ``True`` only when every listed attribute matches.
    """
    compared_fields = (
        "id",
        "email",
        "last_name",
        "first_name",
        "display_name",
        "deleted_at",
        "created_at",
    )
    # all() stops at the first mismatching field, in the order listed above.
    return all(getattr(left, field) == getattr(right, field) for field in compared_fields)