Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix on download #47

Merged
merged 2 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions cdsodatacli/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import shutil
import random
import pandas as pd
from requests.exceptions import ChunkedEncodingError
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
import traceback
Expand All @@ -26,7 +27,8 @@
from cdsodatacli.utils import conf, test_safe_archive, test_safe_spool
from collections import defaultdict

chunksize = 4096
# chunksize = 4096
chunksize = 8192 # like in the CDSE example

# def CDS_Odata_download_one_product(session, headers, url, output_filepath):
# """
Expand Down Expand Up @@ -94,23 +96,36 @@ def CDS_Odata_download_one_product_v2(
# output_filepath_tmp = (
# output_filepath.replace(conf["spool"], conf["pre_spool"]) + ".tmp"
# )
output_filepath_tmp = os.path.join(conf["pre_spool"],os.path.basename(output_filepath)+ ".tmp")
output_filepath_tmp = os.path.join(
conf["pre_spool"], os.path.basename(output_filepath) + ".tmp"
)
safename_base = os.path.basename(output_filepath).replace(".zip", "")
with open(output_filepath_tmp, "wb") as f:
logging.debug("Downloading %s" % output_filepath)
response = session.get(url, headers=headers, stream=True)
status = response.status_code
status_meaning = response.reason
# Check for 'Transfer-Encoding: chunked'
if (
"Transfer-Encoding" in response.headers
and response.headers["Transfer-Encoding"] == "chunked"
):
logging.warning(
"Server is using 'Transfer-Encoding: chunked'. Content length may not be accurate."
)
if response.ok:
total_length = int(
int(response.headers.get("content-length")) / 1000 / 1000
)
logging.debug("total_length : %s Mo", total_length)

for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
if not response.ok and os.path.exists(output_filepath_tmp):
try:
for chunk in response.iter_content(chunk_size=chunksize):
if chunk:
f.write(chunk)
except ChunkedEncodingError as e:
status = -1
status_meaning = "ChunkedEncodingError"
if (not response.ok or status == -1) and os.path.exists(output_filepath_tmp):
logging.debug("remove empty file %s", output_filepath_tmp)
os.remove(output_filepath_tmp)
elapsed_time = time.time() - t0
Expand Down Expand Up @@ -250,7 +265,7 @@ def download_list_product_multithread_v2(
login=login,
date_generation_access_token=date_generation_access_token,
)
logging.info('remove session semaphore for %s',login)
logging.info("remove session semaphore for %s", login)
remove_semaphore_session_file(
session_dir=conf["active_session_directory"],
safename=safename_base,
Expand All @@ -272,7 +287,7 @@ def download_list_product_multithread_v2(
else:
df2.loc[(df2["safe"] == safename_base), "status"] = -1
errors_per_account[login] += 1
logging.info('error found for %s meaning %s',login,status_meaning)
logging.info("error found for %s meaning %s", login, status_meaning)
# df2["status"][df2["safe"] == safename_base] = -1 # download in error
cpt["status_%s" % status_meaning] += 1

Expand Down Expand Up @@ -374,7 +389,7 @@ def download_list_product(
access_token,
date_generation_access_token,
specific_account,
path_semphore_token
path_semphore_token,
) = get_bearer_access_token(specific_account=specific_account)
headers = {"Authorization": "Bearer %s" % access_token}
session.headers.update(headers)
Expand Down Expand Up @@ -454,7 +469,7 @@ def download_list_product_sequential(

"""
assert len(list_id) == len(list_safename)
logins_group = 'logins'
logins_group = "logins"
cpt = defaultdict(int)
cpt["total_product_to_download"] = len(list_id)
df = pd.DataFrame(
Expand All @@ -463,7 +478,10 @@ def download_list_product_sequential(
df2, cpt = filter_product_already_present(cpt, df, outputdir)

df_products_downloadable = get_sessions_download_available(
df2, hideProgressBar=hideProgressBar, blacklist=None,logins_group=logins_group,
df2,
hideProgressBar=hideProgressBar,
blacklist=None,
logins_group=logins_group,
)
logging.info("product downloadable: %s", len(df_products_downloadable))
df_products_downloadable["status"] = 0
Expand Down Expand Up @@ -506,8 +524,10 @@ def download_list_product_sequential(
access_token,
date_generation_access_token,
login,
path_semaphore_token
) = get_bearer_access_token(specific_account=None,account_group=logins_group)
path_semaphore_token,
) = get_bearer_access_token(
specific_account=None, account_group=logins_group
)
headers = {"Authorization": "Bearer %s" % access_token}
session.headers.update(headers)
else:
Expand Down
5 changes: 3 additions & 2 deletions cdsodatacli/fetch_access_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ def get_bearer_access_token(quiet=True, specific_account=None, account_group="lo
prefix = "curl -s "
else:
prefix = "curl "
option_insecure = ' --insecure' # added because workers have deprecated SSL certificates
cmd = (
prefix
+ " --location --request POST "
+ url_identity
+ " --header 'Content-Type: application/x-www-form-urlencoded' --data-urlencode 'grant_type=password' --data-urlencode 'username=%s' --data-urlencode 'password=%s' --data-urlencode 'client_id=cdse-public'"
% (login, passwd)
+ " --header 'Content-Type: application/x-www-form-urlencoded' --data-urlencode 'grant_type=password' --data-urlencode 'username=%s' --data-urlencode 'password=%s' --data-urlencode 'client_id=cdse-public' %s"
% (login, passwd,option_insecure)
)

logging.debug("cmd: %s", cmd)
Expand Down