save in one file #791

Merged 8 commits on Jul 16, 2024
4 changes: 2 additions & 2 deletions src/troute-config/troute/config/output_parameters.py
@@ -27,7 +27,7 @@ class OutputParameters(BaseModel):
     # NOTE: assuming this should be removed
     # TODO: missing from `v3_doc.yaml`
     # see nwm_routing/output.py :114
-    test_output: Optional[FilePath] = None
+    test_output: Optional[Path] = None
     stream_output: Optional["StreamOutput"] = None
     # NOTE: mandatory if writing results to lastobs
     lastobs_output: Optional[DirectoryPath] = None
@@ -90,7 +90,7 @@ def validate_stream_output_internal_frequency(cls, value, values):
         if value is not None:
             if value % 5 != 0:
                 raise ValueError("stream_output_internal_frequency must be a multiple of 5.")
-            if value / 60 > values['stream_output_time']:
+            if values.get('stream_output_time') != -1 and value / 60 > values['stream_output_time']:
                 raise ValueError("stream_output_internal_frequency should be less than or equal to stream_output_time in minutes.")
         return value

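The second hunk introduces -1 as a sentinel for stream_output_time: when everything is saved to one file, comparing the output frequency against a per-file time span no longer makes sense, so the cross-field check is skipped. A minimal sketch of the guarded check as a plain function standing in for the Pydantic validator above (units follow the config: hours for stream_output_time, minutes for the frequency):

    # Sketch only; mirrors the validator logic in the diff above.
    def check_internal_frequency(value, stream_output_time):
        if value is not None:
            if value % 5 != 0:
                raise ValueError("stream_output_internal_frequency must be a multiple of 5.")
            # -1 means "save everything in one file", so skip the span comparison
            if stream_output_time != -1 and value / 60 > stream_output_time:
                raise ValueError("stream_output_internal_frequency should be less than or equal to stream_output_time in minutes.")
        return value

    check_internal_frequency(60, -1)   # passes: single-file mode skips the span check
    check_internal_frequency(120, 1)   # raises: 120 min exceeds a 1-hour file
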
7 changes: 5 additions & 2 deletions src/troute-network/troute/AbstractNetwork.py
@@ -725,7 +725,7 @@ def build_forcing_sets(self,):

         forcing_parameters = self.forcing_parameters
         supernetwork_parameters = self.supernetwork_parameters
-
+        stream_output = self.output_parameters.get('stream_output', None)
         run_sets = forcing_parameters.get("qlat_forcing_sets", None)
         qlat_input_folder = forcing_parameters.get("qlat_input_folder", None)
         nts = forcing_parameters.get("nts", None)
@@ -830,7 +830,10 @@ def build_forcing_sets(self,):

         # the number of files required for the simulation
         nfiles = int(np.ceil(nts / qts_subdivisions))
-
+        if stream_output:
+            stream_output_time = stream_output.get('stream_output_time', None)
+            if stream_output_time and stream_output_time > max_loop_size:
+                max_loop_size = stream_output_time
         # list of forcing file datetimes
         #datetime_list = [t0 + dt_qlat_timedelta * (n + 1) for n in
         #                 range(nfiles)]
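This change lets the requested output-file span stretch the forcing loop: if one output file must cover stream_output_time hours of simulation, each forcing loop has to be at least that long. A hedged sketch of the adjustment, assuming max_loop_size shares units (hours) with stream_output_time:

    # Sketch mirroring the diff above; not the full build_forcing_sets method.
    def adjust_max_loop_size(max_loop_size, output_parameters):
        stream_output = output_parameters.get('stream_output', None)
        if stream_output:
            stream_output_time = stream_output.get('stream_output_time', None)
            if stream_output_time and stream_output_time > max_loop_size:
                max_loop_size = stream_output_time
        return max_loop_size

    print(adjust_max_loop_size(2, {'stream_output': {'stream_output_time': 5}}))   # 5
    print(adjust_max_loop_size(2, {'stream_output': {'stream_output_time': -1}}))  # 2 (the -1 sentinel never raises it)
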
23 changes: 12 additions & 11 deletions src/troute-network/troute/DataAssimilation.py
@@ -587,16 +587,17 @@ def update_for_next_loop(self, network, da_run,):
         - reservoir_usgs_df (DataFrame): USGS reservoir observations
         - reservoir_usace_df (DataFrame): USACE reservoir observations
         '''
+        LOG.debug(' Update for next loop started')
         data_assimilation_parameters = self._data_assimilation_parameters
         run_parameters = self._run_parameters

         # update usgs_df if it is not empty
         streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None)
         reservoir_da_parameters = data_assimilation_parameters.get('reservoir_da', None)

-        if not self.usgs_df.empty:
-
-            if reservoir_da_parameters.get('reservoir_persistence_usgs', False):
+        if not self._usgs_df.empty:
+            if reservoir_da_parameters.get('reservoir_persistence_da').get('reservoir_persistence_usgs', False):

                 gage_lake_df = (
                     network.usgs_lake_gage_crosswalk.
@@ -621,7 +622,7 @@

                 # resample `usgs_df` to 15 minute intervals
                 usgs_df_15min = (
-                    self.usgs_df.
+                    self._usgs_df.
                     transpose().
                     resample('15min').asfreq().
                     transpose()
@@ -632,14 +633,14 @@
                     usgs_df_15min.join(link_lake_df, how = 'inner').
                     reset_index().
                     set_index('usgs_lake_id').
-                    drop(['index'], axis = 1)
+                    drop(['link'], axis = 1)
                 )

                 # replace link ids with lake ids, for gages at waterbody outlets,
                 # otherwise, gage data will not be assimilated at waterbody outlet
                 # segments.
                 if network.link_lake_crosswalk:
-                    usgs_df = _reindex_link_to_lake_id(usgs_df, network.link_lake_crosswalk)
+                    self._usgs_df = _reindex_link_to_lake_id(self._usgs_df, network.link_lake_crosswalk)

             elif reservoir_da_parameters.get('reservoir_persistence_usgs', False):
                 (
@@ -656,7 +657,7 @@ def update_for_next_loop(self, network, da_run,):
                     res_source = 'usgs')

         # USACE
-        if reservoir_da_parameters.get('reservoir_persistence_usace', False):
+        if reservoir_da_parameters.get('reservoir_persistence_da').get('reservoir_persistence_usace', False):

             (
                 self._reservoir_usace_df,
@@ -697,8 +698,8 @@ def update_for_next_loop(self, network, da_run,):
         # what happens if there are timeslice files missing on the front-end?
         # if the first column is some timestamp greater than t0, then this will throw
         # an error. Need to think through this more.
-        if not self.usgs_df.empty:
-            self._usgs_df = self.usgs_df.loc[:,network.t0:]
+        if not self._usgs_df.empty:
+            self._usgs_df = self._usgs_df.loc[:,network.t0:]

 class great_lake(AbstractDA):
     '''
@@ -1076,7 +1077,7 @@ def _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_
             ).
             loc[network.link_gage_df.index]
         )

     else:
         usgs_df = pd.DataFrame()
     LOG.debug("Reading and preprocessing usgs timeslice files is completed in %s seconds." % (time.time() - usgs_df_start_time))
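The data-assimilation edits do two things: usgs_df accesses go through the private _usgs_df attribute, so the reindexed and trimmed frame is actually stored back on the object, and the reservoir persistence switches move one level down, under reservoir_da -> reservoir_persistence_da. Note that the chained .get() calls assume reservoir_persistence_da is present; a hypothetical defensive variant with an empty-dict default would look like this sketch (the dict shape follows the diff, the {} default is our addition):

    # Sketch of the new nested flag lookup; not code from the PR.
    reservoir_da_parameters = {
        'reservoir_persistence_da': {
            'reservoir_persistence_usgs': True,
            'reservoir_persistence_usace': False,
        }
    }

    usgs_persistence = reservoir_da_parameters.get(
        'reservoir_persistence_da', {}).get('reservoir_persistence_usgs', False)
    usace_persistence = reservoir_da_parameters.get(
        'reservoir_persistence_da', {}).get('reservoir_persistence_usace', False)
    print(usgs_persistence, usace_persistence)  # True False
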
4 changes: 3 additions & 1 deletion src/troute-network/troute/NHDNetwork.py
@@ -27,7 +27,8 @@ def __init__(
         forcing_parameters,
         compute_parameters,
         data_assimilation_parameters,
-        hybrid_parameters,
+        hybrid_parameters,
+        output_parameters,
         verbose=False,
         showtiming=False,
     ):
@@ -41,6 +42,7 @@
         self.compute_parameters = compute_parameters
         self.forcing_parameters = forcing_parameters
         self.hybrid_parameters = hybrid_parameters
+        self.output_parameters = output_parameters
         self.verbose = verbose
         self.showtiming = showtiming

72 changes: 47 additions & 25 deletions src/troute-network/troute/nhd_io.py
@@ -2013,7 +2013,7 @@ def write_flowveldepth_netcdf(stream_output_directory, file_name,
     # =========== time VARIABLE ===============
     TIME = ncfile.createVariable(
         varname = "time",
-        datatype = 'int32',
+        datatype = 'float64',
         dimensions = ("time",),
         fill_value = -9999.0
     )
@@ -2022,7 +2022,7 @@
         {
             'long_name': 'valid output time',
             'standard_name': 'time',
-            'units': 'seconds',
+            'units': f'seconds since {t0.strftime("%Y-%m-%d %H:%M:%S")}',
             'missing_value': -9999.0
             #'calendar': 'proleptic_gregorian'
         }
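Storing the time coordinate as float64 with CF-style units ("seconds since <t0>") lets generic tools such as xarray or ncdump -t decode absolute timestamps instead of bare second counts. A standalone sketch of the coordinate the new code writes, with an illustrative t0 and file name:

    from datetime import datetime
    import netCDF4

    t0 = datetime(2021, 8, 23, 13, 0)  # illustrative; the real code receives t0 as an argument
    with netCDF4.Dataset('example_output.nc', 'w') as ncfile:
        ncfile.createDimension('time', None)
        time_var = ncfile.createVariable('time', 'float64', ('time',), fill_value=-9999.0)
        time_var.setncatts({
            'long_name': 'valid output time',
            'standard_name': 'time',
            'units': f'seconds since {t0.strftime("%Y-%m-%d %H:%M:%S")}',  # CF-style reference time
            'missing_value': -9999.0,
        })
        time_var[:] = [300.0, 600.0, 900.0]  # offsets from t0 in seconds
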
@@ -2169,42 +2169,64 @@ def write_flowveldepth(
         empty_ids = list(set(flowveldepth.index).difference(set(nudge_df.index)))
         empty_df = pd.DataFrame(index=empty_ids, columns=nudge_df.columns).fillna(-9999.0)
         nudge_df = pd.concat([nudge_df, empty_df]).loc[flowveldepth.index]

-        ts_per_file = stream_output_timediff*60//stream_output_internal_frequency
-
-        num_files = flowveldepth.shape[1]//3*dt//(stream_output_timediff*60*60)
-        if num_files==0:
-            num_files=1
-
         file_name_time = t0
         jobs = []
-        for _ in range(num_files):
+
+        if stream_output_timediff > 0:
+            ts_per_file = stream_output_timediff*60//stream_output_internal_frequency
+
+            num_files = flowveldepth.shape[1]//3*dt//(stream_output_timediff*60*60)
+            if num_files==0:
+                num_files=1
+
+            for _ in range(num_files):
+                filename = 'troute_output_' + file_name_time.strftime('%Y%m%d%H%M') + stream_output_type
+                args = (stream_output_directory,filename,
+                        flow.iloc[:,0:ts_per_file],
+                        velocity.iloc[:,0:ts_per_file],
+                        depth.iloc[:,0:ts_per_file],
+                        nudge_df.iloc[:,0:ts_per_file],
+                        timestamps_sec[0:ts_per_file],t0)
+                if stream_output_type == '.nc':
+                    if cpu_pool > 1 & num_files > 1:
+                        jobs.append(delayed(write_flowveldepth_netcdf)(*args))
+                    else:
+                        write_flowveldepth_netcdf(*args)
+                else:
+                    if cpu_pool > 1 & num_files > 1:
+                        jobs.append(delayed(write_flowveldepth_csv_pkl)(*args))
+                    else:
+                        write_flowveldepth_csv_pkl(*args)
+
+                flow = flow.iloc[:,ts_per_file:]
+                velocity = velocity.iloc[:,ts_per_file:]
+                depth = depth.iloc[:,ts_per_file:]
+                nudge_df = nudge_df.iloc[:,ts_per_file:]
+                timestamps_sec = timestamps_sec[ts_per_file:]
+                file_name_time = file_name_time + timedelta(hours=stream_output_timediff)
+
+        elif stream_output_timediff == -1:
+
             filename = 'troute_output_' + file_name_time.strftime('%Y%m%d%H%M') + stream_output_type
             args = (stream_output_directory,filename,
-                    flow.iloc[:,0:ts_per_file],
-                    velocity.iloc[:,0:ts_per_file],
-                    depth.iloc[:,0:ts_per_file],
-                    nudge_df.iloc[:,0:ts_per_file],
-                    timestamps_sec[0:ts_per_file],t0)
+                    flow,
+                    velocity,
+                    depth,
+                    nudge_df,
+                    timestamps_sec,
+                    t0)
             if stream_output_type == '.nc':
-                if cpu_pool > 1 & num_files > 1:
+                if cpu_pool > 1:
                     jobs.append(delayed(write_flowveldepth_netcdf)(*args))
                 else:
                     write_flowveldepth_netcdf(*args)
             else:
-                if cpu_pool > 1 & num_files > 1:
+                if cpu_pool > 1:
                     jobs.append(delayed(write_flowveldepth_csv_pkl)(*args))
                 else:
                     write_flowveldepth_csv_pkl(*args)
-
-            flow = flow.iloc[:,ts_per_file:]
-            velocity = velocity.iloc[:,ts_per_file:]
-            depth = depth.iloc[:,ts_per_file:]
-            nudge_df = nudge_df.iloc[:,ts_per_file:]
-            timestamps_sec = timestamps_sec[ts_per_file:]
-            file_name_time = file_name_time + timedelta(hours=stream_output_timediff)

-        if cpu_pool > 1 & num_files > 1:
+        if cpu_pool > 1:
             try:
                 # Execute all jobs in parallel
                 with Parallel(n_jobs=cpu_pool) as parallel:
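The restructured write_flowveldepth dispatches on stream_output_timediff: a positive value slices the flow, velocity, and depth tables into fixed-span files as before, while -1 (the new single-file mode) hands the full tables to one writer call, parallelizing on cpu_pool alone. (Incidentally, the retained cpu_pool > 1 & num_files > 1 parses as a chained comparison around 1 & num_files, since & binds tighter than > in Python; the plain cpu_pool > 1 in the new branches sidesteps that pitfall.) The chunking arithmetic reduces to roughly this hypothetical helper, not part of the PR:

    # Returns (start, stop) timestep slices, one per output file.
    def plan_output_files(n_timesteps, stream_output_time_hr, internal_frequency_min):
        if stream_output_time_hr > 0:
            ts_per_file = stream_output_time_hr * 60 // internal_frequency_min
            num_files = max(1, -(-n_timesteps // ts_per_file))  # ceiling division
            return [(i * ts_per_file, min((i + 1) * ts_per_file, n_timesteps))
                    for i in range(num_files)]
        elif stream_output_time_hr == -1:
            return [(0, n_timesteps)]  # everything in a single file

    print(plan_output_files(24, 1, 15))   # [(0, 4), (4, 8), ..., (20, 24)]
    print(plan_output_files(24, -1, 15))  # [(0, 24)]
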
1 change: 1 addition & 0 deletions src/troute-nwm/src/nwm_routing/__main__.py
@@ -100,6 +100,7 @@ def main_v04(argv):
         compute_parameters,
         data_assimilation_parameters,
         hybrid_parameters,
+        output_parameters,
         verbose=True,
         showtiming=showtiming,
     )
4 changes: 2 additions & 2 deletions src/troute-nwm/src/nwm_routing/output.py
@@ -462,8 +462,8 @@ def nwm_output_generator(
         LOG.debug("writing lastobs files took %s seconds." % (time.time() - start))


-    if 'flowveldepth' in locals():
-        LOG.debug(flowveldepth)
+    # if 'flowveldepth' in locals():
+    #     LOG.debug(flowveldepth)

     LOG.debug("output complete in %s seconds." % (time.time() - start_time))
10 changes: 5 additions & 5 deletions test/LowerColorado_TX_v4/test_AnA_V4_HYFeature.yaml
@@ -123,15 +123,15 @@ compute_parameters:
 #--------------------------------------------------------------------------------
 # output_parameters:
 #     #----------
-#     test_output : output/lcr_flowveldepth.pkl
-#     lite_restart:
+#     test_output : output/lcr_flowveldepth.pkl
+#     lite_restart:
 #         #----------
 #         lite_restart_output_directory: restart/
-#     lakeout_output: lakeout/
-#     lastobs_output: lastobs/
+#     lakeout_output: lakeout/
+#     lastobs_output: lastobs/
 #     stream_output :
 #         stream_output_directory: output/
-#         stream_output_time: 1 #[hr]
+#         stream_output_time: 1 #[hr] ** Note: `stream_output_time = -1` means save everything in one file **
 #         stream_output_type: '.nc' # choose one of netcdf '.nc', '.csv', or '.pkl'
 #         stream_output_internal_frequency: 60 #[min] must be a multiple of 5; e.g., for hourly output use 60

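Under the new convention, a configuration that writes the whole simulation to a single NetCDF file would look something like this hypothetical excerpt (directory path is a placeholder):

    output_parameters:
        stream_output:
            stream_output_directory: output/
            stream_output_time: -1                 # -1 = save everything in one file
            stream_output_type: '.nc'
            stream_output_internal_frequency: 60   # minutes; must be a multiple of 5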