Skip to content

Commit

Permalink
Add "sensor_first_date" and "sensor_last_date" fields for IRCELINE, with
an associated synthesized field "is_active", indicating a data freshness
of 7 days or less.
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed May 20, 2019
1 parent 8cebfe0 commit 0c3ec97
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 4 deletions.
6 changes: 6 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ Luftdatenpumpe changelog

in progress
===========


2019-05-20 0.13.0
=================
- Improve IRCELINE request handling robustness
- Add "sensor_first_date" and "sensor_last_date" fields for IRCELINE
  to indicate data freshness of <= 7 days via the synthesized field "is_active".


2019-05-19 0.12.1
Expand Down
44 changes: 40 additions & 4 deletions luftdatenpumpe/source/irceline.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ def get_stations(self):
data = self.send_request('stations', params={'expanded': 'true'})
#import sys, json; print(json.dumps(data, indent=2)); sys.exit(0)

timeseries_index = self.get_timeseries_index()
#import sys, json; print(json.dumps(timeseries_index, indent=2)); sys.exit(0)

# Apply data filter.
data = self.apply_filter(data)

stations = []
for upstream_station in self.wrap_progress(data):
upstream_station = munchify(upstream_station)
#print('upstream_station:', upstream_station)
station_info = munchify({
'station_id': upstream_station.properties.id,
'station_label': upstream_station.properties.label,
Expand All @@ -94,8 +98,15 @@ def get_stations(self):
self.enrich_station(station_info)

station_info.sensors = self.timeseries_to_sensors(upstream_station.properties.timeseries)
#print(station_info)

# Enrich sensor information by timestamps of first / most recent reading.
for sensor in station_info.sensors:
timeseries_id = sensor['sensor_id']
if timeseries_id in timeseries_index:
sensor['sensor_first_date'] = self.convert_timestamp(timeseries_index[timeseries_id].firstValue.timestamp)
sensor['sensor_last_date'] = self.convert_timestamp(timeseries_index[timeseries_id].lastValue.timestamp)

#print(station_info)
stations.append(station_info)

# List of stations sorted by station identifier.
Expand Down Expand Up @@ -305,7 +316,7 @@ def get_readings_from_api(self):
#timeseries = self.get_timeseries(timeseries_ids=[1180, 6895], timespan=self.filter.get('timespan'))

# For real
timeseries = self.get_timeseries(timeseries_ids=timeseries_id_list, timespan=self.filter.get('timespan'))
timeseries = self.get_timeseries_details(timeseries_ids=timeseries_id_list, timespan=self.filter.get('timespan'))

# Map timeseries to readings.
timeseries_readings_map = {}
Expand Down Expand Up @@ -362,7 +373,22 @@ def get_readings_from_api(self):

return items

def get_timeseries(self, timeseries_ids=None, timespan=None):
def get_timeseries_index(self, timespan=None):
    """
    Fetch the expanded list of all timeseries from the IRCELINE SOS API
    and return it as a dictionary keyed by integer timeseries identifier.

    :param timespan: ISO 8601 period/instant expression limiting the query.
                     Defaults to the last twelve hours, anchored at the
                     current hour (``PT12h/<this_hour>``).
    :return: ``dict`` mapping ``int`` timeseries id to a ``Munch`` of the
             raw upstream entry (including ``firstValue``/``lastValue``).
    """
    if timespan is None:
        # Default: the last 12 hours, so first/last reading timestamps are fresh.
        timespan = f'PT12h/{self.this_hour()}'

    # No placeholders needed here, so a plain string suffices.
    url = urljoin(self.uri, 'timeseries/')
    data = self.send_request(url, params={'timespan': timespan, 'expanded': 'true'})

    # Index entries by integer id for O(1) lookup by the caller.
    return {int(entry['id']): munchify(entry) for entry in data}

def get_timeseries_details(self, timeseries_ids=None, timespan=None):
"""
- http://geo.irceline.be/sos/static/doc/api-doc/#timeseries-example-post-data
- https://wiki.52north.org/SensorWeb/SensorWebClientRESTInterfaceV0#Timeseries_Data
Expand All @@ -375,6 +401,11 @@ def get_timeseries(self, timeseries_ids=None, timespan=None):
- P0Y0M3D/2013-01-31TZ
Examples
Data from specified station and timespan::
http http://geo.irceline.be/sos/api/v1/timeseries/6151/getData?timespan=PT6H/2019-05-18T09:00:00Z
- Three hours worth of data from designated starting point for a specific timeseries::
http POST 'http://geo.irceline.be/sos/api/v1/timeseries/getData' timeseries:='[6643]' timespan='2019-04-21T22:00:00+02:00/PT3h'
Expand All @@ -399,8 +430,10 @@ def get_timeseries(self, timeseries_ids=None, timespan=None):
results.update(data)
except KeyboardInterrupt:
raise
except HTTPError as ex:
log.error(f'Requesting data for timeseries {identifier} failed: {ex}')
except:
log.exception(f'Decoding response from IRCELINE failed')
log.exception(f'Decoding response for timeseries {identifier} failed')

return results

Expand Down Expand Up @@ -463,6 +496,9 @@ def reading_data_from_timeseries(self, name, values):

@staticmethod
def convert_timestamp(timestamp):
    """
    Convert an SOS timestamp (milliseconds since epoch) to an RFC 3339 string.
    """
    # NOTE(review): naive local-time conversion — assumes local tz is intended; confirm.
    seconds = timestamp / 1000
    return rfc3339(datetime.fromtimestamp(seconds))

Expand Down
12 changes: 12 additions & 0 deletions luftdatenpumpe/target/rdbms.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ def ensure_schema(self):
stations_table.create_column('longitude', self.db.types.float)
stations_table.create_column('altitude', self.db.types.float)

# Enforce "xox_sensors.(sensor_first_date,sensor_last_date)" to be of "datetime" type.
stations_table = self.db.get_table(f'{self.realm}_sensors')
stations_table.create_column('sensor_first_date', self.db.types.datetime)
stations_table.create_column('sensor_last_date', self.db.types.datetime)

# Enforce "xox_osmdata.osm_id" and "xox_osmdata.osm_place_id" to be of "bigint" type.
# Otherwise: ``psycopg2.DataError: integer out of range`` with 'osm_id': 2678458514
osmdata_table = self.db.get_table(f'{self.realm}_osmdata')
Expand Down Expand Up @@ -263,16 +268,23 @@ def create_views(self):
DROP VIEW IF EXISTS {prefix}_network;
CREATE VIEW {prefix}_network AS
SELECT
-- Baseline fields.
{prefix}_stations.station_id,
{prefix}_stations.name, {prefix}_stations.country,
{prefix}_stations.longitude, {prefix}_stations.latitude, {prefix}_stations.altitude,
{prefix}_stations.geohash, {prefix}_stations.geopoint,
{prefix}_sensors.sensor_id, {prefix}_sensors.sensor_type_id, {prefix}_sensors.sensor_type_name,
-- Synthesized fields.
concat({prefix}_osmdata.osm_state, ' » ', {prefix}_osmdata.osm_city) AS state_and_city,
concat({prefix}_stations.name, ' (#', CAST({prefix}_stations.station_id AS text), ')') AS name_and_id,
concat({prefix}_osmdata.osm_country, ' (', {prefix}_osmdata.osm_country_code, ')') AS country_and_countrycode,
concat(concat_ws(', ', {prefix}_osmdata.osm_state, {prefix}_osmdata.osm_country), ' (', {prefix}_osmdata.osm_country_code, ')') AS state_and_country,
concat(concat_ws(', ', {prefix}_osmdata.osm_city, {prefix}_osmdata.osm_state, {prefix}_osmdata.osm_country), ' (', {prefix}_osmdata.osm_country_code, ')') AS city_and_state_and_country,
ABS(DATE_PART('day', sensor_last_date - now())) >= 7 AS is_active,
-- Add OSM fields.
{osmdata_columns_expression}
FROM
{prefix}_stations, {prefix}_osmdata, {prefix}_sensors
Expand Down

0 comments on commit 0c3ec97

Please sign in to comment.