Skip to content

Commit 36345bd

Browse files
author
Levi Cameron
committed
refactor: allow fully-blank user_type columns
1 parent 7d1be17 commit 36345bd

File tree

2 files changed

+91
-26
lines changed

2 files changed

+91
-26
lines changed

csv_permissions/permissions.py

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from csv import DictReader
1+
import csv
22
from functools import lru_cache
33
from pathlib import Path
44
from typing import Dict
@@ -78,39 +78,55 @@ def _parse_csv(
7878
"""
7979

8080
with open(file_path, "r") as csv_file:
81-
reader = DictReader(csv_file, skipinitialspace=True)
81+
reader = csv.reader(csv_file, skipinitialspace=True)
8282

83-
if reader.fieldnames[:4] != ["Model", "App", "Action", "Is Global"]:
84-
raise ValueError(f"Invalid csv_permissions CSV column headers found in {file_path}")
83+
# get first row of headers
84+
fieldnames = next(reader)
85+
86+
prelim_headers = ["Model", "App", "Action", "Is Global"]
87+
prelim_header_count = len(prelim_headers)
8588

86-
user_types = reader.fieldnames[4:]
87-
if not user_types:
89+
if fieldnames[:prelim_header_count] != prelim_headers:
8890
raise ValueError(f"Invalid csv_permissions CSV column headers found in {file_path}")
8991

92+
user_type_headers = fieldnames[prelim_header_count:]
93+
94+
nonempty_user_type_headers = [user_type for user_type in user_type_headers if user_type != ""]
95+
if len(set(nonempty_user_type_headers)) != len(nonempty_user_type_headers):
96+
raise ValueError(f"Duplicate csv_permissions CSV column headers found in {file_path}")
97+
98+
if len(nonempty_user_type_headers) == 0:
99+
raise ValueError(f"Missing user_type headers in {file_path}")
100+
90101
perm_is_global = {}
91102
perm_user_type_unresolved: Dict[PermName, Dict[UserType, UnresolvedEvaluator]] = {}
92103

93104
# We can't just count the number of permissions read because we don't consider
94105
# a file with commented out lines to be empty so keep track with a flag
95106
was_empty = True
96-
for row in reader:
107+
108+
for line_number, row in enumerate(reader):
97109
was_empty = False
98-
if all(x is None or x.strip() == "" for x in row.values()):
110+
if all(x is None or x.strip() == "" for x in row):
99111
# ignore completely empty rows
100112
continue
101-
model_name_orig = row["Model"] # note that capitalisation may differ to model._meta.model_name
102113

103-
if any(model_name_orig.strip().startswith(comment_prefix) for comment_prefix in ("//", "#")):
114+
if any(row[0].strip().startswith(comment_prefix) for comment_prefix in ("//", "#")):
104115
# Ignore lines beginning with comment chars
105116
continue
106117

107-
app_config = apps.get_app_config(row["App"])
118+
if len(row) < prelim_header_count:
119+
raise ValueError(f"Incomplete line {line_number} in {csv_file}")
120+
121+
# note that model capitalisation may differ to model._meta.model_name
122+
model_name_orig, app_label, action, is_global = row[:prelim_header_count]
123+
124+
app_config = apps.get_app_config(app_label)
108125
model = app_config.get_model(model_name_orig) if model_name_orig else None
109126

110-
action = row["Action"]
111-
if row["Is Global"] == "yes":
127+
if is_global == "yes":
112128
is_global = True
113-
elif row["Is Global"] == "no":
129+
elif is_global == "no":
114130
is_global = False
115131
else:
116132
raise ValueError("Invalid value for Is Global.")
@@ -121,22 +137,30 @@ def _parse_csv(
121137
perm_is_global[permission] = is_global
122138
perm_user_type_unresolved[permission] = {}
123139

124-
for user_type in user_types:
125-
evaluator_name = row[user_type]
140+
for i, user_type in enumerate(user_type_headers):
141+
try:
142+
evaluator_name = row[prelim_header_count + i]
143+
except IndexError:
144+
continue
126145

127-
perm_user_type_unresolved[permission][user_type] = UnresolvedEvaluator(
128-
app_config=app_config,
129-
model=model,
130-
is_global=is_global,
131-
permission=permission,
132-
action=action,
133-
evaluator_name=evaluator_name,
134-
)
146+
if user_type == "":
147+
# if a column has an empty user type then that's allowed but only if the entire column is empty
148+
if evaluator_name != "":
149+
raise ValueError(f"Columns with an empty user_type must be completely empty")
150+
else:
151+
perm_user_type_unresolved[permission][user_type] = UnresolvedEvaluator(
152+
app_config=app_config,
153+
model=model,
154+
is_global=is_global,
155+
permission=permission,
156+
action=action,
157+
evaluator_name=evaluator_name,
158+
)
135159

136160
if was_empty:
137161
raise ValueError("Empty permissions file")
138162

139-
return perm_is_global, perm_user_type_unresolved, user_types
163+
return perm_is_global, perm_user_type_unresolved, nonempty_user_type_headers
140164

141165

142166
# should be at least as large as the number of CSV files we load. This gets called by every has_perm() so must be cached

test_csv_permissions/test_parse.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def test_ignored_lines(self):
126126
with override_csv_permissions([csv_header_data + "\n" * 3 + "#" + csv_bad_line]):
127127
self.assertFalse(user.has_perm("test_csv_permissions.approve_testmodela", None))
128128

129-
with self.assertRaises(LookupError):
129+
with self.assertRaisesRegex(ValueError, 'Incomplete'):
130130
with override_csv_permissions([csv_header_data + "\n" * 3 + csv_bad_line]):
131131
self.assertFalse(user.has_perm("test_csv_permissions.approve_testmodela", None))
132132

@@ -250,3 +250,44 @@ def test_multiple_csv_files_inconsistent_details(self):
250250
with self.assertRaisesRegex(ValueError, 'inconsistent with a previous CSV file'):
251251
with override_csv_permissions([csv_data1, csv_data2]):
252252
csv_permissions.permissions.CSVPermissionsBackend()
253+
254+
def test_empty_user_type(self):
255+
csv_data = f"""
256+
Model, App, Action, Is Global, , {USER1_TYPE},
257+
TestModelA, test_csv_permissions, foo, yes, , yes,
258+
""".strip()
259+
260+
user1 = User1Factory(email="user1@localhost.test")
261+
user_empty = User2Factory(email="user2@localhost.test")
262+
user_empty.user_type = ""
263+
264+
with override_csv_permissions([csv_data]):
265+
self.assertTrue(user1.has_perm("test_csv_permissions.foo_testmodela"))
266+
267+
with override_settings(CSV_PERMISSIONS_STRICT=True):
268+
with self.assertRaises(LookupError):
269+
user_empty.has_perm("test_csv_permissions.foo_testmodela")
270+
271+
with override_settings(CSV_PERMISSIONS_STRICT=False):
272+
self.assertFalse(user_empty.has_perm("test_csv_permissions.foo_testmodela"))
273+
274+
def test_empty_user_type_not_empty_column(self):
275+
csv_data = f"""
276+
Model, App, Action, Is Global, , {USER1_TYPE},
277+
TestModelA, test_csv_permissions, foo, yes, all, yes,
278+
""".strip()
279+
280+
with self.assertRaisesRegex(ValueError, 'empty user_type'):
281+
with override_csv_permissions([csv_data]):
282+
csv_permissions.permissions.CSVPermissionsBackend()
283+
284+
def test_duplicate_header(self):
285+
csv_data = f"""
286+
Model, App, Action, Is Global, {USER1_TYPE}, {USER1_TYPE},
287+
TestModelA, test_csv_permissions, foo, yes, yes, yes,
288+
""".strip()
289+
290+
with self.assertRaisesRegex(ValueError, 'Duplicate'):
291+
with override_csv_permissions([csv_data]):
292+
csv_permissions.permissions.CSVPermissionsBackend()
293+

0 commit comments

Comments
 (0)