From 892741ae20d87837575036dab2457c1529b10f4e Mon Sep 17 00:00:00 2001 From: Brendan Blanchard Date: Tue, 5 Mar 2019 11:21:50 -0700 Subject: [PATCH] Initial commit --- Pysplit Processor/pysplit_db_setup.py | 114 +++++++++++++ Pysplit Processor/pysplit_processor.py | 211 +++++++++++++++++++++++++ 2 files changed, 325 insertions(+) create mode 100644 Pysplit Processor/pysplit_db_setup.py create mode 100644 Pysplit Processor/pysplit_processor.py diff --git a/Pysplit Processor/pysplit_db_setup.py b/Pysplit Processor/pysplit_db_setup.py new file mode 100644 index 0000000..69dba49 --- /dev/null +++ b/Pysplit Processor/pysplit_db_setup.py @@ -0,0 +1,114 @@ +""" +Created on Tues Dec 5 08:00:00 2018 + +@author: BB + +This creates the SQLite DB that will be used to track trajectories that have +been created, and ensure the met files needed to create them are stored locally. +It is intended to be run once, prior to running the processor. + +By default, it adds all possible 1-H increments from 1/1/2017 +through 1/1/2018. See docs for pd.date_range() in pandas to create +other timesteps. + +It is currently configured to parse only hrrra files from the database, but a new +function could be substituted for get_hrrra_file_list() if desired. +""" + +import os +import sqlite3 +from datetime import datetime +from pathlib import Path +import pandas as pd +from ftplib import FTP + +homedir = Path(os.getcwd()) +local_hrrr_dir = homedir / 'met/hrrr' +remote_hrrr_dir = r'/archives/hrrr/' + +site_dates = pd.date_range(start='1/1/2017', end='1/1/2018', freq= '1H', tz='MST').tz_convert('UTC').tolist() +# matrix of all dates in UTC (for model input) that require trajectories + +os.chdir(homedir) + +db_con = sqlite3.connect('pysplit_runs.sqlite') # connect to local DB +db_cur = db_con.cursor() + +proceed = input('Do you wish to reset/create the database? This will overwrite any previous work. (y/n) ') + +if proceed is 'y': + + db_cur.executescript(''' + DROP TABLE IF EXISTS files; + DROP TABLE IF EXISTS trajectories; + + CREATE TABLE files ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, + fn TEXT UNIQUE, + traj_date TEXT, + local BOOLEAN, + remote BOOLEAN, + needed_for_month BOOLEAN + ); + + CREATE TABLE trajectories ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, + traj_date TEXT UNIQUE, + fn TEXT UNIQUE, + processed BOOLEAN, + attempted BOOLEAN + ); + + ''') + + # 'files' is the table for all met files available remotely + # 'remote' should be all 1 + # 'local' are those available locally + # 'needed_for_month' is the subset needed to process the month in question + + for date in site_dates: + date = datetime.strftime(date, '%Y-%m-%d %H:%M:%S') # convert to string for DB + db_cur.execute('''INSERT OR IGNORE INTO trajectories (traj_date, processed, attempted) + VALUES ( ?, ?, ? )''', ( date, 0, 0) ) # place date and label as un-processed + # file name will be added when it's processed and saved + + ftp_con = FTP('arlftp.arlhq.noaa.gov') # connect to NOAA's FTP server + ftp_con.login() # login as anonymous and move to correct dir + ftp_con.cwd(remote_hrrr_dir) + + def get_hrrra_file_list(conn): + ''' + This function takes one FTP connection (to the ARL Server) and returns + a list of all the hrrr met files in that connection's cwd + ''' + remote_out = [] + conn.dir(remote_out.append) + remote_files = [] + for line in remote_out: + if 'hrrra' in line: + remote_files.append(line.split(' ')[-1]) + return remote_files + + def parse_files_for_dates(met_file_list): + file_dates = [] + for file in met_file_list: + met_date = ''.join(file.split('.')[1:3]) + file_dates.append(datetime.strptime(met_date,'%Y%m%d%Hz')) + + return file_dates + + remote_met_files = get_hrrra_file_list(ftp_con) # list of all available met files on the server + met_file_dates = parse_files_for_dates(remote_met_files) # list of all available dates from met files on the server + + for filename, date in zip(remote_met_files, met_file_dates): + date = datetime.strftime(date, '%Y-%m-%d %H:%M:%S') # convert to string for DB + db_cur.execute('''INSERT OR IGNORE INTO files (fn, traj_date, local, remote) + VALUES ( ?, ?, ?, ? )''', ( filename, date, 0, 1)) + # insert all remote files and their dates + # mark as available remote, not-available local + + db_con.commit() # save everything and finish + + # Every 1H trajectory run up until this hour is now in pysplit_runs.sqlite, + # and can now be recalled and checked off as the processor goes. A list of + # available met files is also ready for retrieval based on the data needed. diff --git a/Pysplit Processor/pysplit_processor.py b/Pysplit Processor/pysplit_processor.py new file mode 100644 index 0000000..c5456ac --- /dev/null +++ b/Pysplit Processor/pysplit_processor.py @@ -0,0 +1,211 @@ +import os +import sqlite3 +import datetime as dt +from datetime import datetime +import pandas as pd +from ftplib import FTP +from pathlib import Path +import pysplit +import pytz +""" +This should be run after pysplit_db_setup.py, which configures the database this +will work from. +""" + +trajectory_runtime = -12 + +homedir = Path(os.getcwd()) +local_hrrr_dir = homedir / 'met/hrrr' +remote_hrrr_dir = r'/archives/hrrr/' +hysplit_dir = r'C:/hysplit4/working' +out_dir = homedir / 'trajectories' + +coords = (40.07,-105.22) # coordinates to be run + +altitude = 5 # HYSPLIT defaults to AGL, this is a guess... + +def get_ftp_file(fn_local, fn_remote): + ''' + Use internal FTP connection and save fn_remote from its CWD to the + local file fn_local, which is usually the same. + ''' + ftp_con = FTP('arlftp.arlhq.noaa.gov') # connect to NOAA's FTP server + ftp_con.login() # login as anonymous and move to correct dir + ftp_con.cwd(remote_hrrr_dir) + + with open(fn_local,'wb') as localfile: + ftp_con.retrbinary('RETR ' + fn_remote, localfile.write, 1024) + print(f'{fn_remote} retreived and saved as {fn_local}') + ftp_con.quit() + +os.chdir(homedir) + +db_con = sqlite3.connect('pysplit_runs.sqlite') # connect to local DB +db_cur = db_con.cursor() + +db_cur.execute('SELECT * FROM trajectories WHERE processed = 0 AND attempted = 0') # find all trajectories that haven't been run + +def parse_db_trajectories_select(fetched_all): + """Pulls all information returned by SQL queries for un-processed trajectories""" + ids = [line[0] for line in fetched_all] + dates = [pytz.utc.localize(datetime.strptime(line[1], '%Y-%m-%d %H:%M:%S')) for line in fetched_all] + processed_status = [line[3] for line in fetched_all] + + return (ids, dates, processed_status) + +[traj_ids,traj_dates,processed_status] = parse_db_trajectories_select(db_cur.fetchall()) # get list of all un-processed trajectory dates + +traj_date_groups = pd.period_range(start = traj_dates[0], end = traj_dates[-1], freq = '1W') +# make a list of all week-long groups in the unprocessed data + +for period in traj_date_groups: + print(f'Processing trajectories for the period {period}.') + + met_dates_to_process = [] + + for date in traj_dates: + start = pytz.utc.localize(period.start_time - dt.timedelta(hours=8)) + end = pytz.utc.localize(period.end_time + dt.timedelta(hours=8)) + # add more than 6 hours on each end to buffer met files + + if start <= date < end: + met_dates_to_process.append(date) + + if len(met_dates_to_process) == 0: + print(f'No met files on server found for processing the period {period} so it was skipped.') + continue + + met_start_date = met_dates_to_process[0] - dt.timedelta(hours = abs(trajectory_runtime*2)) # buffer met files by 2x the runtime for safety + met_end_date = met_dates_to_process[-1] + dt.timedelta(hours = abs(trajectory_runtime*2)) + + db_cur.execute('SELECT * FROM files WHERE remote = 1') + + def parse_db_files_select(fetched_all): + """Process the SQL query return for files.""" + ids = [line[0] for line in fetched_all] + filenames = [line[1] for line in fetched_all] + dates = [pytz.utc.localize(datetime.strptime(line[2], '%Y-%m-%d %H:%M:%S')) for line in fetched_all] + local_status = [line[3] for line in fetched_all] + remote_status = [line[4] for line in fetched_all] + needed_for_month = [line[5] for line in fetched_all] + + return (ids, filenames, dates, local_status, remote_status, needed_for_month) + + [met_ids, filenames, met_dates, local_status, remote_status, needed_for_month] = ( + parse_db_files_select(db_cur.fetchall())) # get all file info from db + + os.chdir(local_hrrr_dir) + + fns_to_download = [] + + for filename, date in zip(filenames, met_dates): # create download list of met files needed for this period + if met_start_date <= date <= met_end_date: + fns_to_download.append(filename) + # append all file names + # print(fns_to_download) + + # local directory cleanup + with os.scandir(local_hrrr_dir) as local_files: + for file in local_files: + if file.name in fns_to_download: + fns_to_download.remove(file.name) + # do nothing locally if file needed; remove from download list + else: + # otherwise, delete local file; it's no longer needed + os.remove(file.name) + + for filename in fns_to_download: + get_ftp_file(filename, filename) # get and save as same name + # status prints are embedded in get_ftp_file() + + db_cur.execute('SELECT id FROM files WHERE fn = ?',(filename,)) + ind = db_cur.fetchone()[0] # find filename in local DB, update below if found + + if ind is None: + print(f'File {filename} not processed into database correctly.') + continue + else: + db_cur.execute('''UPDATE OR IGNORE files SET (local) = (?) + WHERE id = ?''', (1, ind)) + db_con.commit() + # update db to reflect the file is now available locally + + print(f'All files for period {period} retrieved.') + + # all met files are now in local_hrrr_dir + # met_dates_to_process is now a list of trajectory-hours to process (in UTC!!!) + + def get_hrrra_met_files(traj_date, runtime, met_dir): + """ + Create a list of met files for a trajectory at traj_date with a running time + of runtime. + """ + + if runtime < 0: # if a back-trajectory, set end of period to given date + traj_end = traj_date + dt.timedelta(hours = 7) + traj_start = traj_date + dt.timedelta(hours = runtime-7) + else: + traj_start = traj_date - dt.timedelta(hours = 7) + traj_end = traj_date + dt.timedelta(hours = runtime+7) + + met_files = [] + + os.chdir(met_dir) + + with os.scandir() as files: # check specifically for hrrr met files + for file in files: + if len(file.name.split('.')) == 4: + datestring = (file.name.split('.')[1] + file.name.split('.')[2]).replace('z','') + ts = pd.to_datetime(datestring, format='%Y%m%d%H').tz_localize('UTC') + if (ts >= traj_start) & (ts <= traj_end): + met_files.append(file.name) + + else: + continue + + return (met_files, traj_start, traj_end) + + for date in met_dates_to_process: # generate trajectories for each date in met_dates_to_process + [met_files, start, end] = get_hrrra_met_files(date, trajectory_runtime, local_hrrr_dir) + + print(f'Trajectory for {date} being processed.') + print(f'Met files for {start} to {end}: {met_files}') + + [trajname, err] = pysplit.generate_singletraj(f'fc_csu_12hr_',hysplit_dir,out_dir, + local_hrrr_dir,met_files,date.year, + date.month,date.day, date.hour,altitude, + coords,trajectory_runtime) + # generate_singletraj now returns the generated filename AND if the run failed err = 1 == fail + + if err == 0: + # If no error, print and then update DB to reflect it being done + print(f'Trajectory for {date} generated.') + + db_cur.execute('SELECT id FROM trajectories WHERE traj_date = ?', + (datetime.strftime(date,'%Y-%m-%d %H:%M:%S'),)) + ind = db_cur.fetchone()[0] + + if ind is None: + print(f'Trajectory for date {date} not found in database.') + continue + else: + db_cur.execute('''UPDATE OR IGNORE trajectories SET (processed, fn, attempted) = (?, ?, ?) + WHERE id = ?''', (1, trajname, 0, ind)) + db_con.commit() #add traj name when processed, and commit + else: + # Otherwise, print then set as attempted but not processed in the DB. It can be retried later + print(f'Trajectory for {date} NOT generated due to file moving/Hysplit Error.') + + db_cur.execute('SELECT id FROM trajectories WHERE traj_date = ?', + (datetime.strftime(date,'%Y-%m-%d %H:%M:%S'),)) + ind = db_cur.fetchone()[0] + + if ind is None: + print(f'Trajectory for date {date} not processed into database correctly.') + continue + else: + db_cur.execute('''UPDATE OR IGNORE trajectories SET (processed, fn, attempted) = (?, ?, ?) + WHERE id = ?''', (0, None, 1, ind)) + db_con.commit() # set as unprocessed and w/o traj name, mark as attempted, commit + + print(f'All trajectories (except where error announced) for period {period} have been processed.')