|
| 1 | +""" |
| 2 | +Module with tasks that provide data from meteoblue services |
| 3 | +
|
| 4 | +Copyright (c) 2017- Sinergise and contributors |
| 5 | +For the full list of contributors, see https://github.com/sentinel-hub/eo-learn/blob/master/CREDITS.md. |
| 6 | +
|
| 7 | +This source code is licensed under the MIT license, see the LICENSE file in the root directory of this source tree. |
| 8 | +""" |
| 9 | +from __future__ import annotations |
| 10 | + |
| 11 | +import datetime as dt |
| 12 | +from abc import ABCMeta, abstractmethod |
| 13 | +from typing import Any |
| 14 | + |
| 15 | +import dateutil.parser |
| 16 | +import geopandas as gpd |
| 17 | +import numpy as np |
| 18 | +import pandas as pd |
| 19 | +import shapely.geometry |
| 20 | + |
| 21 | +try: |
| 22 | + import meteoblue_dataset_sdk |
| 23 | + from meteoblue_dataset_sdk.caching import FileCache |
| 24 | +except ImportError as exception: |
| 25 | + raise ImportError("This module requires an installation of meteoblue_dataset_sdk package") from exception |
| 26 | + |
| 27 | + |
| 28 | +from eolearn.core import EOPatch, EOTask |
| 29 | +from eolearn.core.constants import TIMESTAMP_COLUMN |
| 30 | +from eolearn.core.types import Feature |
| 31 | +from sentinelhub import CRS, BBox, Geometry, parse_time_interval, serialize_time |
| 32 | +from sentinelhub.types import RawTimeIntervalType |
| 33 | + |
| 34 | + |
| 35 | +class BaseMeteoblueTask(EOTask, metaclass=ABCMeta): |
| 36 | + """A base task implementing the logic that is common for all meteoblue tasks""" |
| 37 | + |
| 38 | + def __init__( |
| 39 | + self, |
| 40 | + feature: Feature, |
| 41 | + apikey: str, |
| 42 | + query: dict | None = None, |
| 43 | + units: dict | None = None, |
| 44 | + time_difference: dt.timedelta = dt.timedelta(minutes=30), # noqa: B008, RUF100 |
| 45 | + cache_folder: str | None = None, |
| 46 | + cache_max_age: int = 604800, |
| 47 | + ): |
| 48 | + """ |
| 49 | + :param feature: A feature in which meteoblue data will be stored |
| 50 | + :param apikey: meteoblue API key |
| 51 | + :param query: meteoblue dataset API query definition. If set to None (default) the query has to be set |
| 52 | + in the execute method instead. |
| 53 | + :param units: meteoblue dataset API units definition. If set to None (default) request will use default units |
| 54 | + as specified in https://docs.meteoblue.com/en/weather-apis/dataset-api/dataset-api#units |
| 55 | + :param time_difference: The size of a time interval around each timestamp for which data will be collected. It |
| 56 | + is used only in a combination with ``time_interval`` parameter from ``execute`` method. |
| 57 | + :param cache_folder: Path to cache_folder. If set to None (default) requests will not be cached. |
| 58 | + :param cache_max_age: Maximum age in seconds to use a cached result. Default 1 week. |
| 59 | + """ |
| 60 | + self.feature = self.parse_feature(feature) |
| 61 | + cache = None |
| 62 | + if cache_folder: |
| 63 | + cache = FileCache(path=cache_folder, max_age=cache_max_age) |
| 64 | + |
| 65 | + self.client = meteoblue_dataset_sdk.Client(apikey=apikey, cache=cache) |
| 66 | + self.query = query |
| 67 | + self.units = units |
| 68 | + self.time_difference = time_difference |
| 69 | + |
| 70 | + @staticmethod |
| 71 | + def _get_modified_eopatch(eopatch: EOPatch | None, bbox: BBox | None) -> tuple[EOPatch, BBox]: |
| 72 | + if bbox is not None: |
| 73 | + if eopatch is None: |
| 74 | + eopatch = EOPatch(bbox=bbox) |
| 75 | + elif eopatch.bbox is None: |
| 76 | + eopatch.bbox = bbox |
| 77 | + elif eopatch.bbox != bbox: |
| 78 | + raise ValueError("Provided eopatch.bbox and bbox are not the same") |
| 79 | + return eopatch, bbox |
| 80 | + |
| 81 | + if eopatch is None or eopatch.bbox is None: |
| 82 | + raise ValueError("Bounding box is not provided") |
| 83 | + return eopatch, eopatch.bbox |
| 84 | + |
| 85 | + def _prepare_time_intervals(self, eopatch: EOPatch, time_interval: RawTimeIntervalType | None) -> list[str]: |
| 86 | + """Prepare a list of time intervals for which data will be collected from meteoblue services""" |
| 87 | + timestamps = eopatch.timestamps |
| 88 | + |
| 89 | + if timestamps is None: |
| 90 | + if not time_interval: |
| 91 | + raise ValueError( |
| 92 | + "Time interval should either be defined with `eopatch.timestamps` or `time_interval` parameter" |
| 93 | + ) |
| 94 | + |
| 95 | + serialized_start_time, serialized_end_time = serialize_time(parse_time_interval(time_interval)) |
| 96 | + return [f"{serialized_start_time}/{serialized_end_time}"] |
| 97 | + |
| 98 | + time_intervals: list[str] = [] |
| 99 | + for timestamp in timestamps: |
| 100 | + start_time = timestamp - self.time_difference |
| 101 | + end_time = timestamp + self.time_difference |
| 102 | + |
| 103 | + serizalized_start_time, serizalized_end_time = serialize_time((start_time, end_time)) |
| 104 | + |
| 105 | + time_intervals.append(f"{serizalized_start_time}/{serizalized_end_time}") |
| 106 | + |
| 107 | + return time_intervals |
| 108 | + |
| 109 | + @abstractmethod |
| 110 | + def _get_data(self, query: dict) -> tuple[Any, list[dt.datetime]]: |
| 111 | + """It should return an output feature object and a list of timestamps""" |
| 112 | + |
| 113 | + def execute( |
| 114 | + self, |
| 115 | + eopatch: EOPatch | None = None, |
| 116 | + *, |
| 117 | + query: dict | None = None, |
| 118 | + bbox: BBox | None = None, |
| 119 | + time_interval: RawTimeIntervalType | None = None, |
| 120 | + ) -> EOPatch: |
| 121 | + """Execute method that adds new meteoblue data into an EOPatch |
| 122 | +
|
| 123 | + :param eopatch: An EOPatch in which data will be added. If not provided a new EOPatch will be created. |
| 124 | + :param bbox: A bounding box of a request. Should be provided if eopatch parameter is not provided. |
| 125 | + :param query: meteoblue dataset API query definition. This query takes precedence over one defined in __init__. |
| 126 | + :param time_interval: An interval for which data should be downloaded. If not provided then timestamps from |
| 127 | + provided eopatch will be used. |
| 128 | + :raises ValueError: Raises an exception when no query is set during Task initialization or the execute method. |
| 129 | + """ |
| 130 | + eopatch, bbox = self._get_modified_eopatch(eopatch, bbox) |
| 131 | + |
| 132 | + time_intervals = self._prepare_time_intervals(eopatch, time_interval) |
| 133 | + |
| 134 | + geometry = Geometry(bbox.geometry, bbox.crs).transform(CRS.WGS84) |
| 135 | + geojson = shapely.geometry.mapping(geometry.geometry) |
| 136 | + |
| 137 | + query = query if query is not None else self.query |
| 138 | + if query is None: |
| 139 | + raise ValueError("Query has to specified in execute method or during task initialization") |
| 140 | + |
| 141 | + executable_query = { |
| 142 | + "units": self.units, |
| 143 | + "geometry": geojson, |
| 144 | + "format": "protobuf", |
| 145 | + "timeIntervals": time_intervals, |
| 146 | + "queries": [query], |
| 147 | + } |
| 148 | + result_data, result_timestamps = self._get_data(executable_query) |
| 149 | + |
| 150 | + if not eopatch.timestamps and result_timestamps: |
| 151 | + eopatch.timestamps = result_timestamps |
| 152 | + |
| 153 | + eopatch[self.feature] = result_data |
| 154 | + return eopatch |
| 155 | + |
| 156 | + |
| 157 | +class MeteoblueVectorTask(BaseMeteoblueTask): |
| 158 | + """Obtains weather data from meteoblue services as a vector feature |
| 159 | +
|
| 160 | + The data is obtained as a VECTOR feature in a ``geopandas.GeoDataFrame`` where columns include latitude, longitude, |
| 161 | + timestamp and a column for each weather variable. All data is downloaded from the |
| 162 | + meteoblue dataset API (<https://docs.meteoblue.com/en/weather-apis/dataset-api/dataset-api>). |
| 163 | +
|
| 164 | + A meteoblue API key is required to retrieve data. |
| 165 | + """ |
| 166 | + |
| 167 | + def _get_data(self, query: dict) -> tuple[gpd.GeoDataFrame, list[dt.datetime]]: |
| 168 | + """Provides a GeoDataFrame with information about weather control points and an empty list of timestamps""" |
| 169 | + result = self.client.querySync(query) |
| 170 | + dataframe = meteoblue_to_dataframe(result) |
| 171 | + geometry = gpd.points_from_xy(dataframe.Longitude, dataframe.Latitude) |
| 172 | + crs = CRS.WGS84.pyproj_crs() |
| 173 | + gdf = gpd.GeoDataFrame(dataframe, geometry=geometry, crs=crs) |
| 174 | + return gdf, [] |
| 175 | + |
| 176 | + |
| 177 | +class MeteoblueRasterTask(BaseMeteoblueTask): |
| 178 | + """Obtains weather data from meteoblue services as a raster feature |
| 179 | +
|
| 180 | + It returns a 4D numpy array with dimensions (time, height, width, weather variables) which should be stored as a |
| 181 | + DATA feature. Data is resampled to WGS84 plate carrée to a specified resolution using the |
| 182 | + meteoblue dataset API (<https://docs.meteoblue.com/en/weather-apis/dataset-api/dataset-api>). |
| 183 | +
|
| 184 | + A meteoblue API key is required to retrieve data. |
| 185 | + """ |
| 186 | + |
| 187 | + def _get_data(self, query: dict) -> tuple[np.ndarray, list[dt.datetime]]: |
| 188 | + """Return a 4-dimensional numpy array of shape (time, height, width, weather variables) and a list of |
| 189 | + timestamps |
| 190 | + """ |
| 191 | + result = self.client.querySync(query) |
| 192 | + |
| 193 | + data = meteoblue_to_numpy(result) |
| 194 | + timestamps = _meteoblue_timestamps_from_geometry(result.geometries[0]) |
| 195 | + return data, timestamps |
| 196 | + |
| 197 | + |
| 198 | +def meteoblue_to_dataframe(result: Any) -> pd.DataFrame: |
| 199 | + """Transform a meteoblue dataset API result to a dataframe |
| 200 | +
|
| 201 | + :param result: A response of meteoblue API of type `Dataset_pb2.DatasetApiProtobuf` |
| 202 | + :returns: A dataframe with columns TIMESTAMP, Longitude, Latitude and aggregation columns |
| 203 | + """ |
| 204 | + geometry = result.geometries[0] |
| 205 | + code_names = [f"{code.code}_{code.level}_{code.aggregation}" for code in geometry.codes] |
| 206 | + |
| 207 | + if not geometry.timeIntervals: |
| 208 | + return pd.DataFrame(columns=[TIMESTAMP_COLUMN, "Longitude", "Latitude", *code_names]) |
| 209 | + |
| 210 | + dataframes = [] |
| 211 | + for index, time_interval in enumerate(geometry.timeIntervals): |
| 212 | + timestamps = _meteoblue_timestamps_from_time_interval(time_interval) |
| 213 | + |
| 214 | + n_locations = len(geometry.lats) |
| 215 | + n_timesteps = len(timestamps) |
| 216 | + |
| 217 | + dataframe = pd.DataFrame( |
| 218 | + { |
| 219 | + TIMESTAMP_COLUMN: np.tile(timestamps, n_locations), # type: ignore[arg-type] # numpy can do this |
| 220 | + "Longitude": np.repeat(geometry.lons, n_timesteps), |
| 221 | + "Latitude": np.repeat(geometry.lats, n_timesteps), |
| 222 | + } |
| 223 | + ) |
| 224 | + |
| 225 | + for code, code_name in zip(geometry.codes, code_names): |
| 226 | + dataframe[code_name] = np.array(code.timeIntervals[index].data) |
| 227 | + |
| 228 | + dataframes.append(dataframe) |
| 229 | + |
| 230 | + return pd.concat(dataframes, ignore_index=True) |
| 231 | + |
| 232 | + |
| 233 | +def meteoblue_to_numpy(result: Any) -> np.ndarray: |
| 234 | + """Transform a meteoblue dataset API result to a dataframe |
| 235 | +
|
| 236 | + :param result: A response of meteoblue API of type `Dataset_pb2.DatasetApiProtobuf` |
| 237 | + :returns: A 4D numpy array with shape (time, height, width, weather variables) |
| 238 | + """ |
| 239 | + geometry = result.geometries[0] |
| 240 | + |
| 241 | + n_locations = len(geometry.lats) |
| 242 | + n_codes = len(geometry.codes) |
| 243 | + n_time_intervals = len(geometry.timeIntervals) |
| 244 | + geo_ny = geometry.ny |
| 245 | + geo_nx = geometry.nx |
| 246 | + |
| 247 | + # meteoblue data is using dimensions (n_variables, n_time_intervals, ny, nx, n_timesteps) |
| 248 | + # Individual time intervals may have different number of timesteps (not a dimension) |
| 249 | + # Therefore we have to first transpose each code individually and then transpose everything again |
| 250 | + def map_code(code: Any) -> np.ndarray: |
| 251 | + """Transpose a single code""" |
| 252 | + code_data = np.array([t.data for t in code.timeIntervals]) |
| 253 | + |
| 254 | + code_n_timesteps = code_data.size // n_locations // n_time_intervals |
| 255 | + code_data = code_data.reshape((n_time_intervals, geo_ny, geo_nx, code_n_timesteps)) |
| 256 | + |
| 257 | + # transpose from shape (n_time_intervals, geo_ny, geo_nx, code_n_timesteps) |
| 258 | + # to (n_time_intervals, code_n_timesteps, geo_ny, geo_nx) |
| 259 | + # and flip the y-axis (meteoblue is using northward facing axis |
| 260 | + # but the standard for EOPatch features is a southward facing axis) |
| 261 | + return np.flip(code_data.transpose((0, 3, 1, 2)), axis=2) |
| 262 | + |
| 263 | + data = np.array(list(map(map_code, geometry.codes))) |
| 264 | + |
| 265 | + n_timesteps = data.size // n_locations // n_codes |
| 266 | + data = data.reshape((n_codes, n_timesteps, geo_ny, geo_nx)) |
| 267 | + |
| 268 | + return data.transpose((1, 2, 3, 0)) |
| 269 | + |
| 270 | + |
| 271 | +def _meteoblue_timestamps_from_geometry(geometry_pb: Any) -> list[dt.datetime]: |
| 272 | + """Transforms a protobuf geometry object into a list of datetime objects""" |
| 273 | + return list(pd.core.common.flatten(map(_meteoblue_timestamps_from_time_interval, geometry_pb.timeIntervals))) |
| 274 | + |
| 275 | + |
| 276 | +def _meteoblue_timestamps_from_time_interval(timestamp_pb: Any) -> list[dt.datetime]: |
| 277 | + """Transforms a protobuf timestamp object into a list of datetime objects""" |
| 278 | + if timestamp_pb.timestrings: |
| 279 | + # Time intervals like weekly data, return an `array of strings` as timestamps |
| 280 | + # For time indications like `20200801T0000-20200802T235959` we only return the first date as datetime |
| 281 | + return list(map(_parse_timestring, timestamp_pb.timestrings)) |
| 282 | + |
| 283 | + # Regular time intervals return `start, end and stride` as a time axis |
| 284 | + # We convert it into an array of daytime |
| 285 | + time_range = range(timestamp_pb.start, timestamp_pb.end, timestamp_pb.stride) |
| 286 | + return list(map(dt.datetime.fromtimestamp, time_range)) |
| 287 | + |
| 288 | + |
| 289 | +def _parse_timestring(timestring: str) -> dt.datetime: |
| 290 | + """A helper method to parse specific timestrings obtained from meteoblue service""" |
| 291 | + if "-" in timestring: |
| 292 | + timestring = timestring.split("-")[0] |
| 293 | + return dateutil.parser.parse(timestring) |
0 commit comments