From 0c3ec97969bc303fc939bcf6e2d7d848e57b771f Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Mon, 20 May 2019 23:14:35 +0200
Subject: [PATCH] Add "sensor_first_date" and "sensor_last_date" fields for
 IRCELINE, with the associated synthesized field "is_active" indicating a
 data freshness of 7 days or less.

---
 CHANGES.rst                       |  6 +++++
 luftdatenpumpe/source/irceline.py | 44 ++++++++++++++++++++++++++++---
 luftdatenpumpe/target/rdbms.py    | 12 +++++++++
 3 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/CHANGES.rst b/CHANGES.rst
index 6826f28..138b668 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -5,7 +5,13 @@ Luftdatenpumpe changelog
 in progress
 ===========
 
+
+2019-05-20 0.13.0
+=================
 - Improve IRCELINE request handling robustness
+- Add "sensor_first_date" and "sensor_last_date" fields for IRCELINE,
+  indicating <= 7 days of data freshness through synthesized field "is_active".
 
 
 2019-05-19 0.12.1
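The `get_timeseries_index` method introduced in the next diff defaults its timespan
to the ISO 8601 period/instant notation `PT12h/<instant>`, i.e. the twelve hours
leading up to the current full hour. A minimal sketch of building such a value,
assuming a stand-in `this_hour()` helper that truncates the current UTC time to
the full hour (the helper actually used lives elsewhere in the codebase)::

    from datetime import datetime, timezone

    def this_hour():
        # Truncate the current UTC time to the full hour, ISO 8601 formatted.
        return datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0).isoformat()

    # Twelve hours leading up to the current full hour,
    # e.g. "PT12h/2019-05-20T21:00:00+00:00".
    timespan = f'PT12h/{this_hour()}'
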
diff --git a/luftdatenpumpe/source/irceline.py b/luftdatenpumpe/source/irceline.py
index 37317ef..5dc5a5a 100644
--- a/luftdatenpumpe/source/irceline.py
+++ b/luftdatenpumpe/source/irceline.py
@@ -75,12 +75,16 @@ def get_stations(self):
         data = self.send_request('stations', params={'expanded': 'true'})
         #import sys, json; print(json.dumps(data, indent=2)); sys.exit(0)
 
+        timeseries_index = self.get_timeseries_index()
+        #import sys, json; print(json.dumps(timeseries_index, indent=2)); sys.exit(0)
+
         # Apply data filter.
         data = self.apply_filter(data)
 
         stations = []
         for upstream_station in self.wrap_progress(data):
             upstream_station = munchify(upstream_station)
+            #print('upstream_station:', upstream_station)
             station_info = munchify({
                 'station_id': upstream_station.properties.id,
                 'station_label': upstream_station.properties.label,
@@ -94,8 +98,15 @@ def get_stations(self):
 
             self.enrich_station(station_info)
             station_info.sensors = self.timeseries_to_sensors(upstream_station.properties.timeseries)
-            #print(station_info)
 
+            # Enrich sensor information with timestamps of the first / most recent readings.
+            for sensor in station_info.sensors:
+                timeseries_id = sensor['sensor_id']
+                if timeseries_id in timeseries_index:
+                    sensor['sensor_first_date'] = self.convert_timestamp(timeseries_index[timeseries_id].firstValue.timestamp)
+                    sensor['sensor_last_date'] = self.convert_timestamp(timeseries_index[timeseries_id].lastValue.timestamp)
+
+            #print(station_info)
             stations.append(station_info)
 
         # List of stations sorted by station identifier.
@@ -305,7 +316,7 @@ def get_readings_from_api(self):
         #timeseries = self.get_timeseries(timeseries_ids=[1180, 6895], timespan=self.filter.get('timespan'))
 
         # For real
-        timeseries = self.get_timeseries(timeseries_ids=timeseries_id_list, timespan=self.filter.get('timespan'))
+        timeseries = self.get_timeseries_details(timeseries_ids=timeseries_id_list, timespan=self.filter.get('timespan'))
 
         # Map timeseries to readings.
         timeseries_readings_map = {}
@@ -362,7 +373,22 @@ def get_readings_from_api(self):
 
         return items
 
-    def get_timeseries(self, timeseries_ids=None, timespan=None):
+    def get_timeseries_index(self, timespan=None):
+        if timespan is None:
+            timespan = f'PT12h/{self.this_hour()}'
+
+        url = urljoin(self.uri, 'timeseries/')
+        data = self.send_request(url, params={'timespan': timespan, 'expanded': 'true'})
+        #print(data)
+
+        items = {}
+        for entry in data:
+            key = int(entry['id'])
+            items[key] = munchify(entry)
+
+        return items
+
+    def get_timeseries_details(self, timeseries_ids=None, timespan=None):
         """
         - http://geo.irceline.be/sos/static/doc/api-doc/#timeseries-example-post-data
         - https://wiki.52north.org/SensorWeb/SensorWebClientRESTInterfaceV0#Timeseries_Data
@@ -375,6 +401,11 @@ def get_timeseries(self, timeseries_ids=None, timespan=None):
         - P0Y0M3D/2013-01-31TZ
 
         Examples
+
+        - Data for a specific timeseries and timespan::
+
+            http http://geo.irceline.be/sos/api/v1/timeseries/6151/getData?timespan=PT6H/2019-05-18T09:00:00Z
+
         - Three hours worth of data from designated starting point for a specific timeseries::
 
             http POST 'http://geo.irceline.be/sos/api/v1/timeseries/getData' timeseries:='[6643]' timespan='2019-04-21T22:00:00+02:00/PT3h'
@@ -399,8 +430,10 @@ def get_timeseries(self, timeseries_ids=None, timespan=None):
                 results.update(data)
             except KeyboardInterrupt:
                 raise
+            except HTTPError as ex:
+                log.error(f'Requesting data for timeseries {identifier} failed: {ex}')
             except:
-                log.exception(f'Decoding response from IRCELINE failed')
+                log.exception(f'Decoding response for timeseries {identifier} failed')
 
         return results
 
@@ -463,6 +496,9 @@ def reading_data_from_timeseries(self, name, values):
 
     @staticmethod
     def convert_timestamp(timestamp):
+        """
+        SOS timestamps are milliseconds since the Unix epoch.
+        """
         datetime_object = datetime.fromtimestamp(timestamp / 1000)
         return rfc3339(datetime_object)
 
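As noted in the new `convert_timestamp` docstring above, the SOS API reports
timestamps as milliseconds since the Unix epoch. A minimal sketch of the
conversion, using the standard-library ISO 8601 formatter in place of the
`rfc3339` helper the module imports, and pinning the result to UTC for
reproducibility (the original uses local time)::

    from datetime import datetime, timezone

    def convert_timestamp(timestamp):
        # SOS timestamps are milliseconds since the Unix epoch.
        datetime_object = datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
        return datetime_object.isoformat()

    print(convert_timestamp(1558385100000))  # 2019-05-20T20:45:00+00:00
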
diff --git a/luftdatenpumpe/target/rdbms.py b/luftdatenpumpe/target/rdbms.py
index a118f31..de8a0fc 100644
--- a/luftdatenpumpe/target/rdbms.py
+++ b/luftdatenpumpe/target/rdbms.py
@@ -114,6 +114,11 @@ def ensure_schema(self):
         stations_table.create_column('longitude', self.db.types.float)
         stations_table.create_column('altitude', self.db.types.float)
 
+        # Enforce "xox_sensors.(sensor_first_date,sensor_last_date)" to be of "datetime" type.
+        sensors_table = self.db.get_table(f'{self.realm}_sensors')
+        sensors_table.create_column('sensor_first_date', self.db.types.datetime)
+        sensors_table.create_column('sensor_last_date', self.db.types.datetime)
+
         # Enforce "xox_osmdata.osm_id" and "xox_osmdata.osm_place_id" to be of "bigint" type.
         # Otherwise: ``psycopg2.DataError: integer out of range`` with 'osm_id': 2678458514
         osmdata_table = self.db.get_table(f'{self.realm}_osmdata')
@@ -263,16 +268,23 @@ def create_views(self):
             DROP VIEW IF EXISTS {prefix}_network;
             CREATE VIEW {prefix}_network AS
                 SELECT
+
+                    -- Baseline fields.
                     {prefix}_stations.station_id, {prefix}_stations.name, {prefix}_stations.country,
                     {prefix}_stations.longitude, {prefix}_stations.latitude, {prefix}_stations.altitude,
                     {prefix}_stations.geohash, {prefix}_stations.geopoint,
                     {prefix}_sensors.sensor_id, {prefix}_sensors.sensor_type_id, {prefix}_sensors.sensor_type_name,
+
+                    -- Synthesized fields.
                     concat({prefix}_osmdata.osm_state, ' » ', {prefix}_osmdata.osm_city) AS state_and_city,
                     concat({prefix}_stations.name, ' (#', CAST({prefix}_stations.station_id AS text), ')') AS name_and_id,
                     concat({prefix}_osmdata.osm_country, ' (', {prefix}_osmdata.osm_country_code, ')') AS country_and_countrycode,
                     concat(concat_ws(', ', {prefix}_osmdata.osm_state, {prefix}_osmdata.osm_country), ' (', {prefix}_osmdata.osm_country_code, ')') AS state_and_country,
                     concat(concat_ws(', ', {prefix}_osmdata.osm_city, {prefix}_osmdata.osm_state, {prefix}_osmdata.osm_country), ' (', {prefix}_osmdata.osm_country_code, ')') AS city_and_state_and_country,
+                    ABS(DATE_PART('day', sensor_last_date - now())) <= 7 AS is_active,
+
+                    -- Add OSM fields.
                     {osmdata_columns_expression}
 
                 FROM
                     {prefix}_stations, {prefix}_osmdata, {prefix}_sensors
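The synthesized "is_active" field in the view above marks a sensor as active
when its most recent reading is at most seven days old. The same predicate,
sketched in Python for illustration only (names are ad hoc; the authoritative
definition is the SQL expression in the view)::

    from datetime import datetime, timedelta, timezone

    def is_active(sensor_last_date, now=None, threshold_days=7):
        # Active means: the most recent reading is at most `threshold_days` old.
        now = now or datetime.now(timezone.utc)
        return now - sensor_last_date <= timedelta(days=threshold_days)

    last_reading = datetime(2019, 5, 20, 20, 45, tzinfo=timezone.utc)
    print(is_active(last_reading, now=datetime(2019, 5, 21, tzinfo=timezone.utc)))  # True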