-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequest_model.py
More file actions
109 lines (91 loc) · 3.73 KB
/
request_model.py
File metadata and controls
109 lines (91 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""
Created By : Ankur
"""
import re
from collections import defaultdict
from functools import wraps
from copy import deepcopy
class RequestModel:
""""""
def __init__(self, *, data_type, required=True, regex=None, nested=None):
""""""
self.data_type = data_type
self.required = required
self.regex = regex
self.nested = nested if nested else {}
class RequestModelValidator:
""""""
def __init__(self, model, type_=None):
""""""
self.model = model
self.request_data = None
self.error_data = None
self.type = type_
def __call__(self, func):
""""""
@wraps(func)
def wrapper(*args):
""""""
self.request_data = None
self.error_data = defaultdict(set)
self.unnecessary_keys_json = set()
request = args[1]
if self.type == 'query_params':
self.request_data = request.query_params
else:
self.request_data = request.data
self.request_data = dict(self.request_data)
if 'application/json' not in request.headers.get('Content-type', '') or self.type == 'query_params':
self.detect_unnecessary_keys()
for key, value in self.model.items():
self.validate_querydict(
key, value, self.request_data.get(key))
else:
self.detect_unnecessary_keys_json(
self.request_data, self.model)
if self.unnecessary_keys_json:
raise Exception(f"Un-necessary Keys Present {self.unnecessary_keys_json}")
self.validate_json(self.request_data, self.model)
if self.error_data:
raise Exception(f"Invalid Request {self.error_data}")
return func(*args)
return wrapper
def detect_unnecessary_keys(self):
""""""
unnecessary_key = set()
for key, value in self.request_data.items():
if key not in self.model:
unnecessary_key.add(key)
if unnecessary_key:
raise Exception(f"Un-necessary Keys Present {unnecessary_key}")
def validate_querydict(self, key, model_value, request_value):
""""""
if isinstance(request_value, list):
for data in request_value:
self.validate_request(key, model_value, data)
else:
self.validate_request(key, model_value, request_value)
def validate_request(self, key, model_value, request_value):
""""""
if model_value.required and request_value is None:
self.error_data['requiredValueMissing'].add(key)
if request_value is not None:
if not isinstance(request_value, model_value.data_type):
self.error_data['invalidDatatype'].add(key)
if model_value.regex and request_value:
if not re.match(re.compile(model_value.regex), request_value):
self.error_data['regexValidationFailed'].add(key)
def validate_json(self, request_data, model):
""""""
for key, value in model.items():
self.validate_request(key, value, request_data.get(key))
if value.nested:
self.validate_json(request_data.get(key, {}), value.nested)
def detect_unnecessary_keys_json(self, request_data, model):
""""""
for key, value in request_data.items():
if not model.get(key):
self.unnecessary_keys_json.add(key)
continue
if isinstance(value, dict):
self.detect_unnecessary_keys_json(request_data.get(key), model.get(key).nested)