diff --git a/ecoscope/io/async_earthranger.py b/ecoscope/io/async_earthranger.py
index 3b30db16..49d66397 100644
--- a/ecoscope/io/async_earthranger.py
+++ b/ecoscope/io/async_earthranger.py
@@ -2,11 +2,10 @@
 import geopandas as gpd
 import pandas as pd
 import asyncio
-from dateutil import parser
 
 import ecoscope
 from ecoscope.io.utils import to_hex
-from ecoscope.io.earthranger_utils import clean_kwargs, to_gdf
+from ecoscope.io.earthranger_utils import clean_kwargs, to_gdf, clean_time_cols
 
 try:
     from erclient.client import AsyncERClient
@@ -87,7 +86,9 @@ async def get_sources_dataframe(
         async for source in self.get_sources(**addl_kwargs):
             sources.append(source)
 
-        return pd.DataFrame(sources)
+        df = pd.DataFrame(sources)
+        df = clean_time_cols(df)
+        return df
 
     async def get_subjects(
         self,
@@ -196,6 +197,7 @@ async def get_subjects_dataframe(
         assert not df.empty
 
         df["hex"] = df["additional"].str["rgb"].map(to_hex) if "additional" in df else "#ff0000"
+        df = clean_time_cols(df)
         return df
 
     async def get_subjectsources(self, subjects=None, sources=None, **addl_kwargs):
@@ -228,6 +230,7 @@ async def get_subjectsources_dataframe(self, subjects=None, sources=None, **addl
             subject_sources.append(subject_source)
 
         df = pd.DataFrame(subject_sources)
+        df = clean_time_cols(df)
         return df
 
     async def get_patrol_types_dataframe(self):
@@ -281,6 +284,7 @@ async def get_patrols_dataframe(self, **kwargs):
             patrols.append(patrol)
 
         df = pd.DataFrame(patrols)
+        df = clean_time_cols(df)
         return df
 
     async def get_observations(
@@ -353,18 +357,9 @@ async def get_observations_gdf(self, **kwargs):
 
         if observations.empty:
             return gpd.GeoDataFrame()
-
-        observations["created_at"] = pd.to_datetime(
-            observations["created_at"],
-            errors="coerce",
-            utc=True,
-        ).dt.tz_convert(kwargs.get("tz"))
-
-        observations["recorded_at"] = pd.to_datetime(
-            observations["recorded_at"],
-            errors="coerce",
-            utc=True,
-        ).dt.tz_convert(kwargs.get("tz"))
+        observations = clean_time_cols(observations)
+        observations["created_at"] = observations["created_at"].dt.tz_convert(kwargs.get("tz"))
+        observations["recorded_at"] = observations["recorded_at"].dt.tz_convert(kwargs.get("tz"))
 
         observations.sort_values("recorded_at", inplace=True)
         return to_gdf(observations)
@@ -456,18 +451,10 @@ async def _get_observations_by_patrol(self, patrol, relocations=True, tz="UTC",
                 continue
 
             observations_by_subject["subject_id"] = subject_id
-            observations_by_subject["created_at"] = pd.to_datetime(
-                observations_by_subject["created_at"],
-                errors="coerce",
-                utc=True,
-            ).dt.tz_convert(tz)
-
-            observations_by_subject["recorded_at"] = pd.to_datetime(
-                observations_by_subject["recorded_at"],
-                errors="coerce",
-                utc=True,
-            ).dt.tz_convert(tz)
-            observations_by_subject.sort_values("recorded_at", inplace=True)
+
+            observations_by_subject = clean_time_cols(observations_by_subject)
+            observations_by_subject["created_at"] = observations_by_subject["created_at"].dt.tz_convert(tz)
+            observations_by_subject["recorded_at"] = observations_by_subject["recorded_at"].dt.tz_convert(tz)
             observations_by_subject = to_gdf(observations_by_subject)
 
             if relocations:
@@ -503,6 +490,7 @@ async def _get_observations_by_patrol(self, patrol, relocations=True, tz="UTC",
                     ]
                 )
             )
+            observations_by_subject = clean_time_cols(observations_by_subject)
             observations_by_subject.set_index("id", inplace=True)
 
             if len(observations_by_subject) > 0:
@@ -617,7 +605,7 @@ async def get_events_dataframe(
             events.append(event)
 
         df = pd.DataFrame(events)
-        df["time"] = df["time"].apply(lambda x: pd.to_datetime(parser.parse(x)))
+        df = clean_time_cols(df)
         gdf = gpd.GeoDataFrame(df)
         if gdf.loc[0, "location"] is not None:
             gdf.loc[~gdf["geojson"].isna(), "geometry"] = gpd.GeoDataFrame.from_features(
diff --git a/ecoscope/io/earthranger.py b/ecoscope/io/earthranger.py
index 85c8694d..3a91d6a4 100644
--- a/ecoscope/io/earthranger.py
+++ b/ecoscope/io/earthranger.py
@@ -8,14 +8,13 @@
 import pandas as pd
 import pytz
 import requests
-from dateutil import parser
 from erclient.client import ERClient, ERClientException, ERClientNotFound
 from shapely.geometry import shape
 from tqdm.auto import tqdm
 
 import ecoscope
 from ecoscope.io.utils import pack_columns, to_hex
-from ecoscope.io.earthranger_utils import clean_kwargs, dataframe_to_dict, to_gdf
+from ecoscope.io.earthranger_utils import clean_kwargs, dataframe_to_dict, to_gdf, clean_time_cols
 
 
 class EarthRangerIO(ERClient):
@@ -178,6 +177,7 @@ def partial_subjects(subjects):
         assert not df.empty
 
         df["hex"] = df["additional"].str["rgb"].map(to_hex) if "additional" in df else "#ff0000"
+        df = clean_time_cols(df)
 
         return df
 
@@ -192,11 +192,13 @@ def get_subjectsources(self, subjects=None, sources=None, **addl_kwargs):
         subjectsources : pd.DataFrame
         """
         params = clean_kwargs(addl_kwargs, sources=sources, subjects=subjects)
-        return pd.DataFrame(
+        df = pd.DataFrame(
             self.get_objects_multithreaded(
                 object="subjectsources/", threads=self.tcp_limit, page_size=self.sub_page_size, **params
             )
         )
+        df = clean_time_cols(df)
+        return df
 
     def _get_observations(
         self,
@@ -272,17 +274,9 @@ def _get_observations(
 
         if observations.empty:
             return gpd.GeoDataFrame()
-        observations["created_at"] = pd.to_datetime(
-            observations["created_at"],
-            errors="coerce",
-            utc=True,
-        ).dt.tz_convert(tz)
-
-        observations["recorded_at"] = pd.to_datetime(
-            observations["recorded_at"],
-            errors="coerce",
-            utc=True,
-        ).dt.tz_convert(tz)
+        observations = clean_time_cols(observations)
+        observations["created_at"] = observations["created_at"].dt.tz_convert(tz)
+        observations["recorded_at"] = observations["recorded_at"].dt.tz_convert(tz)
 
         observations.sort_values("recorded_at", inplace=True)
         return to_gdf(observations)
@@ -599,7 +593,7 @@ def get_events(
 
         assert not df.empty
 
-        df["time"] = df["time"].apply(lambda x: pd.to_datetime(parser.parse(x)))
+        df = clean_time_cols(df)
         gdf = gpd.GeoDataFrame(df)
 
         if gdf.loc[0, "location"] is not None:
@@ -657,6 +651,7 @@ def get_patrols(self, since=None, until=None, patrol_type=None, status=None, **a
         )
         if "serial_number" in df.columns:
             df = df.sort_values(by="serial_number").reset_index(drop=True)
+        df = clean_time_cols(df)
         return df
 
     def get_patrol_events(self, since=None, until=None, patrol_type=None, status=None, **addl_kwargs):
@@ -687,6 +682,7 @@ def get_patrol_events(self, since=None, until=None, patrol_type=None, status=Non
                 event["patrol_segment_id"] = segment.get("id")
                 events.append(event)
 
         events_df = pd.DataFrame(events)
+        events_df = clean_time_cols(events_df)
         events_df["geometry"] = events_df["geojson"].apply(lambda x: shape(x.get("geometry")))
         return gpd.GeoDataFrame(events_df, geometry="geometry", crs=4326)
@@ -713,6 +709,7 @@ def get_patrol_segments_from_patrol_id(self, patrol_id, **addl_kwargs):
         df = self._get(object, **params)
         df["patrol_segments"][0].pop("updates")
         df.pop("updates")
+        df = clean_time_cols(df)
 
         return pd.DataFrame(dict([(k, pd.Series(v)) for k, v in df.items()]))
 
@@ -816,7 +813,9 @@ def get_patrol_observations(self, patrols_df, include_patrol_details=False, **kw
                         f"end_time={patrol_end_time} failed for: {e}"
                     )
 
-        df = ecoscope.base.Relocations(pd.concat(observations))
+        df = pd.concat(observations)
+        df = clean_time_cols(df)
+        df = ecoscope.base.Relocations(df)
         if include_patrol_details:
             return df.set_index("id")
         return df
@@ -840,11 +839,13 @@ def get_patrol_segment_events(
         )
         object = f"activity/patrols/segments/{patrol_segment_id}/events/"
 
-        return pd.DataFrame(
+        df = pd.DataFrame(
             self.get_objects_multithreaded(
                 object=object, threads=self.tcp_limit, page_size=self.sub_page_size, **params
             )
         )
+        df = clean_time_cols(df)
+        return df
 
     def get_spatial_features_group(self, spatial_features_group_id=None, **addl_kwargs):
         """
diff --git a/ecoscope/io/earthranger_utils.py b/ecoscope/io/earthranger_utils.py
index 297981c3..5ab77c0d 100644
--- a/ecoscope/io/earthranger_utils.py
+++ b/ecoscope/io/earthranger_utils.py
@@ -1,5 +1,6 @@
 import geopandas as gpd
 import pandas as pd
+from dateutil import parser
 
 
 def clean_kwargs(addl_kwargs={}, **kwargs):
@@ -33,3 +34,12 @@ def to_gdf(df):
         geometry=gpd.points_from_xy(df["location"].str[longitude], df["location"].str[latitude]),
         crs=4326,
     )
+
+
+def clean_time_cols(df):
+    time_cols = ["time", "created_at", "updated_at", "end_time", "last_position_date", "recorded_at"]
+    for col in time_cols:
+        if col in df.columns:
+            # use pd.isna(x) rather than checking `x is not None` so NaN values are treated as missing too
+            df[col] = df[col].apply(lambda x: pd.to_datetime(parser.parse(x)) if not pd.isna(x) else None)
+    return df
diff --git a/environment.yml b/environment.yml
index 55699ca7..622c537a 100644
--- a/environment.yml
+++ b/environment.yml
@@ -19,3 +19,4 @@ dependencies:
   - sphinx-autoapi
   - dask[dataframe]
   - fsspec
+  - pytest-asyncio
diff --git a/tests/conftest.py b/tests/conftest.py
index acbb554f..0a80ab73 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -51,7 +51,7 @@ def er_io():
     er_io = ecoscope.io.EarthRangerIO(server=ER_SERVER, username=ER_USERNAME, password=ER_PASSWORD)
 
     er_io.GROUP_NAME = "Elephants"
-    er_io.SUBJECT_IDS = er_io.get_subjects(group_name=er_io.GROUP_NAME).id.tolist()
+    er_io.SUBJECT_IDS = er_io.get_subjects(subject_group_name=er_io.GROUP_NAME).id.tolist()
     er_io.SUBJECTSOURCE_IDS, er_io.SOURCE_IDS = er_io.get_subjectsources(subjects=",".join(er_io.SUBJECT_IDS))[
         ["id", "source"]
     ].values.T.tolist()
@@ -69,7 +69,7 @@ def er_events_io():
     )
 
     er_events_io.GROUP_NAME = "Elephants"
-    er_events_io.SUBJECT_IDS = er_events_io.get_subjects(group_name=er_events_io.GROUP_NAME).id.tolist()
+    er_events_io.SUBJECT_IDS = er_events_io.get_subjects(subject_group_name=er_events_io.GROUP_NAME).id.tolist()
     er_events_io.SUBJECTSOURCE_IDS, er_events_io.SOURCE_IDS = er_events_io.get_subjectsources(
         subjects=",".join(er_events_io.SUBJECT_IDS)
     )[["id", "source"]].values.T.tolist()
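For reference, the new `clean_time_cols` helper can be exercised on its own; a minimal sketch in which the sample frame and its values are made up, while the helper body mirrors the `ecoscope/io/earthranger_utils.py` hunk above:

```python
import pandas as pd
from dateutil import parser


def clean_time_cols(df):
    time_cols = ["time", "created_at", "updated_at", "end_time", "last_position_date", "recorded_at"]
    for col in time_cols:
        if col in df.columns:
            # parse strings element-wise; missing values (None/NaN) are left as None
            df[col] = df[col].apply(lambda x: pd.to_datetime(parser.parse(x)) if not pd.isna(x) else None)
    return df


# hypothetical frame standing in for a client response
df = pd.DataFrame(
    {
        "recorded_at": ["2023-01-01T12:00:00+0000", None],
        "created_at": ["2023-01-01 12:05:00Z", "2023-01-02 08:30:00Z"],
    }
)
df = clean_time_cols(df)
print(df["created_at"].iloc[0])  # 2023-01-01 12:05:00+00:00 -- a Timestamp, not a string
```

Parsing element-wise with `dateutil` tolerates mixed timestamp formats across rows, though, unlike the previous `errors="coerce"` path, an unparseable string will raise rather than become `NaT`.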
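Separately, `pytest-asyncio` (added to `environment.yml`) lets the async client methods be exercised as coroutine tests. A rough sketch; the `er_io_async` fixture and the asserted column are assumptions for illustration, not part of this change:

```python
import pandas as pd
import pytest


@pytest.mark.asyncio
async def test_get_sources_dataframe_parses_times(er_io_async):
    # er_io_async is a hypothetical fixture yielding the async EarthRanger client,
    # analogous to the existing `er_io` fixture in tests/conftest.py
    df = await er_io_async.get_sources_dataframe()

    assert isinstance(df, pd.DataFrame)
    # after clean_time_cols, time-like columns should hold parsed timestamps rather than raw strings
    if "updated_at" in df.columns:
        assert not df["updated_at"].dropna().map(lambda v: isinstance(v, str)).any()
```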