Skip to content

Commit 5e615ed

Browse files
author
Samiul Sk
committed
feat: add url class for url genration
1 parent 480cc5b commit 5e615ed

File tree

1 file changed

+221
-0
lines changed

1 file changed

+221
-0
lines changed

imagekitio/url.py

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
import hashlib
2+
import hmac
3+
from datetime import datetime as dt
4+
from typing import Any, Dict, List
5+
from urllib.parse import ParseResult, urlparse, urlunparse
6+
7+
from imagekitio.constants.defaults import Default
8+
from imagekitio.constants.supported_transform import SUPPORTED_TRANS
9+
from imagekitio.utils.formatter import camel_dict_to_snake_dict, flatten_dict
10+
from imagekitio.constants.supported_transform import SUPPORTED_TRANS
11+
from .constants import ERRORS
12+
13+
TRANSFORMATION_PARAMETER = "tr"
14+
DEFAULT_TRANSFORMATION_POSITION = "path"
15+
QUERY_TRANSFORMATION_POSITION = "query"
16+
CHAIN_TRANSFORM_DELIMITER = ":"
17+
TRANSFORM_DELIMITER = ","
18+
TRANSFORM_KEY_VALUE_DELIMITER = "-"
19+
20+
SIGNATURE_PARAMETER = "ik-s"
21+
TIMESTAMP_PARAMETER = "ik-t"
22+
DEFAULT_TIMESTAMP = "9999999999"
23+
24+
25+
class Url(object):
26+
"""
27+
Url class holds the request and related methods
28+
to generate url(signed and unsigned)
29+
"""
30+
31+
def __init__(self, request_obj):
32+
self.request = request_obj
33+
34+
def generate_url(self, options: Dict = None) -> str:
35+
options = camel_dict_to_snake_dict(options)
36+
if options.get("src"):
37+
options["transformation_position"] = DEFAULT_TRANSFORMATION_POSITION
38+
extended_options = self.request.extend_url_options(options)
39+
return self.build_url(extended_options)
40+
41+
def build_url(self, options: dict) -> str:
42+
"""
43+
builds url for from all options,
44+
"""
45+
path = options.get("path", "")
46+
src = options.get("src", "")
47+
url_endpoint = options.get("url_endpoint", "")
48+
transformation_position = options.get("transformation_position")
49+
if transformation_position not in Default.VALID_TRANSFORMATION_POSITION.value:
50+
raise ValueError(ERRORS.INVALID_TRANSFORMATION_POSITION.value)
51+
52+
if src or (
53+
options.get("transformation_position") == QUERY_TRANSFORMATION_POSITION
54+
):
55+
src_param_used_for_url = True
56+
else:
57+
src_param_used_for_url = False
58+
if not (path or src):
59+
return ""
60+
result_url_dict = {"netloc": "", "path": "", "query": ""}
61+
if path:
62+
parsed_url = urlparse(path)
63+
parsed_host = urlparse(url_endpoint)
64+
result_url_dict["scheme"] = parsed_host.scheme
65+
result_url_dict["netloc"] = (parsed_host.netloc + parsed_host.path).lstrip(
66+
"/"
67+
)
68+
result_url_dict["path"] = parsed_url.path.strip("/")
69+
70+
else:
71+
parsed_url = urlparse(src)
72+
host = parsed_url.netloc
73+
if parsed_url.username:
74+
# creating host like username:password@domain.com if username is there in parsed url
75+
host = "{}:{}@{}".format(
76+
parsed_url.username, parsed_url.password, parsed_url.netloc
77+
)
78+
result_url_dict["netloc"] = host
79+
result_url_dict["scheme"] = parsed_url.scheme
80+
result_url_dict["path"] = parsed_url.path
81+
src_param_used_for_url = True
82+
83+
query_params = options.get("query_parameters", {})
84+
transformation_str = self.transformation_to_str(options.get("transformation"))
85+
if transformation_str:
86+
if (
87+
transformation_position == Default.QUERY_TRANSFORMATION_POSITION.value
88+
) or src_param_used_for_url:
89+
result_url_dict["query"] = "{}={}".format(
90+
TRANSFORMATION_PARAMETER, transformation_str
91+
)
92+
93+
else:
94+
result_url_dict["path"] = "{}:{}/{}".format(
95+
TRANSFORMATION_PARAMETER,
96+
transformation_str,
97+
result_url_dict["path"],
98+
)
99+
100+
result_url_dict["scheme"] = result_url_dict["scheme"] or "https"
101+
102+
# Signature String and Timestamp
103+
# We can do this only for URLs that are created using urlEndpoint and path parameter
104+
# because we need to know the endpoint to be able to remove it from the URL to create a signature
105+
# for the remaining. With the src parameter, we would not know the "pattern" in the URL
106+
if options.get("signed") and (not options.get("src")):
107+
expire_seconds = options.get("expire_seconds")
108+
private_key = options.get("private_key")
109+
expiry_timestamp = self.get_signature_timestamp(expire_seconds)
110+
111+
intermediate_url = urlunparse(
112+
result_url_dict.get(f, "") for f in ParseResult._fields
113+
)
114+
url_signature = self.get_signature(
115+
private_key=private_key,
116+
url=intermediate_url,
117+
url_endpoint=url_endpoint,
118+
expiry_timestamp=expiry_timestamp,
119+
)
120+
if expiry_timestamp and (expiry_timestamp != DEFAULT_TIMESTAMP):
121+
query_params[TIMESTAMP_PARAMETER] = expiry_timestamp
122+
query_params[SIGNATURE_PARAMETER] = url_signature
123+
query_params_str = "&".join(
124+
str(k) + "=" + str(v) for k, v in query_params.items()
125+
)
126+
result_url_dict["query"] = query_params_str
127+
result_url_dict = self.prepare_dict_for_unparse(result_url_dict)
128+
generated_url = urlunparse(
129+
result_url_dict.get(f, "") for f in ParseResult._fields
130+
)
131+
return generated_url
132+
133+
@staticmethod
134+
def get_signature_timestamp(seconds: int = None) -> int:
135+
"""
136+
this function returns either default time stamp
137+
or current unix time and expiry seconds to get
138+
signature time stamp
139+
"""
140+
if not seconds:
141+
return Default.DEFAULT_TIMESTAMP.value
142+
current_timestamp = int(dt.now().strftime("%s"))
143+
144+
return current_timestamp + seconds
145+
146+
@staticmethod
147+
def prepare_dict_for_unparse(url_dict: dict) -> dict:
148+
"""
149+
remove and add required back slash of 'netloc' and 'path'
150+
to parse it properly, urllib.parse.unparse() function can't
151+
create url properly if path doesn't have '/' at the start
152+
"""
153+
url_dict["netloc"] = url_dict["netloc"].rstrip("/")
154+
url_dict["path"] = "/" + url_dict["path"].strip("/")
155+
156+
return url_dict
157+
158+
@staticmethod
159+
def get_signature(private_key, url, url_endpoint, expiry_timestamp) -> str:
160+
""""
161+
create signature(hashed hex key) from
162+
private_key, url, url_endpoint and expiry_timestamp
163+
"""
164+
replaced_url = url.replace(url_endpoint, "") + str(expiry_timestamp)
165+
signature = hmac.new(
166+
key=replaced_url.encode(), msg=private_key.encode(), digestmod=hashlib.sha1
167+
)
168+
return signature.hexdigest()
169+
170+
@staticmethod
171+
def is_valid_trans_options(options: Dict[str, Any]) -> bool:
172+
"""
173+
check if transformation options parameter provided by user is valid
174+
so that ValueError exception can be raised with appropriate error
175+
message in the ImageKitRequest Class
176+
"""
177+
supported_trans_keys = SUPPORTED_TRANS.keys()
178+
# flattening to dict from list of dict to check key validation
179+
transformation_dict = flatten_dict(options.get("transformation", []))
180+
for key in transformation_dict:
181+
if key not in supported_trans_keys:
182+
return False
183+
return True
184+
185+
@staticmethod
186+
def is_valid_transformation_pos(trans_pos: str) -> bool:
187+
"""
188+
Returns if transformation position is valid as per Server Documentation
189+
"""
190+
return trans_pos in Default.VALID_TRANSFORMATION_POSITION.value
191+
192+
@staticmethod
193+
def transformation_to_str(transformation):
194+
"""
195+
creates transformation_position string for url from
196+
transformation_position dictionary
197+
"""
198+
if not isinstance(transformation, list):
199+
return ""
200+
parsed_transforms = []
201+
for i in range(len(transformation)):
202+
parsed_transform_step = []
203+
for key in transformation[i]:
204+
transform_key = SUPPORTED_TRANS.get(key, "")
205+
if not transform_key:
206+
transform_key = key
207+
208+
if transformation[i][key] == "-":
209+
parsed_transform_step.append(transform_key)
210+
else:
211+
parsed_transform_step.append(
212+
"{}{}{}".format(
213+
transform_key,
214+
TRANSFORM_KEY_VALUE_DELIMITER,
215+
transformation[i][key],
216+
)
217+
)
218+
219+
parsed_transforms.append(TRANSFORM_DELIMITER.join(parsed_transform_step))
220+
221+
return CHAIN_TRANSFORM_DELIMITER.join(parsed_transforms)

0 commit comments

Comments
 (0)