#################################################################
# File: spt_extract_plain_table.py
# Author(s):
# Riley Hales
# Josh Ogden
# Michael Souffront
# Wade Roberts
# Spencer McDonald
# Date: 03/07/2018
# Last Updated: 05/28/2020
# Purpose: Calculate basic statistics for GloFAS-RAPID files and
# extract them to a summary table; interpolate forecast
# values for time steps other than 3 hrs
# Requirements: NCO, netCDF4, pandas, numpy, scipy
#################################################################
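# Expected layout (illustrative, based on the argument notes in __main__):
#   <rapid-io/output>/<region_name>/<forecast_date>/Qout*.nc
# Each forecast-date folder receives nces.{max,avg,min}.nc ensemble files and
# a summary_table_<region_name>_<date>.csv alongside them.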
import datetime
import glob
import logging
import multiprocessing as mp
import os
import subprocess as sp
import sys

import netCDF4 as nc
import pandas as pd
from scipy.interpolate import pchip


def extract_summary_table(workspace):
    # call NCO's nces operator to calculate the ensemble statistics for the max, mean, and min
    nces_exec = str(sys.argv[3])
    findstr = f'find {workspace} -name "Qout*.nc"'
    for stat in ['max', 'avg', 'min']:
        filename = os.path.join(workspace, f'nces.{stat}.nc')
        ncesstr = f"{nces_exec} -O --op_typ={stat} {filename}"
        args = ' | '.join([findstr, ncesstr])
        sp.call(args, shell=True)
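    # for example, with a hypothetical workspace of /rapid-io/output/region/20200528.0
    # and stat 'max', the piped shell command expands to:
    #   find /rapid-io/output/region/20200528.0 -name "Qout*.nc" | nces -O --op_typ=max /rapid-io/output/region/20200528.0/nces.max.nc
    # (nces reads the list of input files from stdin)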
    # create the file name for the csv
    date_string = os.path.split(workspace)[1].replace('.', '')
    region_name = os.path.basename(os.path.split(workspace)[0])
    file_name = f'summary_table_{region_name}_{date_string}.csv'
    static_path = str(sys.argv[5])

    # create a pandas dataframe of the return periods, indexed by rivid
    era_type = str(sys.argv[4])
    rp_path = glob.glob(os.path.join(static_path, era_type, region_name, '*return_periods*.nc*'))[0]
    logging.info(f'Return Period Path {rp_path}')
    rp_ncfile = nc.Dataset(rp_path, 'r')
    rp_df = pd.DataFrame({
        'return_2': rp_ncfile.variables['return_period_2'][:],
        'return_5': rp_ncfile.variables['return_period_5'][:],
        'return_10': rp_ncfile.variables['return_period_10'][:],
        'return_25': rp_ncfile.variables['return_period_25'][:],
        'return_50': rp_ncfile.variables['return_period_50'][:],
        'return_100': rp_ncfile.variables['return_period_100'][:]
    }, index=rp_ncfile.variables['rivid'][:])
    rp_ncfile.close()
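    # rp_df has one row per rivid and one column per return period, so
    # rp_df.loc[comid, 'return_2'] looks up a reach's 2-year flow threshold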
    # create the summary table
    try:
        # seed the list of per-comid frames with an empty dataframe that fixes the column order
        columns = ['comid', 'timestamp', 'max', 'mean', 'color', 'thickness', 'ret_per']
        summary_table_frames = [pd.DataFrame(columns=columns)]

        # read the date and COMID lists from one of the netcdfs
        sample_nc = nc.Dataset(os.path.join(workspace, 'nces.avg.nc'), 'r')
        comids = sample_nc['rivid'][:].tolist()
        dates = sample_nc['time'][:].tolist()
        dates = [datetime.datetime.utcfromtimestamp(date).strftime("%m/%d/%y %H:%M") for date in dates]
        sample_nc.close()

        # create a time series at 3 hour intervals for interpolation
        interp_x = pd.date_range(start=dates[0], end=dates[-1], freq='3H')
        new_dates = list(interp_x.strftime("%m/%d/%y %H:%M"))
        dt_dates = pd.to_datetime(dates)

        # read the max and avg flows
        max_flow_nc = nc.Dataset(os.path.join(workspace, 'nces.max.nc'))
        max_flow_array = max_flow_nc.variables['Qout'][:].round(2).tolist()
        max_flow_nc.close()
        mean_flow_nc = nc.Dataset(os.path.join(workspace, 'nces.avg.nc'))
        mean_flow_array = mean_flow_nc.variables['Qout'][:].round(2).tolist()
        mean_flow_nc.close()
        # for each comid, interpolate to 3-hourly with pchip, then compare to the
        # return periods to generate map styling info
        for comid, maxes, means in zip(comids, max_flow_array, mean_flow_array):
            # interpolate the maxes and means
            max_interpolator = pchip(dt_dates, maxes)
            mean_interpolator = pchip(dt_dates, means)
            max_flows = max_interpolator(interp_x)
            mean_flows = mean_interpolator(interp_x)
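            # note: pchip is shape-preserving (monotone piecewise cubic), so the
            # interpolated flows stay between neighboring forecast values rather
            # than overshooting as an unconstrained cubic spline can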
            # build the color, thickness, and return period columns
            colors = []
            thicknesses = []
            ret_pers = []
            for mean_flow in mean_flows:
                # define reach color based on return periods
                if mean_flow > rp_df.loc[comid, 'return_50']:
                    color = 'purple'
                elif mean_flow > rp_df.loc[comid, 'return_10']:
                    color = 'red'
                elif mean_flow > rp_df.loc[comid, 'return_2']:
                    color = 'yellow'
                else:
                    color = 'blue'
                colors.append(color)

                # define reach thickness based on flow magnitude
                if mean_flow < 20:
                    thickness = '1'
                elif 20 <= mean_flow < 250:
                    thickness = '2'
                elif 250 <= mean_flow < 1500:
                    thickness = '3'
                elif 1500 <= mean_flow < 10000:
                    thickness = '4'
                elif 10000 <= mean_flow < 30000:
                    thickness = '5'
                else:
                    thickness = '6'
                thicknesses.append(thickness)

                # define the highest return period exceeded by the mean forecast
                if mean_flow > rp_df.loc[comid, 'return_100']:
                    ret_per = '100'
                elif mean_flow > rp_df.loc[comid, 'return_50']:
                    ret_per = '50'
                elif mean_flow > rp_df.loc[comid, 'return_25']:
                    ret_per = '25'
                elif mean_flow > rp_df.loc[comid, 'return_10']:
                    ret_per = '10'
                elif mean_flow > rp_df.loc[comid, 'return_5']:
                    ret_per = '5'
                elif mean_flow > rp_df.loc[comid, 'return_2']:
                    ret_per = '2'
                else:
                    ret_per = '0'
                ret_pers.append(ret_per)

            # collect this comid's rows; concatenating once after the loop avoids
            # rebuilding the dataframe on every iteration
            summary_table_frames.append(pd.DataFrame({
                'comid': pd.Series([comid] * len(new_dates)),
                'timestamp': pd.Series(new_dates),
                'max': pd.Series(max_flows).round(2),
                'mean': pd.Series(mean_flows).round(2),
                'color': pd.Series(colors),
                'thickness': pd.Series(thicknesses),
                'ret_per': pd.Series(ret_pers)
            }))
        # write to csv
        summary_table_df = pd.concat(summary_table_frames, ignore_index=True)
        summary_table_df.to_csv(os.path.join(workspace, file_name), index=False)
    except Exception as e:
        # log the full traceback at error level so failures are not buried in debug output
        logging.exception(e)


# run the function on file execution
if __name__ == "__main__":
    """
    Arguments:
    1. Absolute path to the rapid-io/output directory.
       It contains a directory for each region, and each region directory
       contains a directory named for the forecast date.
    2. Path to the logging file.
    3. NCO command used to compute the ensemble statistics (nces).
    4. era_type: name of the ERA dataset, e.g. 'era_5'.
    5. static_path: absolute path to the root directory containing all the
       return period files. The next level below this directory should be
       the different era types.
    """
    logging.basicConfig(filename=str(sys.argv[2]), level=logging.DEBUG)

    # output directory
    workdir = str(sys.argv[1])

    # list of watersheds
    region_folders = [os.path.join(workdir, d) for d in os.listdir(workdir) if os.path.isdir(os.path.join(workdir, d))]

    # list of all forecast date folders in all the region folders; a flat list
    # comprehension handles regions with different numbers of date folders
    date_folders = [d for region_folder in region_folders
                    for d in glob.glob(os.path.join(region_folder, '*'))
                    if os.path.isdir(d)]

    # map the function over the list of date folders
    with mp.Pool() as pool:
        pool.map(extract_summary_table, date_folders)

    logging.debug('Finished')