-
Notifications
You must be signed in to change notification settings - Fork 4
/
BatchItineraryAssignment.py
179 lines (133 loc) · 7.07 KB
/
BatchItineraryAssignment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import logging
import time
import yaml
from utils import postgres as pg_utils
# Load the runtime configuration. The path is relative to the current working directory, so the
# script must be started from a directory that has config.yml one level above it.
with open('../config.yml', 'r') as yaml_config_file:
    # safe_load only constructs plain Python types (dict, list, str, numbers). The bare
    # yaml.load(stream) call is deprecated since PyYAML 5.1 and raises TypeError on PyYAML >= 6,
    # where the Loader argument is mandatory.
    config = yaml.safe_load(yaml_config_file)

# log_formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")
FORMAT = '%(asctime)-15s %(levelname)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

# Database connection settings, read from the 'database' section of the config file
db_hostname = config['database']['hostname']
db_port = config['database']['port']
db_name = config['database']['dbname']
db_user = config['database']['user']
db_pwd = config['database']['pwd']

# Threshold (in seconds) for the gap between two consecutive reports of the same aircraft;
# a larger gap closes the current itinerary.
ITINERARY_MAX_TIME_DIFF_SECONDS = int(config['itinerarymaxtimediffseconds'])

# Single module-level connection shared by every function in this script
dbconn = pg_utils.database_connection(dbname=db_name,
                                      dbhost=db_hostname,
                                      dbport=db_port,
                                      dbuser=db_user,
                                      dbpasswd=db_pwd)
def get_all_unique_mode_s_without_itin_assigned():
    """
    Queries the database to find all of the unique mode_s_hex codes that have at least 1 record without an
    itinerary ID assigned (itinerary_id IS NULL).

    :return: list of Mode S Hex IDs that have at least 1 record without an itinerary ID assigned
    """
    logger.info('Fetching a list of all Mode-s hex codes that are missing at least 1 itinerary ID.')
    sql = '''SELECT
             DISTINCT aircraftreports.mode_s_hex
             FROM aircraftreports
             WHERE aircraftreports.itinerary_id IS NULL'''
    uniq_mode_s_cursor = dbconn.cursor()
    try:
        uniq_mode_s_cursor.execute(sql)
        # Each row holds a single column, so unwrap it into a flat list
        return [record[0] for record in uniq_mode_s_cursor.fetchall()]
    finally:
        # Release the cursor even if the query fails
        uniq_mode_s_cursor.close()
def assign_itinerary_id_for_mode_s(mode_s_hex_for_update, itinerary_id, min_time, max_time):
    """
    Given a mode s hex code, itinerary id, and 2 epoch timestamps, assign the itinerary ID to every
    aircraftreports row of that aircraft whose report_epoch lies between the two timestamps (inclusive).
    The update is committed before the function returns.

    :param mode_s_hex_for_update: Mode-s hex code (str)
    :param itinerary_id: itinerary ID (str)
    :param min_time: epoch timestamp, minimum timestamp
    :param max_time: epoch timestamp, maximum timestamp
    """
    min_timestamp = time.strftime('%Y/%m/%d %H:%M:%S', time.localtime(min_time))
    max_timestamp = time.strftime('%Y/%m/%d %H:%M:%S', time.localtime(max_time))

    # Break the elapsed time down into hours and minutes for the log line. The locals are named so
    # that they do not shadow the builtin min().
    total_minutes, _ = divmod(max_time - min_time, 60)
    elapsed_hours, elapsed_minutes = divmod(total_minutes, 60)
    logger.info(
        'Assigning Itinerary ID {} for Mode S Hex {} between times {} and {}, elapsed time of {}:{} '.format(
            itinerary_id,
            mode_s_hex_for_update,
            min_timestamp,
            max_timestamp,
            elapsed_hours,
            elapsed_minutes))

    # Values are passed separately from the statement so the driver does the quoting, rather than
    # being spliced into the SQL text.
    # NOTE(review): the %s placeholders assume pg_utils.database_connection returns a DB-API
    # connection using the format/pyformat paramstyle (e.g. psycopg2) - confirm.
    sql = ("UPDATE aircraftreports SET itinerary_id = %s WHERE aircraftreports.mode_s_hex = %s "
           "AND aircraftreports.report_epoch BETWEEN %s AND %s ")
    params = (itinerary_id, mode_s_hex_for_update, min_time, max_time)
    logger.debug('Assigning Itinerary ID with sql: {} and parameters: {}'.format(sql, params))

    itinerary_cursor = dbconn.cursor()
    try:
        itinerary_cursor.execute(sql, params)
        # commit the query for each of the itinerary assignments as we loop through them
        dbconn.commit()
    finally:
        # Release the cursor even when the update or the commit raises
        itinerary_cursor.close()
def calc_time_diffs_for_mode_s(mode_s_hex):
    """
    Given an input of a string mode_s_hex code, query the DB for all records with that mode_s_hex that have no
    itinerary ID yet and loop through them in order of timestamp, looking at the time elapsed between each
    record and the one before it. If 2 consecutive points are more than ITINERARY_MAX_TIME_DIFF_SECONDS apart,
    it is assumed that the aircraft landed and took back off during that gap, so the records collected up to
    the gap are assigned one itinerary ID.

    Note: this doesn't assign an ID for all records (eg. the most recent batch), because the data could be in the
    middle of an itinerary when this script is run.

    :param mode_s_hex: the hex code identifying the aircraft
    :type mode_s_hex: str
    """
    logger.info('Calcing Time Diffs to assign itinerary ids for mode s: {}'.format(mode_s_hex))

    # The mode-s code is passed as a query parameter instead of being formatted into the SQL text.
    # NOTE(review): the %s placeholder assumes a DB-API driver with the format/pyformat paramstyle
    # (e.g. psycopg2) - confirm against pg_utils.database_connection.
    sql = '''SELECT aircraftreports.report_epoch, aircraftreports.report_epoch - lag(aircraftreports.report_epoch)
             OVER (ORDER BY aircraftreports.report_epoch)
             AS time_delta_sec
             FROM aircraftreports
             WHERE aircraftreports.itinerary_id IS NULL AND aircraftreports.mode_s_hex = %s
             ORDER BY aircraftreports.report_epoch'''
    uniq_mode_s_cursor = dbconn.cursor()
    try:
        uniq_mode_s_cursor.execute(sql, (mode_s_hex,))
        time_diff_rows = uniq_mode_s_cursor.fetchall()
    finally:
        # All rows are in memory now, so the cursor can be released before any updates run
        uniq_mode_s_cursor.close()

    # True whenever the next row should open a new itinerary (at the very start and after each gap)
    awaiting_itinerary_start = True
    minimum_timestamp = None
    for curr_timestamp, time_diff_sec in time_diff_rows:
        # time_diff_sec is the time difference between the current record and the previous record.
        # It is NULL/None on the first row (lag() has no previous row), which is never compared
        # because the first row is always consumed as an itinerary start.
        if awaiting_itinerary_start:
            minimum_timestamp = curr_timestamp
            awaiting_itinerary_start = False
            continue

        if time_diff_sec > ITINERARY_MAX_TIME_DIFF_SECONDS:
            # Dynamically figure out what the previous timestamp was, and use it as the end of the itinerary
            maximum_timestamp = curr_timestamp - time_diff_sec
            assign_itinerary_id_for_mode_s(itinerary_id=generate_itinerary_id(mode_s_hex, minimum_timestamp),
                                           mode_s_hex_for_update=mode_s_hex,
                                           min_time=minimum_timestamp,
                                           max_time=maximum_timestamp)
            # NOTE(review): the current row (the first report after the gap) is not used as the
            # start of the next itinerary; the row after it is. The original author documented
            # this as acceptable for this use-case, so the behaviour is kept unchanged.
            awaiting_itinerary_start = True
def generate_itinerary_id(mode_s, epoch_timestamp):
    """
    Build the unique itinerary ID from the local-time rendering of the itinerary's first timestamp
    (formatted as YYYY_MM_DD_HH_MM_SS) followed by an underscore and the mode-s hex code.

    :param mode_s: mode-s hex code
    :type mode_s: str
    :param epoch_timestamp: minimum timestamp that starts the itinerary
    :type epoch_timestamp: int
    :return: the itinerary ID (str)
    """
    aircraft_suffix = '_{}'.format(mode_s)
    start_time_struct = time.localtime(epoch_timestamp)
    start_time_prefix = time.strftime('%Y_%m_%d_%H_%M_%S', start_time_struct)
    return start_time_prefix + aircraft_suffix
# Script body: fetch every aircraft (mode-s code) that still has reports without an itinerary ID
# and process them one at a time, logging progress against the total.
mode_s_list_to_process = get_all_unique_mode_s_without_itin_assigned()
num_to_process = len(mode_s_list_to_process)

# Bound up front so the counter still exists (as 0) when there is nothing to process
mode_s_count = 0
for mode_s_count, mode_s in enumerate(mode_s_list_to_process, start=1):
    logger.info('Calcing Itinerary IDs for Mode S: {} - Progress on whole dataset (Processed/Total): {}/{} Mode S IDs'.format(
        mode_s, mode_s_count, num_to_process))
    calc_time_diffs_for_mode_s(mode_s)