forked from NOAA-EMC/global-workflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgsi_utils.py
137 lines (115 loc) · 4.26 KB
/
gsi_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# gsi_utils.py
# a collection of functions, classes, etc.
# used for the GSI global analysis
def isTrue(str_in):
    """ isTrue(str_in)
    - function to translate shell-style truth strings to a python bool
    input: str_in - string; compared case-insensitively
    returns: True when str_in is 'YES' or '.TRUE.' (any letter case),
             False for every other string
    """
    # Only these two spellings count as true; anything else is False.
    return str_in.upper() in ('YES', '.TRUE.')
def link_file(from_file, to_file):
    """ link_file(from_file, to_file)
    - function to create the symlink to_file -> from_file unless to_file
      already resolves to an existing path
    input: from_file - string path the link should point to
           to_file   - string path of the link to create
    """
    import os

    # A destination that resolves to something real is left untouched.
    if os.path.exists(to_file):
        return

    # os.path.exists() is False for a dangling symlink, so a stale link may
    # still occupy the name; remove it before creating the new link.
    if os.path.islink(to_file):
        print(to_file + " exists, unlinking.")
        os.unlink(to_file)

    os.symlink(from_file, to_file)
    print("ln -s " + from_file + " " + to_file)
def copy_file(from_file, to_file):
    """ copy_file(from_file, to_file)
    - function to copy a file with shutil.copy and echo the equivalent
      shell command
    input: from_file - string path of the file to copy
           to_file   - string path of the destination (file or directory)
    """
    from shutil import copy

    copy(from_file, to_file)
    # Echo what was done in shell syntax.
    print(" ".join(("cp", from_file, to_file)))
def make_dir(directory):
    """ make_dir(directory)
    - function to create a directory and any missing parents, like the
      shell command `mkdir -p`
    input: directory - string path of the directory to create
    """
    import os

    # exist_ok=True matches `mkdir -p`: an already-existing directory is not
    # an error. A non-directory at that path still raises FileExistsError.
    os.makedirs(directory, exist_ok=True)
    print("mkdir -p " + directory)
def write_nml(nml_dict, nml_file):
    """ write_nml(nml_dict, nml_file)
    - function to write out namelist dictionary nml_dict to file nml_file
    input: nml_dict - dictionary of dictionaries
                      first dictionary is &nml, second is nmlvar='value'
                      NOTE: this should be an OrderedDict or else it might
                      fail; groups and variables are written in the
                      mapping's iteration order
           nml_file - string path to write namelist file to
    """
    # The context manager closes the file even if a write or str() raises.
    with open(nml_file, 'w') as nfile:
        for nml, nmlvars in nml_dict.items():
            nfile.write('&' + nml + '\n')
            for var, val in nmlvars.items():
                nfile.write(' ' + str(var) + ' = ' + str(val) + '\n')
            # '/' terminates the namelist group; a blank line separates groups.
            nfile.write('/\n\n')
def get_ncdims(ncfile):
    """ get_ncdims(ncfile)
    - function to return dictionary of netCDF file dimensions and their lengths
    input: ncfile - string to path to netCDF file
    output: ncdims - dictionary where key is the name of a dimension and the
                     value is the length of that dimension
               ex: ncdims['pfull'] = 127
    raises: ImportError if the netCDF4 module cannot be imported
    """
    try:
        import netCDF4 as nc
    except ImportError as err:
        # Report the underlying reason and keep the original traceback chained.
        raise ImportError(f"Unable to import netCDF4 module\n{err}") from err

    # The context manager closes the dataset even if reading a dimension fails.
    with nc.Dataset(ncfile) as ncf:
        return {name: len(dim) for name, dim in ncf.dimensions.items()}
def get_nemsdims(nemsfile, nemsexe):
    """ get_nemsdims(nemsfile,nemsexe)
    - function to return dictionary of NEMSIO file dimensions for use
    input: nemsfile - string to path nemsio file
           nemsexe - string to path nemsio_get executable
    output: nemsdims - dictionary where key is the name of a dimension and the
                     value is the length of that dimension
               ex: nemsdims['pfull'] = 127
    raises: ValueError if the executable's output does not end in an integer
    """
    import subprocess

    # Map the names queried from the executable to the netCDF-style dimension
    # names used as keys of the returned dictionary.
    ncdims = {
        'dimx': 'grid_xt',
        'dimy': 'grid_yt',
        'dimz': 'pfull',
    }
    nemsdims = {}
    for dim, ncname in ncdims.items():
        # universal_newlines=True makes the pipe return str rather than bytes,
        # so the output can be split with str separators on Python 3.
        proc = subprocess.Popen([nemsexe, nemsfile, dim],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                universal_newlines=True)
        output, _ = proc.communicate()
        # The dimension length is the last whitespace-separated field.
        fields = output.split()
        try:
            nemsdims[ncname] = int(fields[-1])
        except (IndexError, ValueError) as err:
            raise ValueError(
                f"Unable to parse {dim} from output of {nemsexe}: {output!r}"
            ) from err
    return nemsdims
def get_timeinfo(ncfile):
    """ get_timeinfo(ncfile)
    - function to return datetime objects of initialized time and valid time
    input: ncfile - string to path to netCDF file; its 'time' variable must
                    have a units attribute of the form
                    '<unit> since <date and hour>'
    returns: inittime, validtime - datetime objects
             nfhour - integer forecast hour
    raises: ImportError if the netCDF4 module cannot be imported
    """
    try:
        import netCDF4 as nc
    except ImportError as err:
        # Keep the original traceback chained to the friendlier message.
        raise ImportError(f"Unable to import netCDF4 module\n{err}") from err
    import datetime as dt
    import re

    # The context manager closes the dataset even if parsing below raises.
    with nc.Dataset(ncfile) as ncf:
        time_var = ncf['time']
        # Keep only the reference date that follows 'since ' in the units.
        date_str = time_var.units.split('since ')[1]
        # Drop separators so the first ten digits read as YYYYMMDDHH.
        date_str = re.sub("[^0-9]", "", date_str)
        inittime = dt.datetime.strptime(date_str[0:10], "%Y%m%d%H")
        # The first time value is treated as a whole number of hours.
        nfhour = int(time_var[0])
        validtime = inittime + dt.timedelta(hours=nfhour)
    return inittime, validtime, nfhour