Skip to content

Commit

Permalink
Merge pull request #60 from umr-lops/improve_download
Browse files Browse the repository at this point in the history
Add a module that lets users provide only SAFE names (without CDSE hash ids) in the listing used for download
  • Loading branch information
agrouaze authored Dec 19, 2023
2 parents 97c676a + a17973c commit 2ff68a7
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 9 deletions.
80 changes: 76 additions & 4 deletions cdsodatacli/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import shutil
import random
import pandas as pd
import geopandas as gpd
from requests.exceptions import ChunkedEncodingError
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
Expand All @@ -24,7 +25,9 @@
get_sessions_download_available,
MAX_SESSION_PER_ACCOUNT,
)
from cdsodatacli.query import fetch_data
from cdsodatacli.utils import conf, test_safe_archive, test_safe_spool
from cdsodatacli.product_parser import ExplodeSAFE
from collections import defaultdict

# chunksize = 4096
Expand Down Expand Up @@ -132,13 +135,13 @@ def CDS_Odata_download_one_product_v2(
if status == 200: # means OK download
speed = total_length / elapsed_time
shutil.move(output_filepath_tmp, output_filepath)
os.chmod(output_filepath,mode=0o0775)
os.chmod(output_filepath, mode=0o0775)
logging.debug("time to download this product: %1.1f sec", elapsed_time)
logging.debug("average download speed: %1.1fMo/sec", speed)
return speed, status_meaning, safename_base, semaphore_token_file


def filter_product_already_present(cpt, df, outputdir,force_download=False):
def filter_product_already_present(cpt, df, outputdir, force_download=False):
"""
Parameters
Expand Down Expand Up @@ -187,7 +190,6 @@ def filter_product_already_present(cpt, df, outputdir,force_download=False):
return df_todownload, cpt



def download_list_product_multithread_v2(
list_id,
list_safename,
Expand Down Expand Up @@ -224,7 +226,9 @@ def download_list_product_multithread_v2(
{"safe": list_safename, "status": np.zeros(len(list_safename)), "id": list_id}
)

df2, cpt = filter_product_already_present(cpt, df, outputdir,force_download=check_on_disk==False)
df2, cpt = filter_product_already_present(
cpt, df, outputdir, force_download=check_on_disk == False
)

logging.info("%s", cpt)
while_loop = 0
Expand Down Expand Up @@ -469,6 +473,74 @@ def download_list_product(
)


def test_listing_content(listing_path):
"""
make sure that a lsiting of products to download respect the following format:
cdse-hash-id,safename
Arguments:
---------
listing_path (str):
Returns
-------
"""
fid = open(listing_path)
first_line = fid.readline()
listing_OK = False
if "," in first_line:
if "SAFE" in first_line.split(",")[1] and "S" in first_line.split(",")[1][0]:
listing_OK = True
return listing_OK


def add_missing_cdse_hash_ids_in_listing(listing_path):
    """
    Recover the CDSE hash ids for a listing that contains only SAFE names.

    A query is sent to the CDSE OData catalogue (through
    ``cdsodatacli.query.fetch_data``) using, for each SAFE name, a time
    window slightly wider than the product coverage.

    Parameters
    ----------
    listing_path (str): path of a listing containing one SAFE name per line.

    Returns
    -------
    res (pandas.DataFrame): columns ``id`` (CDSE hash id) and ``safename``;
        empty when the catalogue query returned nothing.
    """
    df_raw = pd.read_csv(listing_path, names=["safenames"])
    list_safe_a = df_raw["safenames"].values
    # parse each SAFE name once instead of once per extracted field
    parsed = [ExplodeSAFE(jj) for jj in list_safe_a]

    # widen the search window by 1 s on each side so that second-level
    # rounding in the catalogue dates cannot exclude the product
    # (previously the end date was narrowed with "- delta" by mistake)
    delta = datetime.timedelta(seconds=1)
    gdf = gpd.GeoDataFrame(
        {
            "start_datetime": [pp.startdate - delta for pp in parsed],
            "end_datetime": [pp.enddate + delta for pp in parsed],
            "geometry": np.tile([None], len(list_safe_a)),
            "collection": np.tile(["SENTINEL-1"], len(list_safe_a)),
            "name": list_safe_a,
            "sensormode": [pp.mode for pp in parsed],
            "producttype": [pp.product[0:3] for pp in parsed],
            "Attributes": np.tile([None], len(list_safe_a)),
        }
    )
    sea_min_pct = 0
    collected_data_norm = fetch_data(gdf, min_sea_percent=sea_min_pct)
    if collected_data_norm is None:
        # no product found: return an empty frame with the expected columns
        res = pd.DataFrame({"id": [], "safename": []})
    else:
        # chained rename avoids an in-place rename on a column slice
        # (which triggers pandas SettingWithCopyWarning)
        res = collected_data_norm[["Id", "Name"]].rename(
            columns={"Id": "id", "Name": "safename"}
        )
    return res


def download_list_product_sequential(
list_id, list_safename, outputdir, hideProgressBar=False
):
Expand Down
117 changes: 117 additions & 0 deletions cdsodatacli/product_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
Author: Antoine.Grouazel@ifremer.fr
Purpose:separate information in SAFE name sentinel1
Creation: 2014-11-28
Arguments: basename SAFE directory
note: valid also for Sentinel3 SRAL data
"""
import sys
import logging
import datetime

# Canonical list of metadata fields that can be extracted from a SAFE basename.
# NOTE(review): currently only referenced from commented-out code
# (ExplodeSAFE.get and the __main__ demo) — kept as documentation of the API.
fields = [
    "satellite",
    "mode",
    "product",
    "level",
    "polarisation",
    "startdate",
    "enddate",
    "absolute_orbit_number",
    "mission_data_take",
    "product_id",
    "kind",
]


class ExplodeSAFE(object):
    """Parse a Sentinel SAFE basename into its metadata fields.

    Supports Sentinel-1 (fixed-position slicing of the basename) and
    Sentinel-3 SRAL (underscore-separated tokens) products.

    Parameters
    ----------
    basename_safe (str): SAFE basename only — no parent directories and no
        children files.

    Raises
    ------
    Exception: when a path separator is found in ``basename_safe``.
    ValueError: when the name is neither a Sentinel-1 nor a Sentinel-3
        product (previously such names silently produced an object with no
        attributes set).
    """

    def __init__(self, basename_safe):
        if "/" in basename_safe:
            raise Exception("need basename not full path")
        if basename_safe[0:2] == "S1":
            # Sentinel-1 names have fixed-width fields, so positional slicing
            # is reliable (splitting on "_" is not, because of the double
            # underscores around the product type, e.g. "SLC__1SSV")
            self.safename = basename_safe
            self.satellite = self.safename[0:3]
            self.mode = self.safename[4:6]
            # NOTE: includes the padding underscore for 3-letter types ("SLC_")
            self.product = self.safename[7:11]
            self.level = self.safename[12]
            self.kind = self.safename[13]
            self.polarisation = self.safename[14:16]
            self.startdate = datetime.datetime.strptime(
                self.safename[17:32], "%Y%m%dT%H%M%S"
            )
            self.enddate = datetime.datetime.strptime(
                self.safename[33:48], "%Y%m%dT%H%M%S"
            )
            self.absolute_orbit_number = self.safename[49:55]
            # duration in seconds (float); for S3 it stays the raw token (str)
            self.duration = (self.enddate - self.startdate).total_seconds()
            self.sensor = "CbandRadar"
            self.mission_data_take = self.safename[56:62]  # datatake id
            self.product_id = self.safename[
                63:67
            ]  # unique id (processing ID) for a given product id( you can have the same for different product_id)
            self.production_status = "operational"
            self.cycle_number = None
            self.relative_orbit_number = None

        elif basename_safe[0:2] == "S3":
            self.safename = basename_safe
            splitos = self.safename.split("_")
            self.satellite = splitos[0]
            self.mode = None
            self.sensor = splitos[1]
            self.product = splitos[3]
            self.duration = splitos[10]
            self.level = splitos[2]
            self.kind = None
            self.polarisation = None
            self.startdate = datetime.datetime.strptime(splitos[7], "%Y%m%dT%H%M%S")
            # NOTE(review): splitos[9] is the third date token (presumably the
            # creation date) — kept as in the original parser; confirm intended
            self.enddate = datetime.datetime.strptime(splitos[9], "%Y%m%dT%H%M%S")
            self.absolute_orbit_number = None
            self.cycle_number = splitos[11]
            self.relative_orbit_number = splitos[12]
            self.mission_data_take = splitos[9]  # product id
            self.product_generating_center = splitos[18]
            self.product_id = splitos[
                10
            ]  # unique id for a given product id( you can have the same for different product_id)
            productions_status_code = {
                "O": "operational",
                "F": "reference",
                "D": "development",
                "R": "reprocessing",
            }
            self.production_status = productions_status_code[splitos[19]]
        else:
            # fail fast instead of returning a half-initialised object that
            # raises AttributeError at first use
            raise ValueError(
                "unsupported SAFE name (expected S1* or S3*): %s" % basename_safe
            )

    def props(self):
        """Return the list of public attribute names set on this instance."""
        return [i for i in self.__dict__.keys() if i[:1] != "_"]

    def get(self, info):
        """Return the value of attribute ``info`` (thin getattr wrapper)."""
        res = getattr(self, info)
        return res


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    # SAFE basename from the command line, or a demo S3 product by default
    # (note: S3 files are cut at half orbit but share a single cycle number)
    if len(sys.argv) > 1:
        safe = sys.argv[1]
    else:
        safe = "S3A_SR_2_WAT____20170124T120058_20170124T121058_20170124T140548_0599_013_294______MAR_O_NR_002.SEN3"
    logging.info("%s", safe)
    obj = ExplodeSAFE(safe)
    print(obj.get("startdate"))
    # dump every parsed attribute for inspection
    for attr_name in obj.props():
        logging.debug("info %s => %s", attr_name, obj.get(attr_name))
    print("start date=", obj.startdate)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
import logging
import os
import cdsodatacli
from cdsodatacli.download import download_list_product_multithread_v2
from cdsodatacli.download import (
download_list_product_multithread_v2,
test_listing_content,
add_missing_cdse_hash_ids_in_listing
)
from cdsodatacli.utils import conf

# listing = './example_WV_listing.txt'
Expand All @@ -27,8 +31,12 @@

parser = argparse.ArgumentParser(description="highleveltest-fetch_OCN_WV_IDs")
parser.add_argument("--verbose", action="store_true", default=False)
parser.add_argument("--forcedownload", action="store_true", default=False,
help='True -> no test of existence of the products in spool and archive directories.')
parser.add_argument(
"--forcedownload",
action="store_true",
default=False,
help="True -> no test of existence of the products in spool and archive directories.",
)
parser.add_argument(
"--logingroup",
help="name of the group of CDSE account in the localconfig.yml [default=logins]",
Expand Down Expand Up @@ -64,7 +72,10 @@
logins_group = args.logingroup
logging.info("logins_group : %s", len(conf[logins_group]))
outputdir = args.outputdir
inputdf = pd.read_csv(listing, names=["id", "safename"], delimiter=",")
if test_listing_content(listing_path=listing):
inputdf = pd.read_csv(listing, names=["id", "safename"], delimiter=",")
else:
inputdf = add_missing_cdse_hash_ids_in_listing(listing_path=listing)
if not os.path.exists(outputdir):
logging.debug("mkdir on %s", outputdir)
os.makedirs(outputdir, 0o0775)
Expand All @@ -74,6 +85,6 @@
outputdir=outputdir,
hideProgressBar=False,
account_group=logins_group,
check_on_disk=args.forcedownload==False,
check_on_disk=args.forcedownload == False,
)
logging.info("end of function")

0 comments on commit 2ff68a7

Please sign in to comment.