Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle downloads no data available #27

Merged
merged 2 commits into from
Nov 8, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 90 additions & 34 deletions src/ecmwf_models/era5/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,26 @@
Module to download ERA5 from terminal in netcdf and grib format.
'''

from ecmwf_models.utils import *
from ecmwf_models.utils import (load_var_table,
lookup,
save_gribs_from_grib,
save_ncs_from_nc,
mkdate,
str2bool,
)
import warnings
import errno
import argparse
import sys
import os
import logging
from datetime import datetime, timedelta, time
import shutil
import cdsapi
import calendar
import multiprocessing


def default_variables(product='era5'):
"""
These variables are being downloaded, when None are passed by the user
Expand All @@ -49,6 +59,7 @@ def default_variables(product='era5'):
defaults = lut.loc[lut['default'] == 1]['dl_name'].values
return defaults.tolist()


def download_era5(c, years, months, days, h_steps, variables, target, grb=False,
product='era5', dry_run=False, cds_kwds={}):
'''
Expand Down Expand Up @@ -115,11 +126,32 @@ def download_era5(c, years, months, days, h_steps, variables, target, grb=False,
return True


class CDSStatusTracker():
"""
Track the status of the CDS download by using the CDS callback functions
"""
statuscode_ok = 0
statuscode_error = -1
statuscode_unavailable = 10

def __init__(self):
self.download_statuscode = self.statuscode_ok

def handle_error_function(self, *args, **kwargs):
message_prefix = args[0]
message_body = args[1]
if self.download_statuscode != self.statuscode_unavailable:
if message_prefix.startswith('Reason:') and message_body == 'Request returned no data':
self.download_statuscode = self.statuscode_unavailable
else:
self.download_statuscode = self.statuscode_error
logging.error(*args, **kwargs)


def download_and_move(target_path, startdate, enddate, product='era5',
variables=None, keep_original=False, h_steps=[0, 6, 12, 18],
grb=False, dry_run=False, grid=None, remap_method="bil",
cds_kwds={}, stepsize="month"):
cds_kwds={}, stepsize="month") -> int:
"""
Downloads the data from the ECMWF servers and moves them to the target path.
This is done in 30 day increments between start and end date.
Expand Down Expand Up @@ -175,6 +207,13 @@ def download_and_move(target_path, startdate, enddate, product='era5',
Additional arguments to be passed to the CDS API retrieve request.
stepsize : str, optional
Size of steps for requests, can be "month" or "day".

Returns
-------
status_code: int
0 : Downloaded data ok
-1 : Error
-10 : No data available for requested time period
"""
product = product.lower()

Expand All @@ -191,11 +230,12 @@ def download_and_move(target_path, startdate, enddate, product='era5',
warnings.warn('Dry run does not create connection to CDS')
c = None
else:
c = cdsapi.Client()

cds_status_tracker = CDSStatusTracker()
c = cdsapi.Client(error_callback=cds_status_tracker.handle_error_function)

pool = multiprocessing.Pool(1)
while curr_start <= enddate:
status_code = -1
sy, sm, sd = curr_start.year, curr_start.month, curr_start.day
y, m = sy, sm
if stepsize == "month":
Expand Down Expand Up @@ -228,39 +268,46 @@ def download_and_move(target_path, startdate, enddate, product='era5',
h_steps=h_steps, variables=variables, grb=grb,
product=product, target=dl_file, dry_run=dry_run,
cds_kwds=cds_kwds)
status_code = 0
break

except:
# If no data is available we don't need to retry
if cds_status_tracker.download_statuscode == CDSStatusTracker.statuscode_unavailable:
status_code = -10
break

# delete the partly downloaded data and retry
if os.path.isfile(dl_file):
os.remove(dl_file)
finished = False
i += 1
continue

if grb:
pool.apply_async(
save_gribs_from_grib,
args=(dl_file, target_path),
kwds=dict(
product_name=product.upper(),
keep_original=keep_original
if status_code == 0:
if grb:
pool.apply_async(
save_gribs_from_grib,
args=(dl_file, target_path),
kwds=dict(
product_name=product.upper(),
keep_original=keep_original
)
)
)
else:
pool.apply_async(
save_ncs_from_nc,
args=(
dl_file,
target_path,
),
kwds=dict(
product_name=product.upper(),
grid=grid,
remap_method=remap_method,
keep_original=keep_original
else:
pool.apply_async(
save_ncs_from_nc,
args=(
dl_file,
target_path,
),
kwds=dict(
product_name=product.upper(),
grid=grid,
remap_method=remap_method,
keep_original=keep_original
)
)
)

curr_start = curr_end + timedelta(days=1)
pool.close()
Expand All @@ -277,6 +324,8 @@ def download_and_move(target_path, startdate, enddate, product='era5',
if os.path.exists(weightspath):
os.unlink(weightspath)

return status_code


def parse_args(args):
"""
Expand Down Expand Up @@ -338,15 +387,22 @@ def parse_args(args):

def main(args):
    """
    Parse the command line arguments and run the download once.

    Parameters
    ----------
    args : list
        Command line arguments (without the program name), handed to
        ``parse_args``.

    Returns
    -------
    status_code : int
        The status code returned by ``download_and_move``.
    """
    args = parse_args(args)
    # Call download_and_move exactly once and keep its result; a second,
    # result-discarding call would trigger the whole download twice.
    status_code = download_and_move(target_path=args.localroot,
                                    startdate=args.start,
                                    enddate=args.end,
                                    product=args.product,
                                    variables=args.variables,
                                    h_steps=args.h_steps,
                                    grb=args.as_grib,
                                    keep_original=args.keep_original)
    return status_code


def run():
    """
    Console entry point: run ``main`` on ``sys.argv`` and exit the process.

    The status code returned by ``main`` is used as exit code, except for
    -10, which is translated to ``errno.ENODATA``.
    """
    # Run main only once; its return value decides the exit code.
    status_code = main(sys.argv[1:])
    if status_code == -10:
        # errno only defines the symbols that exist on the current platform,
        # so fall back to 61 when ENODATA is not defined.
        return_code = getattr(errno, 'ENODATA', 61)
    else:
        return_code = status_code

    sys.exit(return_code)