Skip to content

Commit

Permalink
Reformat qlat inputs (#595)
Browse files Browse the repository at this point in the history
* initial commit, updates to convert nexus csvs to hourly binary files

* added raising runtime errors if the directory/files for qlat binary file creation are not correct

* added empty binary_files directory

* only raise error if .parquet files already exist. Other files are ok
  • Loading branch information
shorvath-noaa authored Dec 6, 2022
1 parent 4b17ac7 commit 7571e1e
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 9 deletions.
81 changes: 74 additions & 7 deletions src/troute-network/troute/hyfeature_network_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from functools import partial
from datetime import datetime, timedelta
import logging
import os

import pandas as pd
import numpy as np
import netCDF4
from joblib import delayed, Parallel
import pyarrow as pa
import pyarrow.parquet as pq

import troute.nhd_io as nhd_io

Expand Down Expand Up @@ -35,7 +38,27 @@ def build_forcing_sets(
raise AssertionError("Aborting simulation because the nexus_input_folder:", qlat_input_folder,"does not exist. Please check the the nexus_input_folder variable is correctly entered in the .yaml control file") from None

forcing_glob_filter = forcing_parameters.get("nexus_file_pattern_filter", "*.NEXOUT")


if forcing_glob_filter=="nex-*":
print("Reformating qlat nexus files as hourly binary files...")
binary_folder = forcing_parameters.get('binary_nexus_file_folder', None)
nexus_files = nexus_input_folder.glob(forcing_glob_filter)

#Check that directory/files specified will work
if not binary_folder:
raise(RuntimeError("No output binary qlat folder supplied in config"))
elif not os.path.exists(binary_folder):
raise(RuntimeError("Output binary qlat folder supplied in config does not exist"))
elif len(list(pathlib.Path(binary_folder).glob('*.parquet'))) != 0:
raise(RuntimeError("Output binary qlat folder supplied in config is not empty (already contains '.parquet' files)"))

#Add tnx for backwards compatability
nexus_files_list = list(nexus_files) + list(nexus_input_folder.glob('tnx*.csv'))
#Convert files to binary hourly files, reset nexus input information
nexus_input_folder, forcing_glob_filter = nex_files_to_binary(nexus_files_list, binary_folder)
forcing_parameters["nexus_input_folder"] = nexus_input_folder
forcing_parameters["nexus_file_pattern_filter"] = forcing_glob_filter

# TODO: Throw errors if insufficient input data are available
if run_sets:
#FIXME: Change it for hyfeature
Expand All @@ -61,10 +84,10 @@ def build_forcing_sets(

# Deduce the timeinterval of the forcing data from the output timestamps of the first
# two ordered CHRTOUT files
df = pd.read_csv(first_file)
df = read_file(first_file)
t1_str = pd.to_datetime(df.columns[1]).strftime("%Y-%m-%d_%H:%M:%S")
t1 = datetime.strptime(t1_str,"%Y-%m-%d_%H:%M:%S")
df = pd.read_csv(second_file)
df = read_file(second_file)
t2_str = pd.to_datetime(df.columns[1]).strftime("%Y-%m-%d_%H:%M:%S")
t2 = datetime.strptime(t2_str,"%Y-%m-%d_%H:%M:%S")
dt_qlat_timedelta = t2 - t1
Expand All @@ -91,7 +114,7 @@ def build_forcing_sets(
datetime_list]

# list of forcing files
forcing_filename_list = [d_str + "NEXOUT.csv" for d_str in
forcing_filename_list = [d_str + forcing_glob_filter[1:] for d_str in
datetime_list_str]

# check that all forcing files exist
Expand Down Expand Up @@ -128,7 +151,7 @@ def build_forcing_sets(
][-1])
#final_timestamp_str = nhd_io.get_param_str(final_nexout,
# 'model_output_valid_time')
df = pd.read_csv(final_nexout)
df = read_file(final_nexout)
final_timestamp_str = pd.to_datetime(df.columns[1]).strftime("%Y-%m-%d_%H:%M:%S")

run_sets[j]['final_timestamp'] = \
Expand Down Expand Up @@ -200,8 +223,9 @@ def build_qlateral_array(
'''
dfs=[]
for f in nexus_files:
df = pd.read_csv(f).set_index(['feature_id'])
df = read_file(f).set_index(['feature_id'])
dfs.append(df)

# lateral flows [m^3/s] are stored at NEXUS points with NEXUS ids
nexuses_lateralflows_df = pd.concat(dfs, axis=1)

Expand Down Expand Up @@ -230,4 +254,47 @@ def build_qlateral_array(
if not segment_index.empty:
qlats_df = qlats_df[qlats_df.index.isin(segment_index)]

return qlats_df
return qlats_df

def nex_files_to_binary(nexus_files, binary_folder):
for f in nexus_files:
# read the csv file
df = pd.read_csv(f, usecols=[1,2], names=['Datetime','qlat'])

# convert and reformat datetime column
df['Datetime']= pd.to_datetime(df['Datetime']).dt.strftime("%Y%m%d%H%M")

# reformat the dataframe
df['feature_id'] = get_id_from_filename(f)
df = df.pivot(index="feature_id", columns="Datetime", values="qlat")
df.columns.name = None

for col in df.columns:
table_new = pa.Table.from_pandas(df.loc[:, [col]])

if not os.path.exists(f'{binary_folder}/{col}NEXOUT.parquet'):
pq.write_table(table_new, f'{binary_folder}/{col}NEXOUT.parquet')

else:
table_old = pq.read_table(f'{binary_folder}/{col}NEXOUT.parquet')
table = pa.concat_tables([table_old,table_new])
pq.write_table(table, f'{binary_folder}/{col}NEXOUT.parquet')

nexus_input_folder = binary_folder
forcing_glob_filter = '*NEXOUT.parquet'

return nexus_input_folder, forcing_glob_filter

def get_id_from_filename(file_name):
id = os.path.splitext(file_name)[0].split('-')[1].split('_')[0]
return int(id)

def read_file(file_name):
extension = file_name.suffix
if extension=='.csv':
df = pd.read_csv(file_name)
elif extension=='.parquet':
df = pq.read_table(file_name).to_pandas().reset_index()
df.index.name = None

return df
Empty file.
5 changes: 3 additions & 2 deletions test/unit_test_hyfeature/unittest_hyfeature.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ compute_parameters:
dt : 300 # [sec]
qlat_input_folder : channel_forcing/
qlat_file_pattern_filter : "*.CHRTOUT_DOMAIN1"
nexus_input_folder : channel_forcing/
nexus_file_pattern_filter : "*NEXOUT.csv"
nexus_input_folder : channel_forcing/
nexus_file_pattern_filter : "*NEXOUT.csv" #OR "*NEXOUT.parquet" OR "nex-*"
binary_nexus_file_folder : binary_files # this is required if nexus_file_pattern_filter="nex-*"
coastal_boundary_input_file : channel_forcing/schout_1.nc
nts : 48 #288 for 1day
max_loop_size : 2 # [hr]
Expand Down

0 comments on commit 7571e1e

Please sign in to comment.