Skip to content

Commit

Permalink
Merge pull request #47 from umr-lops/improve_download
Browse files Browse the repository at this point in the history
Bug fix on download
  • Loading branch information
agrouaze authored Dec 4, 2023
2 parents 252a762 + 0489ed7 commit f017721
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 16 deletions.
48 changes: 34 additions & 14 deletions cdsodatacli/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import shutil
import random
import pandas as pd
from requests.exceptions import ChunkedEncodingError
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
import traceback
Expand All @@ -26,7 +27,8 @@
from cdsodatacli.utils import conf, test_safe_archive, test_safe_spool
from collections import defaultdict

chunksize = 4096
# chunksize = 4096
chunksize = 8192 # like in the CDSE example

# def CDS_Odata_download_one_product(session, headers, url, output_filepath):
# """
Expand Down Expand Up @@ -94,23 +96,36 @@ def CDS_Odata_download_one_product_v2(
# output_filepath_tmp = (
# output_filepath.replace(conf["spool"], conf["pre_spool"]) + ".tmp"
# )
output_filepath_tmp = os.path.join(conf["pre_spool"],os.path.basename(output_filepath)+ ".tmp")
output_filepath_tmp = os.path.join(
conf["pre_spool"], os.path.basename(output_filepath) + ".tmp"
)
safename_base = os.path.basename(output_filepath).replace(".zip", "")
with open(output_filepath_tmp, "wb") as f:
logging.debug("Downloading %s" % output_filepath)
response = session.get(url, headers=headers, stream=True)
status = response.status_code
status_meaning = response.reason
# Check for 'Transfer-Encoding: chunked'
if (
"Transfer-Encoding" in response.headers
and response.headers["Transfer-Encoding"] == "chunked"
):
logging.warning(
"Server is using 'Transfer-Encoding: chunked'. Content length may not be accurate."
)
if response.ok:
total_length = int(
int(response.headers.get("content-length")) / 1000 / 1000
)
logging.debug("total_length : %s Mo", total_length)

for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
if not response.ok and os.path.exists(output_filepath_tmp):
try:
for chunk in response.iter_content(chunk_size=chunksize):
if chunk:
f.write(chunk)
except ChunkedEncodingError as e:
status = -1
status_meaning = "ChunkedEncodingError"
if (not response.ok or status == -1) and os.path.exists(output_filepath_tmp):
logging.debug("remove empty file %s", output_filepath_tmp)
os.remove(output_filepath_tmp)
elapsed_time = time.time() - t0
Expand Down Expand Up @@ -250,7 +265,7 @@ def download_list_product_multithread_v2(
login=login,
date_generation_access_token=date_generation_access_token,
)
logging.info('remove session semaphore for %s',login)
logging.info("remove session semaphore for %s", login)
remove_semaphore_session_file(
session_dir=conf["active_session_directory"],
safename=safename_base,
Expand All @@ -272,7 +287,7 @@ def download_list_product_multithread_v2(
else:
df2.loc[(df2["safe"] == safename_base), "status"] = -1
errors_per_account[login] += 1
logging.info('error found for %s meaning %s',login,status_meaning)
logging.info("error found for %s meaning %s", login, status_meaning)
# df2["status"][df2["safe"] == safename_base] = -1 # download in error
cpt["status_%s" % status_meaning] += 1

Expand Down Expand Up @@ -374,7 +389,7 @@ def download_list_product(
access_token,
date_generation_access_token,
specific_account,
path_semphore_token
path_semphore_token,
) = get_bearer_access_token(specific_account=specific_account)
headers = {"Authorization": "Bearer %s" % access_token}
session.headers.update(headers)
Expand Down Expand Up @@ -454,7 +469,7 @@ def download_list_product_sequential(
"""
assert len(list_id) == len(list_safename)
logins_group = 'logins'
logins_group = "logins"
cpt = defaultdict(int)
cpt["total_product_to_download"] = len(list_id)
df = pd.DataFrame(
Expand All @@ -463,7 +478,10 @@ def download_list_product_sequential(
df2, cpt = filter_product_already_present(cpt, df, outputdir)

df_products_downloadable = get_sessions_download_available(
df2, hideProgressBar=hideProgressBar, blacklist=None,logins_group=logins_group,
df2,
hideProgressBar=hideProgressBar,
blacklist=None,
logins_group=logins_group,
)
logging.info("product downloadable: %s", len(df_products_downloadable))
df_products_downloadable["status"] = 0
Expand Down Expand Up @@ -506,8 +524,10 @@ def download_list_product_sequential(
access_token,
date_generation_access_token,
login,
path_semaphore_token
) = get_bearer_access_token(specific_account=None,account_group=logins_group)
path_semaphore_token,
) = get_bearer_access_token(
specific_account=None, account_group=logins_group
)
headers = {"Authorization": "Bearer %s" % access_token}
session.headers.update(headers)
else:
Expand Down
5 changes: 3 additions & 2 deletions cdsodatacli/fetch_access_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ def get_bearer_access_token(quiet=True, specific_account=None, account_group="lo
prefix = "curl -s "
else:
prefix = "curl "
option_insecure = ' --insecure' # added because workers have deprecated SSL certificates
cmd = (
prefix
+ " --location --request POST "
+ url_identity
+ " --header 'Content-Type: application/x-www-form-urlencoded' --data-urlencode 'grant_type=password' --data-urlencode 'username=%s' --data-urlencode 'password=%s' --data-urlencode 'client_id=cdse-public'"
% (login, passwd)
+ " --header 'Content-Type: application/x-www-form-urlencoded' --data-urlencode 'grant_type=password' --data-urlencode 'username=%s' --data-urlencode 'password=%s' --data-urlencode 'client_id=cdse-public' %s"
% (login, passwd,option_insecure)
)

logging.debug("cmd: %s", cmd)
Expand Down

0 comments on commit f017721

Please sign in to comment.