Commit 48baef4

Merge pull request #61 from rok4/feature/storage-read-optimization
S3 client with persistent connections and an LRU read cache
2 parents f01405b + 73b49d7 commit 48baef4

2 files changed: +89 -29 lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
@@ -1,3 +1,16 @@
+## 2.0.0
+
+### [Fixed]
+
+* Pyramid
+    * when a tile is read from a single-band PNG pyramid, a 3-dimensional numpy.array is returned as expected (the last dimension is a one-element array)
+
+### [Changed]
+
+* Storage
+    * The S3 client keeps its connections open
+    * The get_data_binary function uses an LRU cache with a 5-minute time to live
+
 ## 1.7.1

 ### [Added]
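
The caching described in the [Changed] entry relies on a standard Python pattern: functools.lru_cache has no expiry, so a time-bucket value is added to the cache key so that entries go stale over time. A minimal, hypothetical sketch of that pattern (the names _ttl_hash, _cached_read and read are illustrative, not the library's API):

```python
import time
from functools import lru_cache


def _ttl_hash(period: int = 300) -> int:
    # Same value for every call made within the same 5-minute window
    return round(time.time() / period)


@lru_cache(maxsize=50)
def _cached_read(path: str, ttl_hash: int) -> bytes:
    # ttl_hash is not used in the body: it only widens the cache key so entries expire
    with open(path, "rb") as f:
        return f.read()


def read(path: str) -> bytes:
    # Public wrapper: callers never pass the ttl hash themselves
    return _cached_read(path, _ttl_hash())
```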

src/rok4/storage.py

Lines changed: 76 additions & 29 deletions
@@ -18,6 +18,7 @@
 - ROK4_S3_KEY
 - ROK4_S3_SECRETKEY
 - ROK4_S3_URL
+- ROK4_SSL_NO_VERIFY (optional): a non-empty value disables certificate checks. Set PYTHONWARNINGS to "ignore:Unverified HTTPS request" to disable the warning logs

 To use several S3 clusters, each environment variable has to contain a list (comma-separated), with the same number of elements

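
A hypothetical configuration sketch for the variables documented in this hunk; endpoint and credentials are placeholders. The module reads them on first S3 access, so they must be set beforehand; PYTHONWARNINGS is read at interpreter startup, so it is better exported from the shell than set here.

```python
import os

# Placeholders: replace with a real endpoint and real credentials
os.environ["ROK4_S3_URL"] = "https://s3.example.com"
os.environ["ROK4_S3_KEY"] = "ACCESS_KEY"
os.environ["ROK4_S3_SECRETKEY"] = "SECRET_KEY"

# Any non-empty value disables certificate verification
os.environ["ROK4_SSL_NO_VERIFY"] = "1"
```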
@@ -34,6 +35,8 @@
 import os
 import re
 import tempfile
+import time
+from functools import lru_cache
 from shutil import copyfile
 from typing import Dict, Tuple, Union

@@ -67,6 +70,11 @@
 __S3_DEFAULT_CLIENT = None


+def __get_ttl_hash():
+    """Return the same value within a 5-minute time period"""
+    return round(time.time() / 300)
+
+
 def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", str]], str, str]:
     """Get the S3 client

@@ -86,6 +94,10 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s
     global __S3_CLIENTS, __S3_DEFAULT_CLIENT

     if not __S3_CLIENTS:
+        verify = True
+        if "ROK4_SSL_NO_VERIFY" in os.environ and os.environ["ROK4_SSL_NO_VERIFY"] != "":
+            verify = False
+
         # S3 storage is used for the first time: load the settings from the environment variables
         try:
             keys = os.environ["ROK4_S3_KEY"].split(",")
@@ -109,7 +121,9 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s
                     "s3",
                     aws_access_key_id=keys[i],
                     aws_secret_access_key=secret_keys[i],
+                    verify=verify,
                     endpoint_url=urls[i],
+                    config=botocore.config.Config(tcp_keepalive=True, max_pool_connections=10),
                 ),
                 "key": keys[i],
                 "secret_key": secret_keys[i],
@@ -271,6 +285,7 @@ def get_data_str(path: str) -> str:
         MissingEnvironmentError: Missing object storage informations
         StorageError: Storage read issue
         FileNotFoundError: File or object does not exist
+        NotImplementedError: Storage type not handled

     Returns:
         str: Data content
@@ -279,17 +294,20 @@ def get_data_str(path: str) -> str:
     return get_data_binary(path).decode("utf-8")


-def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
-    """Load data into a binary string
+@lru_cache(maxsize=50)
+def __get_cached_data_binary(path: str, ttl_hash: int, range: Tuple[int, int] = None) -> str:
+    """Load data into a binary string, using an LRU cache

     Args:
         path (str): path to data
+        ttl_hash (int): time hash, to invalidate the cache
         range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None.

     Raises:
         MissingEnvironmentError: Missing object storage informations
         StorageError: Storage read issue
         FileNotFoundError: File or object does not exist
+        NotImplementedError: Storage type not handled

     Returns:
         str: Data binary content
@@ -329,7 +347,7 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
         except Exception as e:
             raise StorageError("S3", e)

-    elif storage_type == StorageType.CEPH:
+    elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
         ioctx = __get_ceph_ioctx(tray_name)

         try:
@@ -372,14 +390,35 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
             except Exception as e:
                 raise StorageError(storage_type.name, e)
         else:
-            raise NotImplementedError
+            raise NotImplementedError("Cannot get partial data for storage type HTTP(S)")

     else:
-        raise StorageError("UNKNOWN", "Unhandled storage type to read binary data")
+        raise NotImplementedError(f"Cannot get data for storage type {storage_type.name}")

     return data


+def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
+    """Load data into a binary string
+
+    This function uses an LRU cache, with a TTL of 5 minutes
+
+    Args:
+        path (str): path to data
+        range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None.
+
+    Raises:
+        MissingEnvironmentError: Missing object storage informations
+        StorageError: Storage read issue
+        FileNotFoundError: File or object does not exist
+        NotImplementedError: Storage type not handled
+
+    Returns:
+        str: Data binary content
+    """
+    return __get_cached_data_binary(path, __get_ttl_hash(), range)
+
+
 def put_data_str(data: str, path: str) -> None:
     """Store string data into a file or an object

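
Hypothetical usage of the new wrapper (the paths below are placeholders, not taken from the repository): within the same 5-minute window, calls with identical path and range arguments are served from the LRU cache instead of hitting the storage again.

```python
from rok4.storage import get_data_binary

# First call reads from the storage backend
header = get_data_binary("s3://my-bucket/pyramid/tile.tif", (0, 2048))

# Same arguments within 5 minutes: served from the cache
header_again = get_data_binary("s3://my-bucket/pyramid/tile.tif", (0, 2048))

# Different arguments (other path or range) create separate cache entries
full = get_data_binary("file:///tmp/tile.tif")
```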
@@ -392,6 +431,7 @@ def put_data_str(data: str, path: str) -> None:
     Raises:
         MissingEnvironmentError: Missing object storage informations
         StorageError: Storage write issue
+        NotImplementedError: Storage type not handled
     """

     storage_type, path, tray_name, base_name = get_infos_from_path(path)
@@ -406,7 +446,7 @@ def put_data_str(data: str, path: str) -> None:
         except Exception as e:
             raise StorageError("S3", e)

-    elif storage_type == StorageType.CEPH:
+    elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
         ioctx = __get_ceph_ioctx(tray_name)

         try:
@@ -423,7 +463,7 @@ def put_data_str(data: str, path: str) -> None:
             raise StorageError("FILE", e)

     else:
-        raise StorageError("UNKNOWN", "Unhandled storage type to write string data")
+        raise NotImplementedError(f"Cannot write data for storage type {storage_type.name}")


 def get_size(path: str) -> int:
@@ -435,6 +475,7 @@ def get_size(path: str) -> int:
     Raises:
         MissingEnvironmentError: Missing object storage informations
         StorageError: Storage read issue
+        NotImplementedError: Storage type not handled

     Returns:
         int: file/object size, in bytes
@@ -453,7 +494,7 @@ def get_size(path: str) -> int:
         except Exception as e:
             raise StorageError("S3", e)

-    elif storage_type == StorageType.CEPH:
+    elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
         ioctx = __get_ceph_ioctx(tray_name)

         try:
@@ -478,7 +519,7 @@ def get_size(path: str) -> int:
             raise StorageError(storage_type.name, e)

     else:
-        raise StorageError("UNKNOWN", "Unhandled storage type to get size")
+        raise NotImplementedError(f"Cannot get size for storage type {storage_type.name}")


 def exists(path: str) -> bool:
@@ -490,6 +531,7 @@ def exists(path: str) -> bool:
     Raises:
         MissingEnvironmentError: Missing object storage informations
         StorageError: Storage read issue
+        NotImplementedError: Storage type not handled

     Returns:
         bool: file/object existing status
@@ -509,7 +551,7 @@ def exists(path: str) -> bool:
             else:
                 raise StorageError("S3", e)

-    elif storage_type == StorageType.CEPH:
+    elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
         ioctx = __get_ceph_ioctx(tray_name)

         try:
@@ -534,7 +576,7 @@ def exists(path: str) -> bool:
             raise StorageError(storage_type.name, e)

     else:
-        raise StorageError("UNKNOWN", "Unhandled storage type to test if exists")
+        raise NotImplementedError(f"Cannot test existence for storage type {storage_type.name}")


 def remove(path: str) -> None:
@@ -546,6 +588,7 @@ def remove(path: str) -> None:
     Raises:
         MissingEnvironmentError: Missing object storage informations
         StorageError: Storage removal issue
+        NotImplementedError: Storage type not handled
     """
     storage_type, path, tray_name, base_name = get_infos_from_path(path)

@@ -557,7 +600,7 @@ def remove(path: str) -> None:
         except Exception as e:
             raise StorageError("S3", e)

-    elif storage_type == StorageType.CEPH:
+    elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
         ioctx = __get_ceph_ioctx(tray_name)

         try:
@@ -576,7 +619,7 @@ def remove(path: str) -> None:
             raise StorageError("FILE", e)

     else:
-        raise StorageError("UNKNOWN", "Unhandled storage type to remove things")
+        raise NotImplementedError(f"Cannot remove data for storage type {storage_type.name}")


 def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
@@ -588,8 +631,9 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
         from_md5 (str, optional): MD5 sum, re-processed after copy and controlled. Defaults to None.

     Raises:
-        StorageError: Unhandled copy or copy issue
+        StorageError: Copy issue
         MissingEnvironmentError: Missing object storage informations
+        NotImplementedError: Storage type not handled
     """

     from_type, from_path, from_tray, from_base_name = get_infos_from_path(from_path)
@@ -687,7 +731,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
         except Exception as e:
             raise StorageError("S3", f"Cannot copy S3 object {from_path} to {to_path} : {e}")

-    elif from_type == StorageType.CEPH and to_type == StorageType.FILE:
+    elif from_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE and to_type == StorageType.FILE:
         ioctx = __get_ceph_ioctx(from_tray)

         if from_md5 is not None:
@@ -726,7 +770,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
                 "CEPH and FILE", f"Cannot copy CEPH object {from_path} to file {to_path} : {e}"
             )

-    elif from_type == StorageType.FILE and to_type == StorageType.CEPH:
+    elif from_type == StorageType.FILE and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
         ioctx = __get_ceph_ioctx(to_tray)

         if from_md5 is not None:
@@ -763,7 +807,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
                 "FILE and CEPH", f"Cannot copy file {from_path} to CEPH object {to_path} : {e}"
             )

-    elif from_type == StorageType.CEPH and to_type == StorageType.CEPH:
+    elif from_type == StorageType.CEPH and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
         from_ioctx = __get_ceph_ioctx(from_tray)
         to_ioctx = __get_ceph_ioctx(to_tray)

@@ -795,7 +839,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
         except Exception as e:
             raise StorageError("CEPH", f"Cannot copy CEPH object {from_path} to {to_path} : {e}")

-    elif from_type == StorageType.CEPH and to_type == StorageType.S3:
+    elif from_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE and to_type == StorageType.S3:
         from_ioctx = __get_ceph_ioctx(from_tray)

         s3_client, to_bucket = __get_s3_client(to_tray)
@@ -853,8 +897,10 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
             )

     elif (
-        from_type == StorageType.HTTP or from_type == StorageType.HTTPS
-    ) and to_type == StorageType.CEPH:
+        (from_type == StorageType.HTTP or from_type == StorageType.HTTPS)
+        and to_type == StorageType.CEPH
+        and CEPH_RADOS_AVAILABLE
+    ):
         to_ioctx = __get_ceph_ioctx(to_tray)

         try:
@@ -896,9 +942,8 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
             )

     else:
-        raise StorageError(
-            f"{from_type.name} and {to_type.name}",
-            f"Cannot copy from {from_type.name} to {to_type.name}",
+        raise NotImplementedError(
+            f"Cannot copy data from storage type {from_type.name} to storage type {to_type.name}"
         )


@@ -911,8 +956,9 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None:
         hard (bool, optional): hard link rather than symbolic. Only for FILE storage. Defaults to False.

     Raises:
-        StorageError: Unhandled link or link issue
+        StorageError: link issue
         MissingEnvironmentError: Missing object storage informations
+        NotImplementedError: Storage type not handled
     """

     target_type, target_path, target_tray, target_base_name = get_infos_from_path(target_path)
@@ -947,7 +993,7 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None:
         except Exception as e:
             raise StorageError("S3", e)

-    elif target_type == StorageType.CEPH:
+    elif target_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
         ioctx = __get_ceph_ioctx(link_tray)

         try:
@@ -965,7 +1011,7 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None:
             raise StorageError("FILE", e)

     else:
-        raise StorageError("UNKNOWN", "Unhandled storage type to make link")
+        raise NotImplementedError(f"Cannot make link for storage type {target_type.name}")


 def get_osgeo_path(path: str) -> str:
@@ -1013,6 +1059,7 @@ def size_path(path: str) -> int:
     Raises:
         StorageError: Unhandled link or link issue
         MissingEnvironmentError: Missing object storage informations
+        NotImplementedError: Storage type not handled

     Returns:
         int: size of the path
@@ -1052,9 +1099,9 @@ def size_path(path: str) -> int:
         except Exception as e:
             raise StorageError("S3", e)

-    elif storage_type == StorageType.CEPH:
-        raise NotImplementedError
     else:
-        raise StorageError("UNKNOWN", "Unhandled storage type to calculate size")
+        raise NotImplementedError(
+            f"Cannot get prefix path size for storage type {storage_type.name}"
+        )

     return total
