Commit 002e347

Adding outliers in a new table
1 parent eb54f14 commit 002e347

Showing 9 changed files with 151 additions and 48 deletions.

DRAW-post-processing/config.py

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@
 
 import phase1_methods as methods
 
-
 # assigning post-process ID's to field ID's
 # assign a TUPLE to multiple field_id's with one PPID; otherwise assign integer for single field_id
 ppid_to_field_id = {1: (4, 6, 7, 8, 67, 69),

DRAW-post-processing/database_connection.py

Lines changed: 8 additions & 2 deletions
@@ -26,7 +26,7 @@
 
 cursor = conn.cursor()
 
-url = "mysql+mysqlconnector://"+db_user+":"+db_passwd+"@l"+db_host+"/"+db_name
+url = "mysql+mysqlconnector://"+db_user+":"+db_passwd+"@"+db_host+"/"+db_name
 engine = sqlalchemy.create_engine(url)
 
 

@@ -55,4 +55,10 @@ def phase_2_data():
     sql_command = sql_commands.phase_2_data_sql
     cursor.execute(sql_command)
     result = cursor.fetchall()
-    return result
+    return result
+
+def outliers_stats():
+    sql_command=sql_commands.outliers_stats_sql
+    cursor.execute(sql_command)
+    result = cursor.fetchall()
+    return result
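The new helper returns the rows produced by outliers_stats_sql (added in sql_commands.py below): one (field_id, count) tuple per field that has entries flagged as outliers. A minimal usage sketch, assuming the module is imported as elsewhere in the codebase:

import database_connection as db

# Each row is a (field_id, count) tuple, per outliers_stats_sql in sql_commands.py.
for field_id, n_flagged in db.outliers_stats():
    print("field_id", field_id, "->", n_flagged, "entries flagged as outliers (flagged=10)")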

DRAW-post-processing/execute_post_process.py

Lines changed: 30 additions & 19 deletions
@@ -1,4 +1,5 @@
 import database_connection as db
+import datetime
 import tables
 import observation_reconciliation as reconcile
 import remove_low_transcription_users as remove_ltu

@@ -32,16 +33,15 @@
 
 import phase2_methods as id1p2_methods
 import time
-
+import logs as l
 import sef_gen
 
-#import argparse
 
 def logPerf(message):
     global tic
     toc = time.perf_counter()
-    print(message, end='')
-    print (f": {toc - tic:0.4f} seconds")
+    l.log(message, end='')
+    l.log (f": {toc - tic:0.4f} seconds")
     tic=toc
 
 # point data entry to particular post_processing algorithm for phase 1 depending on its post_process_id

@@ -95,7 +95,12 @@ def filter_id(pp_id, entry, phase):
     else:
         pass
 
+
+# REdirecting stdout to a stream so that it can be saved in a DB
+
 tic = time.perf_counter()
+start_time=datetime.datetime.now()
+
 
 # Experimental: implement a continue flag
 continue_flag=False

@@ -129,17 +134,17 @@ def filter_id(pp_id, entry, phase):
 
 logPerf("Created indexed for raw data processing")
 
-print ("Phase 1: ")
+l.log ("Phase 1: ")
 counter = 0
 for row in raw_entries:
     post_process_id = row[8]
     filter_id(post_process_id, row, 1)
     counter += 1
     if (counter % 1000) == 0:
-        print('.', end="")
+        l.log('.', end="")
         if (counter % 50000) == 0:
-            print("")
-print("")
+            l.log("")
+l.log("")
 logPerf("Phase 1 complete")
 
 # Save corrected data in database

@@ -173,18 +178,19 @@ def filter_id(pp_id, entry, phase):
 
 pressure_lead_digs_added = id1p2_methods.pressure_artificial_lead_digs_list()
 counter = 0
-print ("Phase 2:")
+l.log ("Phase 2:")
 for row in entries:
     post_process_id = row[8]
     filter_id(post_process_id, row, 2)
     counter += 1
     if (counter % 1000) == 0:
-        print('.', end="")
+        l.log('.', end="")
         if (counter % 50000) == 0:
-            print("")
+            l.log("")
 
 logPerf("Completed post-process 1 phase 2")
-id3p2.phase_2(entries)
+tables.create_outliers_graphs()
+outlier_graphs=id3p2.phase_2(entries)
 logPerf("Completed post-process 3 phase 2")
 
 tables.populate_final_corrected_table()

@@ -193,21 +199,30 @@ def filter_id(pp_id, entry, phase):
 logPerf("Phase 2 complete")
 
 ##################### EXECUTE PHASE 3 (ISO TRANSLATION) #########################
-print ("Phase 3:")
+l.log ("Phase 3:")
 tables.create_final_corrected_table_iso(continue_flag)
 entries=db.phase_2_data()
 for row in entries:
     post_process_id = row[8]
     filter_id(post_process_id, row, 3)
     counter += 1
     if (counter % 1000) == 0:
-        print('.', end="")
+        l.log('.', end="")
         if (counter % 50000) == 0:
-            print("")
+            l.log("")
 tables.populate_final_corrected_table_iso()
 
 logPerf("Completed phase 3")
 
+#################### Generating SEF files ##########################
+l.log("Generating SEF files")
+sef_gen.generateSEFs()
+logPerf("SEF files generated")
+
+#################### Saving report #############################
+report_id= tables.writeReport(l.report,start_time)
+tables.insert_outlier_graphs(report_id, outlier_graphs)
+tables.insert_outlier_stats(report_id, db.outliers_stats())
 
 ##################### DELETE ALL DISPENSABLE TABLES (KEEP FINAL + ERRORS/EDITS TABLES) ############################
 tables.delete_table('data_entries_raw')

@@ -216,7 +231,3 @@ def filter_id(pp_id, entry, phase):
 logPerf("cleaned up database")
 
 
-#################### Generating SEF files ##########################
-print("Generating SEF files")
-sef_gen.generateSEFs()
-logPerf("SEF files generated")

DRAW-post-processing/observation_reconciliation.py

Lines changed: 4 additions & 4 deletions
@@ -3,7 +3,7 @@
 import database_connection as db
 import statistics as stats
 import tables
-
+import logs as p
 cursor = db.cursor
 
 

@@ -13,7 +13,7 @@ def remove_duplicates():
 
     counter = 0
     checked_entries = {}
-    print("Reconciliation:")
+    p.log("Reconciliation:")
     for entry in data_entries:
         if entry[9] is not None and entry[0] not in checked_entries.keys():
             cursor.execute("SELECT * FROM data_entries_corrected "

@@ -50,7 +50,7 @@ def remove_duplicates():
             tables.add_to_duplicateless_table(*entry)
             counter += 1
             if (counter % 1000) == 0:
-                print('.', end="")
+                p.log('.', end="")
                 if (counter % 50000) == 0:
-                    print("")
+                    p.log("")
     tables.populate_duplicateless_table()

DRAW-post-processing/post_process_ids/id3/id_3_phase_2.py

Lines changed: 46 additions & 20 deletions
@@ -5,9 +5,7 @@
 import config
 import math
 import time
-#import numpy as np
-# curve-fit() function imported from scipy
-#from scipy.optimize import curve_fit
+import logs as p
 from matplotlib import pyplot as plt
 
 

@@ -17,7 +15,7 @@ def log_errors(code,errors):
 
 # compares observed and corrected values. If not within a threshold, they are both marked as error [304]
 def compare_observed_corrected (df,field_observed,field_corrected):
-    print(" Comparing observed vs corrected for fields " + str(field_observed)+" vs "+str(field_corrected))
+    p.log(" Comparing observed vs corrected for fields " + str(field_observed)+" vs "+str(field_corrected))
     df_comp=df[df['field_id'].isin([field_observed,field_corrected]) ]
     temp_observed_errors=df_comp.groupby(['observation_date'])['value'].diff().dropna().abs().gt(config.temperature_difference_allowed_obs_corr)
 

@@ -30,7 +28,7 @@ def compare_observed_corrected (df,field_observed,field_corrected):
 
 # verifies that min is less than max at a given time. If not, both entries are marked as errors [305]
 def compare_min_max (df,field_min,field_max):
-    print(" Comparing min/max for fields "+str(field_min) +"/"+str(field_max))
+    p.log(" Comparing min/max for fields "+str(field_min) +"/"+str(field_max))
     df_comp=df[df['field_id'].isin([field_min,field_max])].sort_values(by=['observation_date'])
     obs_date=None
     min_temp=math.nan

@@ -52,7 +50,7 @@ def compare_min_max (df,field_min,field_max):
 
 # Verifies that the first field in the list is less than all other fields - same observation time. If not, marked as flagged [2]
 def compare_field_less_than_other_fields(df,fields):
-    print (" Comparing that field "+str(fields[0])+" is less than these fields: "+str(fields[1]))
+    p.log (" Comparing that field "+str(fields[0])+" is less than these fields: "+str(fields[1]))
     df_comp=df[df['field_id'].isin(fields)].sort_values(by=['observation_date'])
     obs_date=None
     min_temp=math.nan

@@ -84,7 +82,7 @@ def compare_field_less_than_other_fields(df,fields):
 
 # check the the min field is the min of all previous fields between last min field measurement or 24 hours. If not marked as flagged [3]
 def check_field_is_min_over_period(df,min_field,max_field):
-    print (" Checking that field "+str(min_field)+" is the minimum of all values of this field: "+str(max_field))
+    p.log (" Checking that field "+str(min_field)+" is the minimum of all values of this field: "+str(max_field))
     df_comp=df[df['field_id'].isin([min_field,max_field])].sort_values(by=['observation_date'])
     obs_date=None
     min_temp=math.nan

@@ -106,7 +104,7 @@ def check_field_is_min_over_period(df,min_field,max_field):
 
 # check that the max field is the max of all previous fields between last max field measurement or 24 hours. If not marked as flagged [4]
 def check_field_is_max_over_period(df,max_field,min_field):
-    print (" Checking that field "+str(max_field)+" is the maximum of all values of this field: "+str(min_field))
+    p.log (" Checking that field "+str(max_field)+" is the maximum of all values of this field: "+str(min_field))
     df_comp=df[df['field_id'].isin([max_field,min_field])].sort_values(by=['observation_date'])
     obs_date=None
     max_temp=math.nan

@@ -143,7 +141,7 @@ def compare_min_max_df (df,field_min,field_max):
 
 # checks air temperature and wet bulb are less than a certain threshold
 def check_air_wet_bulb(df, fields):
-    print (" Checking wet bulb for fields: "+str(fields))
+    p.log (" Checking wet bulb for fields: "+str(fields))
     df_comp=df[df['field_id'].isin([fields])].sort_values(by=['observation_date'])
     obs_date=None
     f0=math.nan

@@ -178,10 +176,9 @@ def check_air_wet_bulb(df, fields):
 
 
 
-# Detects outliers and flags them [1]
+# Detects outliers and flags them [1] and returns list of graph data
 def flag_outliers (df, field_id):
-
-
+    outliers_data=[]
     df_proc=df[df.field_id==field_id].sort_values(by=['observation_date'])
 
     #determine list of series that are eligible for validation based on rule: needs less than 5 days before or after with no data

@@ -203,9 +200,9 @@ def flag_outliers (df, field_id):
         standard_deviation=delta.std()
         outliers=df_proc[df_proc.index.isin(delta[delta.gt(config.temperature_outlier_std_factor*standard_deviation)].index)]
         if outliers.size >0:
+            ans_max=ans+config.temperature_outlier_std_factor*standard_deviation
+            ans_min=ans-config.temperature_outlier_std_factor*standard_deviation
             if config.temperature_plot_outliers == True:
-                ans_max=ans+config.temperature_outlier_std_factor*standard_deviation
-                ans_min=ans-config.temperature_outlier_std_factor*standard_deviation
                 fig, ax = plt.subplots(1, figsize = (20, 8))
                 fig.autofmt_xdate()
                 ax.plot(x, y, '.', color ='black', label ="data")

@@ -218,10 +215,37 @@ def flag_outliers (df, field_id):
             #flag the outliers
             for ind,outlier in outliers.iterrows():
                 df.at[ind,'flagged']=10
-
+            # Build graph json data
+            data="{\"data\":["
+            first_data=True
+            for ind in x.keys():
+                if first_data==False:
+                    data=data+","
+                first_data=False
+                data=data+"{\"x\":\""+str(x[ind])+"\","
+                data=data+"\"y\":"+str(y[ind])+","
+                data=data+"\"ly\":"
+                if pd.isna(ans_min[ind]):
+                    data=data+"null"
+                else:
+                    data=data+str(ans_min[ind])
+                data=data+",\"uy\":"
+                if pd.isna(ans_max[ind]):
+                    data=data+"null"
+                else:
+                    data=data+str(ans_max[ind])
+                data=data+",\"outlier\":"
+                if ind in outliers:
+                    data=data+str(outliers[ind])
+                else:
+                    data=data+"null"
+                data=data+"}"
+            data=data+"]}"
+            outliers_data.append((field_id,data))
         obs_date=row['observation_date']
         list_partial=[]
         list_partial.append(row)
+    return outliers_data
 
 
 

@@ -231,11 +255,11 @@ def phase_2(entries,debug=False):
 
     def logPerf(tic,message):
         toc = time.perf_counter()
-        print(message, end='')
-        print (f": {toc - tic:0.4f} seconds")
+        p.log(message, end='')
+        p.log (f": {toc - tic:0.4f} seconds")
        return toc
 
-    print ("Starting temperature phase 2")
+    p.log ("Starting temperature phase 2")
     tic = time.perf_counter()
     # execute post process id3 on the whole dataset, not one entry at a time
     df=pd.DataFrame(entries,

@@ -271,7 +295,7 @@ def logPerf(tic,message):
         try:
             check_field_is_min_over_period(df_temp_nona, fields[0], fields[1])
         except:
-            print(df_temp_nona, fields[0], fields[1])
+            p.log(df_temp_nona, fields[0], fields[1])
     tic=logPerf(tic, "Completed field is minimum of other fields over period of time")
 
     # check temperature is the max of other values within past 24 hours max

@@ -298,12 +322,14 @@ def logPerf(tic,message):
     tic=logPerf(tic, "Completed removing detected errors before outlier detection")
 
     # get series of values for a given field ID
+    outliers_graph=[]
     for field in config.temperature_stat_outliers:
-        flag_outliers(df_temp_cleaned, field)
+        outliers_graph.append(flag_outliers(df_temp_cleaned, field))
     tic=logPerf(tic, "Completed outlier detection")
 
 
     # fit the series
     df_temp_cleaned.to_sql('data_entries_corrected_final', db.engine, if_exists='append', index=False)
+    return outliers_graph
 
 
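flag_outliers builds the per-field graph JSON by string concatenation. An equivalent, less error-prone construction with json.dumps is sketched below; build_graph_json is a hypothetical helper, not part of the commit, and x, y, ans_min, ans_max are assumed to be the index-aligned series used above, with outliers mapping index to value:

import json
import pandas as pd

# Hypothetical helper -- produces the same (field_id, json) tuple shape that
# flag_outliers appends to outliers_data.
def build_graph_json(field_id, x, y, ans_min, ans_max, outliers):
    points = []
    for ind in x.keys():
        points.append({
            "x": str(x[ind]),
            "y": float(y[ind]),
            "ly": None if pd.isna(ans_min[ind]) else float(ans_min[ind]),
            "uy": None if pd.isna(ans_max[ind]) else float(ans_max[ind]),
            "outlier": float(outliers[ind]) if ind in outliers else None,
        })
    return (field_id, json.dumps({"data": points}))

Note also that phase_2 appends the whole list returned by flag_outliers once per field, so outliers_graph ends up as a list of lists of (field_id, json) tuples; tables.insert_outlier_graphs presumably flattens it.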

DRAW-post-processing/sef_gen.py

Lines changed: 4 additions & 2 deletions
@@ -3,6 +3,8 @@
 import datetime
 import config
 import os
+import logs as p
+
 cursor = db.cursor
 
 def generateSEFs():

@@ -76,7 +78,7 @@ def getFilename(sef_type,type_result_set):
 
 
 def generateSEF(sef_type):
-    print("Generating SEF for type: " + sef_type)
+    p.log("Generating SEF for type: " + sef_type)
     if type(config.sef_type_to_field_id[sef_type]) == int:
         query="select value,observation_date from data_entries_corrected_final_iso where field_id = {} order by observation_date asc".format(config.sef_type_to_field_id[sef_type])
     else:

@@ -94,7 +96,7 @@ def generateSEF(sef_type):
                 "\t"+value+"\t|\t\n"
             type_result_set.append(result_str)
         except:
-            print ("Couldn't generate SEF line for value="+str(value)+", observation date ="+str(observation_date))
+            p.log ("Couldn't generate SEF line for value="+str(value)+", observation date ="+str(observation_date))
 
 
     (filename,index_start,index_end)=getFilename(sef_type, type_result_set)

DRAW-post-processing/setup_raw_data_table.py

Lines changed: 3 additions & 0 deletions
@@ -14,3 +14,6 @@ def set_up_raw_data_table(continue_flag):
     # TODO : update other field id's with their respective pp_id
 
     tables.create_raw_data_table(continue_flag)
+    tables.create_post_processing_reports_table()
+    tables.create_outliers_stats_table()
+
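The two table-creation helpers live in tables.py, which this diff view does not include. A hypothetical DDL sketch consistent with how the tables are filled in execute_post_process.py (writeReport, insert_outlier_stats); the column names are assumptions:

# Hypothetical schemas -- the real DDL is in tables.py, not shown in this commit.
create_post_processing_reports_sql = """
CREATE TABLE IF NOT EXISTS post_processing_reports (
    report_id INT AUTO_INCREMENT PRIMARY KEY,
    start_time DATETIME,
    report LONGTEXT
)"""

create_outliers_stats_sql = """
CREATE TABLE IF NOT EXISTS outliers_stats (
    report_id INT,
    field_id INT,
    flagged_count INT
)"""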

DRAW-post-processing/sql_commands.py

Lines changed: 1 addition & 0 deletions
@@ -69,6 +69,7 @@ def create_error_edit_table(phase):
 phase_1_data_test_sql = "SELECT * FROM data_entries_corrected_duplicateless_test;"
 
 phase_2_data_sql = "select * from data_entries_corrected_final"
+outliers_stats_sql="select field_id,count(*) from data_entries_corrected_final where flagged=10 group by field_id"
 
 # MySQL commands used during post-processing phases:
 
