Parse date values returned from earthranger (#231)
atmorling authored Aug 7, 2024
1 parent 91de333 commit 6a96010
Showing 5 changed files with 47 additions and 47 deletions.
44 changes: 16 additions & 28 deletions ecoscope/io/async_earthranger.py
@@ -2,11 +2,10 @@
 import geopandas as gpd
 import pandas as pd
 import asyncio
-from dateutil import parser
 
 import ecoscope
 from ecoscope.io.utils import to_hex
-from ecoscope.io.earthranger_utils import clean_kwargs, to_gdf
+from ecoscope.io.earthranger_utils import clean_kwargs, to_gdf, clean_time_cols
 
 try:
     from erclient.client import AsyncERClient
@@ -87,7 +86,9 @@ async def get_sources_dataframe(
         async for source in self.get_sources(**addl_kwargs):
             sources.append(source)
 
-        return pd.DataFrame(sources)
+        df = pd.DataFrame(sources)
+        df = clean_time_cols(df)
+        return df
 
     async def get_subjects(
         self,
@@ -196,6 +197,7 @@ async def get_subjects_dataframe(
         assert not df.empty
         df["hex"] = df["additional"].str["rgb"].map(to_hex) if "additional" in df else "#ff0000"
 
+        df = clean_time_cols(df)
         return df
 
     async def get_subjectsources(self, subjects=None, sources=None, **addl_kwargs):
@@ -228,6 +230,7 @@ async def get_subjectsources_dataframe(self, subjects=None, sources=None, **addl
             subject_sources.append(subject_source)
 
         df = pd.DataFrame(subject_sources)
+        df = clean_time_cols(df)
         return df
 
     async def get_patrol_types_dataframe(self):
@@ -281,6 +284,7 @@ async def get_patrols_dataframe(self, **kwargs):
             patrols.append(patrol)
 
         df = pd.DataFrame(patrols)
+        df = clean_time_cols(df)
         return df
 
     async def get_observations(
@@ -353,18 +357,9 @@ async def get_observations_gdf(self, **kwargs):

         if observations.empty:
             return gpd.GeoDataFrame()
-
-        observations["created_at"] = pd.to_datetime(
-            observations["created_at"],
-            errors="coerce",
-            utc=True,
-        ).dt.tz_convert(kwargs.get("tz"))
-
-        observations["recorded_at"] = pd.to_datetime(
-            observations["recorded_at"],
-            errors="coerce",
-            utc=True,
-        ).dt.tz_convert(kwargs.get("tz"))
+        observations = clean_time_cols(observations)
+        observations["created_at"] = observations["created_at"].dt.tz_convert(kwargs.get("tz"))
+        observations["recorded_at"] = observations["recorded_at"].dt.tz_convert(kwargs.get("tz"))
 
         observations.sort_values("recorded_at", inplace=True)
         return to_gdf(observations)
@@ -456,18 +451,10 @@ async def _get_observations_by_patrol(self, patrol, relocations=True, tz="UTC",
                     continue
 
                 observations_by_subject["subject_id"] = subject_id
-                observations_by_subject["created_at"] = pd.to_datetime(
-                    observations_by_subject["created_at"],
-                    errors="coerce",
-                    utc=True,
-                ).dt.tz_convert(tz)
-
-                observations_by_subject["recorded_at"] = pd.to_datetime(
-                    observations_by_subject["recorded_at"],
-                    errors="coerce",
-                    utc=True,
-                ).dt.tz_convert(tz)
-                observations_by_subject.sort_values("recorded_at", inplace=True)
+
+                observations_by_subject = clean_time_cols(observations_by_subject)
+                observations_by_subject["created_at"] = observations_by_subject["created_at"].dt.tz_convert(tz)
+                observations_by_subject["recorded_at"] = observations_by_subject["recorded_at"].dt.tz_convert(tz)
                 observations_by_subject = to_gdf(observations_by_subject)
 
                 if relocations:
@@ -503,6 +490,7 @@ async def _get_observations_by_patrol(self, patrol, relocations=True, tz="UTC",
                         ]
                     )
                 )
+                observations_by_subject = clean_time_cols(observations_by_subject)
                 observations_by_subject.set_index("id", inplace=True)
 
                 if len(observations_by_subject) > 0:
@@ -617,7 +605,7 @@ async def get_events_dataframe(
             events.append(event)
 
         df = pd.DataFrame(events)
-        df["time"] = df["time"].apply(lambda x: pd.to_datetime(parser.parse(x)))
+        df = clean_time_cols(df)
         gdf = gpd.GeoDataFrame(df)
         if gdf.loc[0, "location"] is not None:
             gdf.loc[~gdf["geojson"].isna(), "geometry"] = gpd.GeoDataFrame.from_features(
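The recurring change in this file swaps per-column pd.to_datetime boilerplate for the shared clean_time_cols helper, followed by an explicit timezone conversion. A minimal sketch of that pattern on a toy frame (sample values are illustrative; assumes ecoscope is installed):

import pandas as pd
from ecoscope.io.earthranger_utils import clean_time_cols

# Toy observations frame with ISO-8601 strings, as EarthRanger returns them.
observations = pd.DataFrame(
    {
        "recorded_at": ["2024-08-01T06:00:00+00:00", "2024-08-01T07:30:00+00:00"],
        "created_at": ["2024-08-01T06:01:00+00:00", "2024-08-01T07:31:00+00:00"],
    }
)

# New pattern: parse once via the helper, then convert to the caller's tz.
observations = clean_time_cols(observations)
observations["recorded_at"] = observations["recorded_at"].dt.tz_convert("Africa/Nairobi")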
35 changes: 18 additions & 17 deletions ecoscope/io/earthranger.py
@@ -8,14 +8,13 @@
 import pandas as pd
 import pytz
 import requests
-from dateutil import parser
 from erclient.client import ERClient, ERClientException, ERClientNotFound
 from shapely.geometry import shape
 from tqdm.auto import tqdm
 
 import ecoscope
 from ecoscope.io.utils import pack_columns, to_hex
-from ecoscope.io.earthranger_utils import clean_kwargs, dataframe_to_dict, to_gdf
+from ecoscope.io.earthranger_utils import clean_kwargs, dataframe_to_dict, to_gdf, clean_time_cols
 
 
 class EarthRangerIO(ERClient):
@@ -178,6 +177,7 @@ def partial_subjects(subjects):
         assert not df.empty
 
         df["hex"] = df["additional"].str["rgb"].map(to_hex) if "additional" in df else "#ff0000"
+        df = clean_time_cols(df)
 
         return df
 
@@ -192,11 +192,13 @@ def get_subjectsources(self, subjects=None, sources=None, **addl_kwargs):
         subjectsources : pd.DataFrame
         """
         params = clean_kwargs(addl_kwargs, sources=sources, subjects=subjects)
-        return pd.DataFrame(
+        df = pd.DataFrame(
             self.get_objects_multithreaded(
                 object="subjectsources/", threads=self.tcp_limit, page_size=self.sub_page_size, **params
             )
         )
+        df = clean_time_cols(df)
+        return df
 
     def _get_observations(
         self,
@@ -272,17 +274,9 @@ def _get_observations(
         if observations.empty:
             return gpd.GeoDataFrame()
 
-        observations["created_at"] = pd.to_datetime(
-            observations["created_at"],
-            errors="coerce",
-            utc=True,
-        ).dt.tz_convert(tz)
-
-        observations["recorded_at"] = pd.to_datetime(
-            observations["recorded_at"],
-            errors="coerce",
-            utc=True,
-        ).dt.tz_convert(tz)
+        observations = clean_time_cols(observations)
+        observations["created_at"] = observations["created_at"].dt.tz_convert(tz)
+        observations["recorded_at"] = observations["recorded_at"].dt.tz_convert(tz)
 
         observations.sort_values("recorded_at", inplace=True)
         return to_gdf(observations)
@@ -599,7 +593,7 @@ def get_events(

         assert not df.empty
 
-        df["time"] = df["time"].apply(lambda x: pd.to_datetime(parser.parse(x)))
+        df = clean_time_cols(df)
 
         gdf = gpd.GeoDataFrame(df)
         if gdf.loc[0, "location"] is not None:
@@ -657,6 +651,7 @@ def get_patrols(self, since=None, until=None, patrol_type=None, status=None, **a
         )
         if "serial_number" in df.columns:
             df = df.sort_values(by="serial_number").reset_index(drop=True)
+        df = clean_time_cols(df)
         return df
 
     def get_patrol_events(self, since=None, until=None, patrol_type=None, status=None, **addl_kwargs):
@@ -687,6 +682,7 @@ def get_patrol_events(self, since=None, until=None, patrol_type=None, status=Non
event["patrol_segment_id"] = segment.get("id")
events.append(event)
events_df = pd.DataFrame(events)
events_df = clean_time_cols(events_df)

events_df["geometry"] = events_df["geojson"].apply(lambda x: shape(x.get("geometry")))
return gpd.GeoDataFrame(events_df, geometry="geometry", crs=4326)
@@ -713,6 +709,7 @@ def get_patrol_segments_from_patrol_id(self, patrol_id, **addl_kwargs):
         df = self._get(object, **params)
         df["patrol_segments"][0].pop("updates")
         df.pop("updates")
+        df = clean_time_cols(df)
 
         return pd.DataFrame(dict([(k, pd.Series(v)) for k, v in df.items()]))
 
@@ -816,7 +813,9 @@ def get_patrol_observations(self, patrols_df, include_patrol_details=False, **kw
f"end_time={patrol_end_time} failed for: {e}"
)

df = ecoscope.base.Relocations(pd.concat(observations))
df = pd.concat(observations)
df = clean_time_cols(df)
df = ecoscope.base.Relocations(df)
if include_patrol_details:
return df.set_index("id")
return df
@@ -840,11 +839,13 @@ def get_patrol_segment_events(
         )
 
         object = f"activity/patrols/segments/{patrol_segment_id}/events/"
-        return pd.DataFrame(
+        df = pd.DataFrame(
             self.get_objects_multithreaded(
                 object=object, threads=self.tcp_limit, page_size=self.sub_page_size, **params
             )
         )
+        df = clean_time_cols(df)
+        return df
 
     def get_spatial_features_group(self, spatial_features_group_id=None, **addl_kwargs):
         """
10 changes: 10 additions & 0 deletions ecoscope/io/earthranger_utils.py
@@ -1,5 +1,6 @@
 import geopandas as gpd
 import pandas as pd
+from dateutil import parser
 
 
 def clean_kwargs(addl_kwargs={}, **kwargs):
@@ -33,3 +34,12 @@ def to_gdf(df):
         geometry=gpd.points_from_xy(df["location"].str[longitude], df["location"].str[latitude]),
         crs=4326,
     )
+
+
+def clean_time_cols(df):
+    time_cols = ["time", "created_at", "updated_at", "end_time", "last_position_date", "recorded_at"]
+    for col in time_cols:
+        if col in df.columns:
+            # guard with pd.isna(x) rather than `x is not None` so NaN values are skipped too
+            df[col] = df[col].apply(lambda x: pd.to_datetime(parser.parse(x)) if not pd.isna(x) else None)
+    return df
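For readers who want to poke at the new helper outside the package, it can be exercised stand-alone; a minimal sketch with an illustrative frame (the copy below mirrors the function added above):

import pandas as pd
from dateutil import parser

# Stand-alone copy of clean_time_cols for experimenting outside ecoscope.
def clean_time_cols(df):
    time_cols = ["time", "created_at", "updated_at", "end_time", "last_position_date", "recorded_at"]
    for col in time_cols:
        if col in df.columns:
            df[col] = df[col].apply(lambda x: pd.to_datetime(parser.parse(x)) if not pd.isna(x) else None)
    return df

df = pd.DataFrame({"time": ["2024-08-07 12:00:00+03:00", None], "serial_number": [1, 2]})
df = clean_time_cols(df)
print(df["time"].iloc[0])            # parsed pd.Timestamp, offset preserved
print(df["serial_number"].tolist())  # [1, 2] -- non-time columns untouched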
1 change: 1 addition & 0 deletions environment.yml
@@ -19,3 +19,4 @@ dependencies:
   - sphinx-autoapi
   - dask[dataframe]
   - fsspec
+  - pytest-asyncio
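The new pytest-asyncio dependency enables async test functions for the AsyncERClient-based code paths. A hypothetical test in that style (the async_er_io fixture is illustrative and not part of this commit):

import pytest

# Hypothetical: assumes an `async_er_io` fixture wrapping AsyncEarthRangerIO.
@pytest.mark.asyncio
async def test_source_dates_are_parsed(async_er_io):
    df = await async_er_io.get_sources_dataframe()
    # clean_time_cols should have turned date strings into Timestamps
    assert df["updated_at"].dropna().map(lambda v: hasattr(v, "tzinfo")).all()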
4 changes: 2 additions & 2 deletions tests/conftest.py
@@ -51,7 +51,7 @@ def er_io():
     er_io = ecoscope.io.EarthRangerIO(server=ER_SERVER, username=ER_USERNAME, password=ER_PASSWORD)
 
     er_io.GROUP_NAME = "Elephants"
-    er_io.SUBJECT_IDS = er_io.get_subjects(group_name=er_io.GROUP_NAME).id.tolist()
+    er_io.SUBJECT_IDS = er_io.get_subjects(subject_group_name=er_io.GROUP_NAME).id.tolist()
     er_io.SUBJECTSOURCE_IDS, er_io.SOURCE_IDS = er_io.get_subjectsources(subjects=",".join(er_io.SUBJECT_IDS))[
         ["id", "source"]
     ].values.T.tolist()
@@ -69,7 +69,7 @@ def er_events_io():
     )
 
     er_events_io.GROUP_NAME = "Elephants"
-    er_events_io.SUBJECT_IDS = er_events_io.get_subjects(group_name=er_events_io.GROUP_NAME).id.tolist()
+    er_events_io.SUBJECT_IDS = er_events_io.get_subjects(subject_group_name=er_events_io.GROUP_NAME).id.tolist()
     er_events_io.SUBJECTSOURCE_IDS, er_events_io.SOURCE_IDS = er_events_io.get_subjectsources(
         subjects=",".join(er_events_io.SUBJECT_IDS)
     )[["id", "source"]].values.T.tolist()
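The fixture updates above track a keyword rename in get_subjects, from group_name to subject_group_name. A hedged usage sketch (server and credentials are placeholders, not from this commit):

import ecoscope

# Placeholder connection details -- use real EarthRanger credentials in practice.
er_io = ecoscope.io.EarthRangerIO(server="https://sandbox.example.com", username="user", password="pass")

# The subject group is now selected with `subject_group_name`, not `group_name`.
subjects = er_io.get_subjects(subject_group_name="Elephants")
print(subjects.id.tolist())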
