From 7571e1ea7af5898bb17e973ff0b0273cb9022972 Mon Sep 17 00:00:00 2001 From: shorvath-noaa <103054653+shorvath-noaa@users.noreply.github.com> Date: Tue, 6 Dec 2022 10:00:25 -0700 Subject: [PATCH] Reformat qlat inputs (#595) * initial commit, updates to convert nexus csvs to hourly binary files * added raising runtime errors if the directory/files for qlat binary file creation are not correct * added empty binary_files directory * only raise error if .parquet files already exist. Other files are ok --- .../troute/hyfeature_network_utilities.py | 81 +++++++++++++++++-- .../channel_forcing/binary_files/.gitkeep | 0 .../unittest_hyfeature.yaml | 5 +- 3 files changed, 77 insertions(+), 9 deletions(-) create mode 100644 test/unit_test_hyfeature/channel_forcing/binary_files/.gitkeep diff --git a/src/troute-network/troute/hyfeature_network_utilities.py b/src/troute-network/troute/hyfeature_network_utilities.py index a3c603d8c..e1fc65860 100644 --- a/src/troute-network/troute/hyfeature_network_utilities.py +++ b/src/troute-network/troute/hyfeature_network_utilities.py @@ -3,11 +3,14 @@ from functools import partial from datetime import datetime, timedelta import logging +import os import pandas as pd import numpy as np import netCDF4 from joblib import delayed, Parallel +import pyarrow as pa +import pyarrow.parquet as pq import troute.nhd_io as nhd_io @@ -35,7 +38,27 @@ def build_forcing_sets( raise AssertionError("Aborting simulation because the nexus_input_folder:", qlat_input_folder,"does not exist. Please check the the nexus_input_folder variable is correctly entered in the .yaml control file") from None forcing_glob_filter = forcing_parameters.get("nexus_file_pattern_filter", "*.NEXOUT") - + + if forcing_glob_filter=="nex-*": + print("Reformating qlat nexus files as hourly binary files...") + binary_folder = forcing_parameters.get('binary_nexus_file_folder', None) + nexus_files = nexus_input_folder.glob(forcing_glob_filter) + + #Check that directory/files specified will work + if not binary_folder: + raise(RuntimeError("No output binary qlat folder supplied in config")) + elif not os.path.exists(binary_folder): + raise(RuntimeError("Output binary qlat folder supplied in config does not exist")) + elif len(list(pathlib.Path(binary_folder).glob('*.parquet'))) != 0: + raise(RuntimeError("Output binary qlat folder supplied in config is not empty (already contains '.parquet' files)")) + + #Add tnx for backwards compatability + nexus_files_list = list(nexus_files) + list(nexus_input_folder.glob('tnx*.csv')) + #Convert files to binary hourly files, reset nexus input information + nexus_input_folder, forcing_glob_filter = nex_files_to_binary(nexus_files_list, binary_folder) + forcing_parameters["nexus_input_folder"] = nexus_input_folder + forcing_parameters["nexus_file_pattern_filter"] = forcing_glob_filter + # TODO: Throw errors if insufficient input data are available if run_sets: #FIXME: Change it for hyfeature @@ -61,10 +84,10 @@ def build_forcing_sets( # Deduce the timeinterval of the forcing data from the output timestamps of the first # two ordered CHRTOUT files - df = pd.read_csv(first_file) + df = read_file(first_file) t1_str = pd.to_datetime(df.columns[1]).strftime("%Y-%m-%d_%H:%M:%S") t1 = datetime.strptime(t1_str,"%Y-%m-%d_%H:%M:%S") - df = pd.read_csv(second_file) + df = read_file(second_file) t2_str = pd.to_datetime(df.columns[1]).strftime("%Y-%m-%d_%H:%M:%S") t2 = datetime.strptime(t2_str,"%Y-%m-%d_%H:%M:%S") dt_qlat_timedelta = t2 - t1 @@ -91,7 +114,7 @@ def build_forcing_sets( datetime_list] # list of forcing files - forcing_filename_list = [d_str + "NEXOUT.csv" for d_str in + forcing_filename_list = [d_str + forcing_glob_filter[1:] for d_str in datetime_list_str] # check that all forcing files exist @@ -128,7 +151,7 @@ def build_forcing_sets( ][-1]) #final_timestamp_str = nhd_io.get_param_str(final_nexout, # 'model_output_valid_time') - df = pd.read_csv(final_nexout) + df = read_file(final_nexout) final_timestamp_str = pd.to_datetime(df.columns[1]).strftime("%Y-%m-%d_%H:%M:%S") run_sets[j]['final_timestamp'] = \ @@ -200,8 +223,9 @@ def build_qlateral_array( ''' dfs=[] for f in nexus_files: - df = pd.read_csv(f).set_index(['feature_id']) + df = read_file(f).set_index(['feature_id']) dfs.append(df) + # lateral flows [m^3/s] are stored at NEXUS points with NEXUS ids nexuses_lateralflows_df = pd.concat(dfs, axis=1) @@ -230,4 +254,47 @@ def build_qlateral_array( if not segment_index.empty: qlats_df = qlats_df[qlats_df.index.isin(segment_index)] - return qlats_df \ No newline at end of file + return qlats_df + +def nex_files_to_binary(nexus_files, binary_folder): + for f in nexus_files: + # read the csv file + df = pd.read_csv(f, usecols=[1,2], names=['Datetime','qlat']) + + # convert and reformat datetime column + df['Datetime']= pd.to_datetime(df['Datetime']).dt.strftime("%Y%m%d%H%M") + + # reformat the dataframe + df['feature_id'] = get_id_from_filename(f) + df = df.pivot(index="feature_id", columns="Datetime", values="qlat") + df.columns.name = None + + for col in df.columns: + table_new = pa.Table.from_pandas(df.loc[:, [col]]) + + if not os.path.exists(f'{binary_folder}/{col}NEXOUT.parquet'): + pq.write_table(table_new, f'{binary_folder}/{col}NEXOUT.parquet') + + else: + table_old = pq.read_table(f'{binary_folder}/{col}NEXOUT.parquet') + table = pa.concat_tables([table_old,table_new]) + pq.write_table(table, f'{binary_folder}/{col}NEXOUT.parquet') + + nexus_input_folder = binary_folder + forcing_glob_filter = '*NEXOUT.parquet' + + return nexus_input_folder, forcing_glob_filter + +def get_id_from_filename(file_name): + id = os.path.splitext(file_name)[0].split('-')[1].split('_')[0] + return int(id) + +def read_file(file_name): + extension = file_name.suffix + if extension=='.csv': + df = pd.read_csv(file_name) + elif extension=='.parquet': + df = pq.read_table(file_name).to_pandas().reset_index() + df.index.name = None + + return df \ No newline at end of file diff --git a/test/unit_test_hyfeature/channel_forcing/binary_files/.gitkeep b/test/unit_test_hyfeature/channel_forcing/binary_files/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/test/unit_test_hyfeature/unittest_hyfeature.yaml b/test/unit_test_hyfeature/unittest_hyfeature.yaml index a74b827be..a04032d8c 100644 --- a/test/unit_test_hyfeature/unittest_hyfeature.yaml +++ b/test/unit_test_hyfeature/unittest_hyfeature.yaml @@ -61,8 +61,9 @@ compute_parameters: dt : 300 # [sec] qlat_input_folder : channel_forcing/ qlat_file_pattern_filter : "*.CHRTOUT_DOMAIN1" - nexus_input_folder : channel_forcing/ - nexus_file_pattern_filter : "*NEXOUT.csv" + nexus_input_folder : channel_forcing/ + nexus_file_pattern_filter : "*NEXOUT.csv" #OR "*NEXOUT.parquet" OR "nex-*" + binary_nexus_file_folder : binary_files # this is required if nexus_file_pattern_filter="nex-*" coastal_boundary_input_file : channel_forcing/schout_1.nc nts : 48 #288 for 1day max_loop_size : 2 # [hr]