added animation table generation and era tasks
msouff committed May 27, 2020
1 parent a96c708 commit 04705cd
Showing 15 changed files with 494 additions and 43 deletions.
56 changes: 56 additions & 0 deletions ecflow/return_periods.py
@@ -0,0 +1,56 @@
################################################################
#
# File: return_periods.py
# Author(s): Michael Souffront and Spencer McDonald
# Date: 01/25/2018
# Purpose: Extracts return periods from ECMWF-RAPID era_interim
# results into a csv file
# Requirements: netCDF4, pandas
#
################################################################


import netCDF4 as nc
import os
import pandas as pd


# creates csv file with return periods
def get_return_periods_as_csv(input_dir, output_dir, watershed_name):
# opens netcdf file with return periods
    ncfile = nc.Dataset(os.path.join(input_dir, 'return_periods_erai_t511_24hr_19800101to20141231.nc'), 'r')

# extract values
comid = ncfile.variables['rivid'][:]
max_flow = ncfile.variables['max_flow'][:]
return20 = ncfile.variables['return_period_20'][:]
return10 = ncfile.variables['return_period_10'][:]
return2 = ncfile.variables['return_period_2'][:]

    # creates pandas Series from values
max_table = pd.Series(max_flow, index=comid)
return2_table = pd.Series(return2, index=comid)
return10_table = pd.Series(return10, index=comid)
return20_table = pd.Series(return20, index=comid)

# creates dataframe
df = pd.DataFrame([return2_table, return10_table, return20_table, max_table])
df = df.transpose()

# creates filename and header
filename = '-'.join([watershed_name, 'return_periods.csv'])
header = ['return_2', 'return_10', 'return_20', 'return_max']

# exports dataframe as csv
df.to_csv(os.path.join(output_dir, filename), index=True, index_label='comid', header=header)

return 'Success'


# runs function on file execution
if __name__ == "__main__":
get_return_periods_as_csv(
input_dir='/home/michael/host_share/era_data',
output_dir='/home/michael/host_share/rapid-io_init/output/dominican_republic-national',
watershed_name='dominican_republic-national'
)
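
For a quick check of the output, the CSV written above can be read back with pandas. A minimal sketch, assuming the dominican_republic-national paths from the __main__ block:

import os
import pandas as pd

output_dir = '/home/michael/host_share/rapid-io_init/output/dominican_republic-national'
# 'comid' was written as the index label by get_return_periods_as_csv
rp = pd.read_csv(os.path.join(output_dir, 'dominican_republic-national-return_periods.csv'), index_col='comid')
# columns follow the header written above
print(rp[['return_2', 'return_10', 'return_20', 'return_max']].head())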
8 changes: 4 additions & 4 deletions ecflow/run_ecflow.py
@@ -3,7 +3,7 @@
"""
Created on Mon Jan 28 11:43:42 2019
@author: michael
@author: Michael Souffront
"""

import sys
@@ -41,13 +41,13 @@ def __exit__(self, *args):
forecast_date_timestep = params[1]
watershed = params[2]
subbasin = params[3]
rapid_executable_location = '/home/michael/rapid/run/rapid'
rapid_executable_location = str(sys.argv[3])
initialize_flows = params[4]
job_name = params[5]
master_rapid_outflow_file = params[6]
rapid_input_directory = params[7]
mp_execute_directory = '/home/michael/execute'
subprocess_forecast_log_dir = '/home/michael/subprocess_logs'
mp_execute_directory = str(sys.argv[4])
subprocess_forecast_log_dir = str(sys.argv[5])
watershed_job_index = int(params[8].replace('\n', ''))

with CaptureStdOutToLog(os.path.join(subprocess_forecast_log_dir, "{0}.log".format(job_name))):
9 changes: 9 additions & 0 deletions ecflow/run_rapid.def
@@ -10,6 +10,9 @@ suite run_rapid
family ensemble_family
trigger prep_task == complete
edit PYSCRIPT '/home/michael/host_share/rapid_run/ecflow/run_ecflow.py'
edit RAPID_EXEC '/home/michael/rapid/run/rapid'
edit EXEC_DIR '/home/michael/execute'
edit SUBPROCESS_DIR '/home/michael/subprocess_logs'
task ens_member_52
edit JOB_INDEX '0'
task ens_member_51
@@ -166,5 +169,11 @@ suite run_rapid
trigger ens_member_2 == complete
edit JOB_INDEX '51'
endfamily
task plain_table_task
trigger ensemble_family == complete
edit PYSCRIPT '/home/michael/host_share/rapid_run/ecflow/spt_extract_plain_table.py'
edit OUT_LOCATION '/home/michael/host_share/rapid-io_init/output'
edit LOG_FILE '/home/michael/host_share/rapid_run/ecflow/run_rapid/ecf_out/plain_table.log'
edit NCES_EXEC '/home/michael/miniconda3/envs/ecflow/bin/nces'
endsuite
# enddef
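
A definition like this is normally loaded into a running ecflow_server before the suite is begun. A minimal sketch with the ecFlow Python client, assuming a local server on the default host/port:

import ecflow

ci = ecflow.Client()  # defaults to localhost:3141 unless ECF_HOST/ECF_PORT are set
ci.load('run_rapid.def')  # on re-deploys, ci.replace('/run_rapid', 'run_rapid.def') avoids a delete/load cycle
ci.begin_suite('run_rapid')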
14 changes: 12 additions & 2 deletions ecflow/run_rapid.py
@@ -12,8 +12,11 @@


def create_ensemble_family():
ensemble_family = Family("ensemble_family" ).add_trigger("prep_task == complete")
ensemble_family = Family("ensemble_family").add_trigger("prep_task == complete")
ensemble_family.add_variable("PYSCRIPT", os.path.join(home, 'run_ecflow.py'))
ensemble_family.add_variable("RAPID_EXEC", '/home/michael/rapid/run/rapid')
ensemble_family.add_variable("EXEC_DIR", '/home/michael/execute')
ensemble_family.add_variable("SUBPROCESS_DIR", '/home/michael/subprocess_logs')
ensemble_family += [Task(f"ens_member_52").add_variable("JOB_INDEX", 0)]
ensemble_family += [
Task(f"ens_member_{j}")
@@ -38,7 +41,14 @@ def create_ensemble_family():
prep_task.add_variable("RUNOFF_LOCATION", "/home/michael/host_share/ecmwf")

suite += create_ensemble_family()


plain_table_task = suite.add_task('plain_table_task')
plain_table_task.add_trigger("ensemble_family == complete")
plain_table_task.add_variable("PYSCRIPT", os.path.join(home, 'spt_extract_plain_table.py'))
plain_table_task.add_variable("OUT_LOCATION", "/home/michael/host_share/rapid-io_init/output")
plain_table_task.add_variable("LOG_FILE", os.path.join(home, 'run_rapid/ecf_out/plain_table.log'))
plain_table_task.add_variable("NCES_EXEC", "/home/michael/miniconda3/envs/ecflow/bin/nces")

print(defs)

print("check trigger expressions")
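
The truncated tail presumably performs the validation announced by the print above; with the ecFlow Python API that check is a one-liner. A sketch, assuming the defs object already built in this script:

# Defs.check() returns an empty string when all trigger expressions parse
result = defs.check()
assert len(result) == 0, result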
2 changes: 1 addition & 1 deletion ecflow/run_rapid/ens_member.ecf
@@ -1,3 +1,3 @@
%include <head.h>
/usr/bin/python3.6 %PYSCRIPT% %ECF_FILES% %JOB_INDEX%
/usr/bin/python3.6 %PYSCRIPT% %ECF_FILES% %JOB_INDEX% %RAPID_EXEC% %EXEC_DIR% %SUBPROCESS_DIR%
%include <tail.h>
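
The three new substitutions line up positionally with the sys.argv reads in run_ecflow.py above. Schematically (argv[1] and argv[2] are inferred from the wrapper line):

import sys

ecf_files_dir = sys.argv[1]                     # %ECF_FILES% (inferred)
job_index = sys.argv[2]                         # %JOB_INDEX% (inferred)
rapid_executable_location = str(sys.argv[3])    # %RAPID_EXEC%
mp_execute_directory = str(sys.argv[4])         # %EXEC_DIR%
subprocess_forecast_log_dir = str(sys.argv[5])  # %SUBPROCESS_DIR%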
3 changes: 3 additions & 0 deletions ecflow/run_rapid/plain_table_task.ecf
@@ -0,0 +1,3 @@
%include <head.h>
/usr/bin/python3.6 %PYSCRIPT% %OUT_LOCATION% %LOG_FILE% %NCES_EXEC%
%include <tail.h>
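
These positional arguments map onto the sys.argv reads at the bottom of spt_extract_plain_table.py, shown next:

import sys

workdir = str(sys.argv[1])    # %OUT_LOCATION%: rapid-io output root to scan
log_file = str(sys.argv[2])   # %LOG_FILE%: target passed to logging.basicConfig
nces_exec = str(sys.argv[3])  # %NCES_EXEC%: path to NCO's nces binary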
248 changes: 248 additions & 0 deletions ecflow/spt_extract_plain_table.py
@@ -0,0 +1,248 @@
#################################################################
#
# File: spt_extract_plain_table.py
# Author(s): Michael Souffront, Wade Roberts, Spencer McDonald
# Date: 03/07/2018
# Purpose: Calculate basic statistics for GloFAS-RAPID files and
# extract them to a summary table; interpolate forecast
# values for time steps other than 3 hrs
# Requirements: NCO, netCDF4, pandas
#
#################################################################

import os
import sys
import multiprocessing as mp
import subprocess as sp
import netCDF4 as nc
import datetime as dt
import numpy as np
import pandas as pd
import logging


def extract_summary_table(workspace):
# calls NCO's nces function to calculate ensemble statistics for the max, mean, and min
nces_exec = str(sys.argv[3])
for stat in ['max', 'avg', 'min']:
findstr = 'find {0} -name "Qout*.nc"'.format(workspace)
filename = os.path.join(workspace, 'nces.{0}.nc'.format(stat))
ncesstr = "{0} -O --op_typ={1} {2}".format(nces_exec, stat, filename)
args = ' | '.join([findstr, ncesstr])
sp.call(args, shell=True)

# creates list with the stat netcdf files created in the previous step
nclist = []
for file in os.listdir(workspace):
if file.startswith("nces"):
nclist.append(os.path.join(workspace, file))

# creates file name for the csv file
date_string = os.path.split(workspace)[1].replace('.', '')
full_name = os.path.split(os.path.split(workspace)[0])[1]
file_name = 'summary_table_{0}_{1}.csv'.format(full_name, date_string)

# creating pandas dataframe with return periods
d = {}
return_periods_path = os.path.join(os.path.split(workspace)[0], '{0}-return_periods.csv'.format(full_name))
with open(return_periods_path, 'r') as f:
lines = f.readlines()
lines.pop(0)
for line in lines:
d[line.split(',')[0]] = line.split(',')[1:4]

# creates a csv file to store statistics
try:
with open(os.path.join(workspace, file_name), 'w') as f:
# writes header
            # f.write('comid,timestamp,max,mean,style,flow_class\n')

# extracts forecast COMIDS and formatted dates into lists
comids = nc.Dataset(nclist[0], 'r').variables['rivid'][:].tolist()
rawdates = nc.Dataset(nclist[0], 'r').variables['time'][:].tolist()
dates = []
for date in rawdates:
dates.append(dt.datetime.utcfromtimestamp(date).strftime("%m/%d/%y %H:%M"))

# creates empty lists with forecast stats
maxlist = []
meanlist = []
minlist = []

# loops through the stat netcdf files to populate lists created above
for ncfile in sorted(nclist):
res = nc.Dataset(ncfile, 'r')

# loops through COMIDs with netcdf files
for index, comid in enumerate(comids):
if 'max' in ncfile:
maxlist.append(res.variables['Qout'][index, 0:49].tolist())
elif 'avg' in ncfile:
meanlist.append(res.variables['Qout'][index, 0:49].tolist())
elif 'min' in ncfile:
minlist.append(res.variables['Qout'][index, 0:49].tolist())

# creates step order list
step_order = range(1, 50)
# step_order = range(1, 200)

# creates watershed and subbasin names
watershed_name = full_name.split('-')[0]
subbasin_name = full_name.split('-')[1]

# creates unique id
count = 1

# loops through COMIDs again to add rows to csv file
for index, comid in enumerate(comids):
for step, date, max, mean, min in zip(step_order, dates, maxlist[index], meanlist[index],
minlist[index]):
# define style
if mean > float(d[str(comid)][2]):
style = 'purple'
elif mean > float(d[str(comid)][1]):
style = 'red'
elif mean > float(d[str(comid)][0]):
style = 'yellow'
else:
style = 'blue'

# define flow_class
if mean < 20:
flow_class = '1'
elif 20 <= mean < 250:
flow_class = '2'
elif 250 <= mean < 1500:
flow_class = '3'
elif 1500 <= mean < 10000:
flow_class = '4'
elif 10000 <= mean < 30000:
flow_class = '5'
else:
flow_class = '6'

f.write(','.join([str(comid), date, str(max), str(mean), style, flow_class + '\n']))
count += 1

return 'Stat Success'
except Exception as e:
logging.debug(e)


# function to take a given csv and interpolate all time series in it
def interpolate_table(path):
# importing the table
print('working on interpolation')
df = pd.read_csv(path, index_col=8)
interpolated_df = pd.DataFrame([])
if len(df.index) % 85 == 0:
n = 85
for i in range(int(len(df.index) / 85)):
            # making a temporary df to interpolate in
df_temp = df.iloc[n - 85: n]

# resetting the index to datetime type
df_temp.index = pd.to_datetime(df_temp.index, infer_datetime_format=True)

# making a temporary dataframe for the 6 hour gap time series
df_temp_6_hr = df_temp.iloc[48:, :]

# making a new index with 3 hour time intervals rather than 6 hour
new_index = pd.date_range(df_temp_6_hr.index[0], df_temp_6_hr.index[len(df_temp_6_hr.index) - 1], freq='3H')

# reindexing the 6 hour df to a 3 hr df
df_temp_3_hr = df_temp_6_hr.reindex(new_index)

# filling the constant values with a forward fill
for col in ["watershed", "subbasin", "comid", "return2", "return10", "return20"]:
df_temp_3_hr[col].ffill(inplace=True)

# making a new index column
df_temp_3_hr['index'] = np.linspace(49, 121, len(df_temp_3_hr.index))

# using a pchip spline to interpolate the values in the new time interval
for col in ['max', 'mean', 'min']:
df_temp_3_hr[col] = df_temp_3_hr[col].interpolate('pchip')

# creating a variable to combine the new interpolated values to the dataframe
frames = [df_temp.iloc[:48], df_temp_3_hr]
# concatenating the variable
df_temp = pd.concat(frames)

# rearranging the dataframe to match how it was before
df_temp['timestamp'] = df_temp.index
df_temp.index = df_temp['id']
df_temp = df_temp.drop(['id'], axis=1)
cols = ['watershed', 'subbasin', 'comid', 'return2', 'return10', 'return20', 'index', 'timestamp', 'max',
'mean', 'min', 'style', 'flow_class']
df_temp = df_temp[cols]

# appending this section of the table back to the entire table
interpolated_df = interpolated_df.append(df_temp)

n += 85

# resetting the id column
interpolated_df.index = np.linspace(1, len(interpolated_df.index), len(interpolated_df.index), dtype=np.int16)

# changing the data types to match what was originally in the table
interpolated_df.index = interpolated_df.index.astype(np.int16)
interpolated_df['timestamp'] = interpolated_df['timestamp'].dt.strftime("%m/%d/%y %H:%M")
interpolated_df['index'] = interpolated_df['index'].astype(np.int16)
interpolated_df['comid'] = interpolated_df['comid'].astype(np.int64)

# logical indexing the styles column to fill the interpolated values with corresponding colors
interpolated_df.ix[(interpolated_df['mean'] > interpolated_df['return2']), ['style']] = 'yellow'
interpolated_df.ix[(interpolated_df['mean'] > interpolated_df['return10']), ['style']] = 'red'
interpolated_df.ix[(interpolated_df['mean'] > interpolated_df['return20']), ['style']] = 'purple'
interpolated_df.ix[(interpolated_df['mean'] <= interpolated_df['return2']), ['style']] = 'blue'

# logical indexing the flow class column to fill the interpolated values with corresponding values
interpolated_df.ix[(interpolated_df['mean'] < 20), ['flow_class']] = '1'
interpolated_df.ix[(interpolated_df['mean'] >= 20) & (interpolated_df['mean'] < 250), ['flow_class']] = '2'
interpolated_df.ix[(interpolated_df['mean'] >= 250) & (interpolated_df['mean'] < 1500), ['flow_class']] = '3'
interpolated_df.ix[(interpolated_df['mean'] >= 1500) & (interpolated_df['mean'] < 10000), ['flow_class']] = '4'
interpolated_df.ix[(interpolated_df['mean'] >= 10000) & (interpolated_df['mean'] < 30000), ['flow_class']] = '5'
    interpolated_df.ix[(interpolated_df['mean'] >= 30000), ['flow_class']] = '6'

# overwrite csv table with interpolated values, leaving header out
interpolated_df.to_csv(path, index_label='id', header=False)
    return 'Interpolation Success'


# runs function on file execution
if __name__ == "__main__":
# output directory
workdir = str(sys.argv[1])

# list of watersheds
watersheds = [os.path.join(workdir, d) for d in os.listdir(workdir) if os.path.isdir(os.path.join(workdir, d))]

dates = []
exclude_list = []
for i in range(len(watersheds)):
for d in os.listdir(watersheds[i]):
if not any(excluded in watersheds[i] for excluded in exclude_list) and os.path.isdir(
os.path.join(watersheds[i], d)):
dates.append(os.path.join(watersheds[i], d))

logging.basicConfig(filename=str(sys.argv[2]), level=logging.DEBUG)

pool = mp.Pool()
results = pool.map(extract_summary_table, dates)

pool.close()
pool.join()
logging.debug('Finished')

# # populate interpolation list
# date_list = os.listdir(date)
# for file in date_list:
# if file.startswith("summary_table"):
# interpolation_list.append(os.path.join(date, file))
#
# # run interpolation
# for csv_path in interpolation_list:
# interpolate_table(
# path=csv_path
# )
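
Note that the .ix indexer used in interpolate_table was deprecated in pandas 0.20 and removed in 1.0, and DataFrame.append has since been removed as well. On current pandas the logical indexing would use .loc; a sketch of the style assignment only:

# .loc equivalent of the .ix style assignment above (pandas >= 1.0)
df = interpolated_df
df.loc[df['mean'] > df['return2'], 'style'] = 'yellow'
df.loc[df['mean'] > df['return10'], 'style'] = 'red'
df.loc[df['mean'] > df['return20'], 'style'] = 'purple'
df.loc[df['mean'] <= df['return2'], 'style'] = 'blue'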