-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
200 lines (171 loc) · 5.64 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import os
import re
import io
import glob
import logging
import requests
from tqdm import tqdm
import numpy as np
import cv2
from PIL import Image
from tensorflow.python.lib.io import file_io
# Module-wide logger, named after this module (__name__); used by the helpers below to report errors.
LOGGER = logging.getLogger(__name__)
def is_downloadable(url):
    """
    Does the url contain a downloadable resource

    Sends a HEAD request (following redirects) and inspects the
    ``Content-Type`` response header.

    Args:
        url: URL to probe
    Returns:
        bool: False when the content type mentions 'text' or 'html',
            True otherwise (including when no content type is sent at all)
    """
    h = requests.head(url, allow_redirects=True)
    # The header may be absent from a HEAD response; fall back to an empty
    # string so the substring checks below cannot fail on None.
    content_type = (h.headers.get('content-type') or '').lower()
    return 'text' not in content_type and 'html' not in content_type
def get_filename_from_cd(cd):
    """
    Get filename from content-disposition

    Args:
        cd: value of a Content-Disposition header, or None
    Returns:
        str: value of the ``filename`` parameter with surrounding quotes and
            whitespace removed, or None when the header is missing/empty or
            carries no non-empty ``filename`` parameter
    """
    if not cd:
        return None
    # Accept the quoted form (filename="my file.jpg") as well as the bare
    # token form (filename=file.jpg); a bare token stops at the next ';' so
    # that trailing parameters do not end up in the file name.
    match = re.search(r'filename=\s*(?:"([^"]*)"|([^;]+))', cd, flags=re.IGNORECASE)
    if match is None:
        return None
    quoted, bare = match.groups()
    fname = quoted if quoted is not None else bare
    # The second strip removes a stray quote left by an unterminated value.
    fname = fname.strip().strip('"')
    return fname or None
def download_image(url, save_to=None):
    """
    Download an image from URL

    The payload is always decoded from memory. It is written to disk only
    when ``save_to`` is given, so nothing is created in (or removed from) the
    current working directory otherwise.

    Raises:
        Exception: if the url is not downloadable. Errors raised by the HTTP
            request, the file write or the image decoding propagate unchanged
            (decoding errors are logged first).
    Args:
        url: valid image url
        save_to: (optional) directory to save the image into; created when
            missing
    Returns:
        numpy.ndarray: Image as numpy array
    """
    if not is_downloadable(url=url):
        msg = 'url {} not downloadable'.format(url)
        LOGGER.error(msg)
        raise Exception(msg)

    req = requests.get(url, allow_redirects=True)

    if save_to is not None:
        filename = get_filename_from_cd(req.headers.get('content-disposition'))
        if filename is not None:
            # The header is controlled by the remote server: drop quotes and
            # any directory part so the file cannot land outside save_to.
            filename = filename.strip().strip('"').replace('\\', '/')
            filename = os.path.basename(filename)
        if not filename:
            # Derive the extension from the URL without its query/fragment.
            url_path = url.split('?', 1)[0].split('#', 1)[0]
            _, ext = os.path.splitext(url_path)
            filename = 'default{}'.format(ext)

        os.makedirs(save_to, exist_ok=True)
        filepath = os.path.join(save_to, filename)
        with open(filepath, 'wb') as f:
            f.write(req.content)

    try:
        # Context manager releases the decoder's resources deterministically.
        with Image.open(io.BytesIO(req.content)) as pil_img:
            img = np.array(pil_img)
    except Exception:
        LOGGER.error('error decoding image downloaded from {}'.format(url))
        raise
    return img
def load_image_from_gcs(filename: str) -> np.ndarray:
    """
    Load an image from GCS bucket into a numpy array

    Args:
        filename: GCS bucket location
    Returns:
        numpy.ndarray: 3-channel image in BGR channel order (the order
            cv2.imread() uses), stored as a contiguous array
    Raises:
        Exception: if the file cannot be read or decoded; the original error
            is attached as the cause
    """
    try:
        # tensorflow file_io takes care of GCS file loading
        with file_io.FileIO(filename, 'rb') as gf:
            image_bytes = gf.read()
        with Image.open(io.BytesIO(image_bytes)) as pil_img:
            rgb = np.array(pil_img.convert('RGB'))
    except Exception as e:
        LOGGER.error('error loading image: {}'.format(filename))
        raise Exception(e) from e
    # Reversing the channel axis yields a negative-stride view; copy it into a
    # contiguous buffer so the result has the same memory layout as an array
    # returned by cv2.imread().
    return np.ascontiguousarray(rgb[:, :, ::-1])
def is_gcs_location(filename: str) -> bool:
    """
    Tell whether a path refers to remote bucket storage

    Args:
        filename: path or URI to inspect
    Returns:
        bool: True when the string begins with "gs://" or "s3://"
    """
    # TODO: there may exist a better way to do this
    bucket_prefixes = ("gs://", "s3://")
    return filename.startswith(bucket_prefixes)
def get_image_filenames(dirname):
    """
    Retrieve list of image filenames from directory or GCS bucket

    Only direct children with one of the extensions jpg, jpeg, png, bmp, tif
    or tiff are matched.

    Args:
        dirname: path to directory or GCS bucket containing training data,
            or a list/tuple of such paths
    Returns:
        list: matching filenames, grouped by directory (in the order given)
            and then by extension; the order inside each group is whatever
            the underlying listing call returns, so sort the result if a
            specific order is needed
    """
    import posixpath  # bucket URIs always use '/', whatever the host OS is

    if not isinstance(dirname, (list, tuple)):  # let user specify list of dirs
        dirname = [dirname]

    image_extensions = ['jpg', 'jpeg', 'png', 'bmp', 'tif', 'tiff']
    filenames = []
    for directory in dirname:
        remote = is_gcs_location(directory)  # same answer for every extension
        for ext in image_extensions:
            pattern = '*.{}'.format(ext)
            if remote:
                files = file_io.get_matching_files(posixpath.join(directory, pattern))
            else:
                files = glob.glob(os.path.join(directory, pattern))
            filenames.extend(files)
    return filenames
# Codec (FOURCC) used for each supported output container.
_FOURCC_BY_EXTENSION = {
    '.mkv': 'XVID',
    '.avi': 'XVID',
    '.mp4': 'mp4v',
}


def _natural_sort_key(name: str) -> list:
    """Sort key ordering embedded numbers numerically ('img2' before 'img10')."""
    # re.split with a capturing group alternates text / digit-run tokens, so
    # the digit runs always sit at the odd positions.
    return [int(token) if index % 2 else token
            for index, token in enumerate(re.split('([0-9]+)', name))]


def _load_frame(filename: str):
    """Read one image as a BGR array from local disk or bucket storage."""
    if is_gcs_location(filename):
        return load_image_from_gcs(filename)
    img = cv2.imread(filename, cv2.IMREAD_COLOR)
    if img is None:  # cv2.imread signals failure by returning None
        msg = 'error loading image: {}'.format(filename)
        LOGGER.error(msg)
        raise Exception(msg)
    return img


def frames_to_video(input_dir: str,
                    output_file: str,
                    fps: float,
                    resize: tuple = None):
    """
    convert directory with images to video

    Images are written in natural filename order (numbers inside names are
    compared numerically).

    Args:
        input_dir: directory or bucket location holding the frames (anything
            get_image_filenames() accepts)
        output_file: path of the video to write; the extension selects the
            codec (.mkv and .avi use XVID, .mp4 uses mp4v)
        fps: frame rate of the output video
        resize: optional (width, height) of the output video; defaults to the
            size of the first frame
    Returns:
        None
    Raises:
        ValueError: output_file has an unsupported extension
        Exception: No images found in directory, an image could not be read,
            or the video writer could not be opened
    """
    _, output_ext = os.path.splitext(output_file)
    fourcc_str = _FOURCC_BY_EXTENSION.get(output_ext.lower())
    if fourcc_str is None:
        msg = 'Unsupported video extension {!r}; expected one of {}'.format(
            output_ext, sorted(_FOURCC_BY_EXTENSION))
        LOGGER.error(msg)
        raise ValueError(msg)

    input_filenames = sorted(get_image_filenames(input_dir), key=_natural_sort_key)
    if not input_filenames:
        msg = 'No images found in directory {}'.format(input_dir)
        LOGGER.error(msg)
        raise Exception(msg)

    if resize:
        output_width, output_height = resize
    else:
        first_frame = _load_frame(input_filenames[0])
        output_height, output_width = first_frame.shape[:2]
    frame_size = (int(output_width), int(output_height))

    fourcc = cv2.VideoWriter_fourcc(*fourcc_str)
    vw = cv2.VideoWriter(output_file, fourcc, fps, frame_size)
    if not vw.isOpened():
        vw.release()
        msg = 'could not open video writer for {}'.format(output_file)
        LOGGER.error(msg)
        raise Exception(msg)

    try:
        for input_filename in tqdm(input_filenames):
            img = _load_frame(input_filename)
            # The writer was opened for one fixed frame size; bring every
            # frame to that size (this is also what applies `resize`).
            if (img.shape[1], img.shape[0]) != frame_size:
                img = cv2.resize(img, frame_size)
            vw.write(img)
    finally:
        # Always finalize the container, even if a frame fails to load.
        vw.release()