'''
Dendra API Query

Author: Collin Bode
Date: 2019-05-12

Purpose:
Simplifies pulling data from the https://dendra.science time-series data management system.
The Dendra API requires paging of records in sets of 2,000. This library performs
that function automatically.

Parameters:
query: a JSON object with the tags, organization, stations, and start/end times
endpoint: which API endpoint to query. 'datapoints/lookup' (default), 'station', 'datastream', 'datapoint'
interval: datalogger time between records in minutes, integer. 5 = ERCZO (default), 10 = UCNRS, 15 = USGS
'''
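
# Typical usage (a minimal sketch; the module filename is an assumption, and the
# datastream id is the 'South Meadow WS, Air Temp C' id used in the unit tests below):
#
#   import dendra_api_query as dendra
#   df = dendra.get_datapoints('5ae8793efe27f424f9102b87', '2019-02-01T08:00:00Z')
#   print(df.head())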

import requests
import pandas as pd
import datetime as dt
import pytz
from dateutil import tz
from dateutil.parser import parse

# Params
url = 'https://api.dendra.science/v1/'
headers = {"Content-Type": "application/json"}

# Time Helper Functions
# These apply standardized formatting and UTC conversion
def time_utc(str_time=""):
    if str_time == "":
        dt_time = dt.datetime.now(pytz.utc)
    else:
        dt_time = parse(str_time)
        if dt_time.tzinfo != pytz.utc:
            # naive datetimes are treated as local time by astimezone()
            dt_time = dt_time.astimezone(pytz.utc)
    return dt_time

def time_format(dt_time=None):
    # Default to the current time at call time. A default of dt.datetime.now()
    # in the signature would be evaluated once at import and then frozen.
    if dt_time is None:
        dt_time = dt.datetime.now()
    str_time = dt.datetime.strftime(dt_time, "%Y-%m-%dT%H:%M:%S.%fZ")
    return str_time
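
# e.g. time_utc('2019-03-01T00:00:00-0800') -> 2019-03-01 08:00:00+00:00, and
# time_format(time_utc('2019-03-01T08:00:00Z')) -> '2019-03-01T08:00:00.000000Z'
# (illustrative round-trip; no API call involved)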

# List Functions help find what you are looking for; they do not retrieve full metadata
def list_organizations(orgslug='all'):
    # options: 'erczo', 'ucnrs', 'chi'
    query = {
        '$sort[name]': 1,
        '$select[name]': 1,
        '$select[slug]': 1
    }
    if orgslug != 'all':
        query['slug'] = orgslug

    r = requests.get(url + 'organizations', headers=headers, params=query)
    assert r.status_code == 200
    rjson = r.json()
    return rjson['data']
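
# e.g. list_organizations('erczo') returns a one-element list of the form
# [{'_id': '...', 'name': '...', 'slug': 'erczo'}] (shape illustrative; the
# query above selects only name and slug)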

def list_stations(orgslug='all', query_add='none'):
    # orgslug options: 'erczo', 'ucnrs', 'chi'
    # NOTE: lists either all organizations or a single one. There is no option
    # to list a subset, unless you add custom elements to the query.
    query = {
        '$sort[name]': 1,
        '$select[name]': 1,
        '$select[slug]': 1
    }

    # Narrow query to one organization
    if orgslug != 'all':
        org_list = list_organizations(orgslug)
        if len(org_list) == 0:
            return 'ERROR: no organizations found with that acronym.'
        orgid = org_list[0]['_id']
        query['organization_id'] = orgid

    # Modify query, adding custom elements
    if query_add != 'none':
        for element in query_add:
            query[element] = query_add[element]

    # Request JSON from Dendra
    r = requests.get(url + 'stations', headers=headers, params=query)
    assert r.status_code == 200
    rjson = r.json()
    return rjson['data']
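
# e.g. list_stations('ucnrs', {'$select[station_type]': 1}) lists UCNRS stations
# with station_type added to each record (exercised in the unit tests below)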

# GET Metadata returns full metadata
def get_datastream_by_id(datastream_id, query_add=''):
    query = {'_id': datastream_id}
    if query_add != '':
        query.update(query_add)
    r = requests.get(url + 'datastreams', headers=headers, params=query)
    assert r.status_code == 200
    rjson = r.json()
    return rjson['data'][0]

# GET Datapoints gets the good stuff, actual datavalues
def get_datapoints(datastream_id, time_start, time_end=None):
    # Get Datapoints queries only one datastream.
    # Returns a Pandas DataFrame: columns [index, timezone_offset_sec, timestamp_utc, <name of datastream>]
    # time_end defaults to now, resolved at call time rather than at import.
    if time_end is None:
        time_end = time_format()
    query = {
        'datastream_id': datastream_id,
        'time[$gt]': time_start,
        'time[$lt]': time_end,
        '$sort[time]': 1,
        'time_local': 1,
        '$limit': 2000
    }
    r = requests.get(url + 'datapoints', headers=headers, params=query)
    assert r.status_code == 200
    rjson = r.json()
    bigjson = rjson
    if len(bigjson['data']) == 0:
        # No records in the requested window; return an empty frame
        return pd.DataFrame(columns=['timezone_offset_sec', 'timestamp_utc'])
    while len(rjson['data']) > 0:
        # Records are sorted by time ascending, so the last record of the most
        # recent page carries the latest timestamp pulled so far.
        time_last = rjson['data'][-1]['t']
        query['time[$gt]'] = time_last
        r = requests.get(url + 'datapoints', headers=headers, params=query)
        assert r.status_code == 200
        rjson = r.json()
        bigjson['data'].extend(rjson['data'])
    # Create Pandas DataFrame with data and set time as index
    df = pd.DataFrame.from_records(bigjson['data'])
    df.set_index(df.t, inplace=True)
    # Assign a human-readable name to the data column
    datastream_meta = get_datastream_by_id(datastream_id, {'$select[name]': 1})
    datastream_name = datastream_meta['name']
    df.rename(columns={'o': 'timezone_offset_sec', 't': 'timestamp_utc', 'v': datastream_name}, inplace=True)
    return df
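
# The returned frame looks roughly like this (values illustrative):
#
#                           timezone_offset_sec             timestamp_utc  South Meadow WS, Air Temp C
# t
# 2019-02-01T08:10:00.000Z               -28800  2019-02-01T08:10:00.000Z                          4.2
# 2019-02-01T08:20:00.000Z               -28800  2019-02-01T08:20:00.000Z                          4.1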

# Lookup is an earlier attempt. Use get_datapoints unless you have to use this.
def __lookup_datapoints_subquery(bigjson, query, endpoint='datapoints/lookup'):
    r = requests.get(url + endpoint, headers=headers, params=query)
    assert r.status_code == 200
    rjson = r.json()
    if len(bigjson) == 0:  # First pull assigns the metadata
        bigjson = rjson
    else:  # all others just add to the datapoints
        for i in range(0, len(bigjson)):
            bigjson[i]['datapoints']['data'].extend(rjson[i]['datapoints']['data'])
    return bigjson

def lookup_datapoints(query, endpoint='datapoints/lookup', interval=5):
    # Determine start and end timestamps
    # Start time
    time_start_original = parse(query['time[$gte]'])
    # End time: default to now (UTC) when no upper bound was supplied
    if 'time[$lt]' in query:
        time_end_original = parse(query['time[$lt]'])
    else:
        time_end_original_local = dt.datetime.now(tz.tzlocal())
        time_end_original = time_end_original_local.astimezone(pytz.utc)

    # Paging limit: 2000 records
    interval2k = dt.timedelta(minutes=interval) * 2000

    # Perform repeat queries until time_end catches up with the target end date.
    # The query's time window must be advanced on every pass; otherwise each
    # pull would return the same first page of records.
    time_start = time_start_original
    time_end = time_start_original + interval2k
    bigjson = {}
    while time_end < time_end_original and time_start < time_end_original:
        query['time[$gte]'] = time_format(time_start)
        query['time[$lt]'] = time_format(time_end)
        bigjson = __lookup_datapoints_subquery(bigjson, query, endpoint)
        time_start = time_end
        time_end = time_start + interval2k
    # One final pull after the loop for the under-2000 records left
    query['time[$gte]'] = time_format(time_start)
    query['time[$lt]'] = time_format(time_end_original)
    bigjson = __lookup_datapoints_subquery(bigjson, query, endpoint)

    # Count total records pulled and update limit metadata
    max_records = pd.date_range(start=time_start_original, end=time_end_original, tz='UTC', freq=str(interval) + 'min')
    for i in range(0, len(bigjson)):
        bigjson[i]['datapoints']['limit'] = len(max_records)

    # Return the full metadata and records
    return bigjson
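
# A lookup query combines a station_id, a time window, and a dot-joined tag string,
# e.g. (ids illustrative; see the bdatapoints_lookup unit test below):
#
#   q = {'station_id': '58e68cabdf5ce600012602b3',
#        'time[$gte]': '2019-04-01T08:00:00Z',
#        'tags': 'ds_Medium_Air.ds_Variable_Temperature',
#        '$sort[time]': 1, '$limit': 2000}
#   docs = lookup_datapoints(q, interval=10)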


###############################################################################
# Unit Tests
#
def __main():
    # Toggle individual tests on by setting the corresponding flag to True
    btime = False
    borg = False
    bstation = False
    bdatastream_id = False
    bdatapoints = False
    bdatapoints_lookup = False

    ####################
    # Test Time
    if btime:
        # time_utc converts string to datetime
        string_utc = '2019-03-01T08:00:00Z'
        print('UTC:', time_utc(string_utc))
        string_edt = '2019-03-01T08:00:00-0400'
        print('EDT:', time_utc(string_edt))
        string_hst = '2019-03-01T08:00:00HST'
        print('HST:', time_utc(string_hst))
        print('Empty (defaults to now, UTC):', time_utc())

        # time_format converts datetime to utc string
        tu = dt.datetime.strptime(string_utc, '%Y-%m-%dT%H:%M:%SZ')
        print('time_format utc:', time_format(tu))
        te = dt.datetime.strptime(string_edt, '%Y-%m-%dT%H:%M:%S%z')
        print('time_format edt:', time_format(te))
        print('time_format empty:', time_format())

    ####################
    # Test Organizations
    if borg:
        # Get One Organization ID
        erczo = list_organizations('erczo')
        print('Organizations ERCZO ID:', erczo[0]['_id'])

        # Get All Organization IDs
        org_list = list_organizations()
        print('All Organizations:')
        print("ID\t\t\tName")
        for org in org_list:
            print(org['_id'], org['name'])

        # Send a BAD Organization slug
        orgs = list_organizations('Trump_is_Evil')
        print('BAD Organizations:', orgs)

    ####################
    # Test Stations
    if bstation:
        # Get All Stations
        st_list = list_stations()
        print('\nALL Organization Stations\n', st_list)

        # Get Stations from UCNRS only
        stslug = 'ucnrs'
        st_list = list_stations(stslug)
        print('\n', stslug.upper(), 'Stations\n')
        print("ID\t\t\tName\t\tSlug")
        for station in st_list:
            print(station['_id'], station['name'], "\t", station['slug'])

        # Modify Query
        query_add = {'$select[station_type]': 1}
        print(query_add)
        st_list = list_stations(stslug, query_add)
        print('\n', stslug.upper(), 'Stations with station_type added\n', st_list)

        # What happens when you send a BAD organization string?
        st_list = list_stations('Trump is Evil')
        print('\nBAD Organizations Stations\n', st_list)

    ####################
    # Test Datastream from id
    if bdatastream_id:
        # Get all Metadata about one Datastream 'South Meadow WS, Air Temp C'
        airtemp_id = '5ae8793efe27f424f9102b87'
        airtemp_meta = get_datastream_by_id(airtemp_id)
        print(airtemp_meta)

        # Get only Name from Metadata using query_add
        airtemp_meta = get_datastream_by_id(airtemp_id, {'$select[name]': 1})
        print(airtemp_meta)

    ####################
    # Test Datapoints
    if bdatapoints:
        airtemp_id = '5ae8793efe27f424f9102b87'
        from_time = '2019-02-01T08:00:00.000Z'  # UTC, not local PST time
        to_time = '2019-03-01T08:00:00Z'
        dd = get_datapoints(airtemp_id, from_time, to_time)
        dups = dd[dd.duplicated(keep=False)]
        print('get_datapoints count:', len(dd), 'min date:', dd.index.min(), 'max date:', dd.index.max())
        print('duplicates?\n', dups)

        # No end date: time_end defaults to now
        dd = get_datapoints(airtemp_id, from_time)
        print('get_datapoints end date set to now, count:', len(dd), 'min date:', dd.index.min(), 'max date:', dd.index.max())
        print(dd)

    ####################
    # Test Datapoints Lookup
    if bdatapoints_lookup:
        # Parameters
        orgid = '58db17c424dc720001671378'  # ucnrs
        station_id = '58e68cabdf5ce600012602b3'
        from_time = '2019-04-01T08:00:00.000Z'  # UTC, not local PST time
        to_time = '2019-05-05T08:00:00Z'
        interval = 10  # 5, 10, 15

        tags = [
            'ds_Medium_Air',
            'ds_Variable_Temperature',
            'ds_Aggregate_Average'
        ]
        query = {
            'station_id': station_id,
            'time[$gte]': from_time,
            'tags': '.'.join(tags),
            '$sort[time]': 1,
            'time_local': 1,
            '$limit': 2000
        }
        # Comment out to_time above to pull records through the present
        if 'to_time' in locals():
            query['time[$lt]'] = to_time

        # Test the Query
        bigjson = lookup_datapoints(query, 'datapoints/lookup', interval)

        # Show the results
        for doc in bigjson:
            print(doc['name'], len(doc['datapoints']['data']), doc['datapoints']['limit'], doc['_id'])

if __name__ == '__main__':
    __main()