Skip to content

Commit

Permalink
Refactor _convert_arrow_to_proto (feast-dev#2085)
Browse files Browse the repository at this point in the history
* Refactor `_convert_arrow_to_proto`

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Add `benchmark-python-local` to Makefile

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
  • Loading branch information
judahrand authored Nov 24, 2021
1 parent 352dd6c commit 0356181
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 43 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ install-python:
# Run the integration benchmark suite via pytest-benchmark, autosaving results.
benchmark-python:
FEAST_USAGE=False IS_TEST=True python -m pytest --integration --benchmark --benchmark-autosave --benchmark-save-data sdk/python/tests

# Same benchmark suite, with FEAST_IS_LOCAL_TEST=True — presumably restricts the
# run to local (non-cloud) providers; confirm against the test fixtures.
benchmark-python-local:
FEAST_USAGE=False IS_TEST=True FEAST_IS_LOCAL_TEST=True python -m pytest --integration --benchmark --benchmark-autosave --benchmark-save-data sdk/python/tests

# Run the non-integration Python test suite across 8 pytest-xdist workers.
test-python:
FEAST_USAGE=False IS_TEST=True python -m pytest -n 8 sdk/python/tests

Expand Down
96 changes: 53 additions & 43 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,56 +284,66 @@ def _run_field_mapping(
return table


def _coerce_datetime(ts):
"""
Depending on underlying time resolution, arrow to_pydict() sometimes returns pandas
timestamp type (for nanosecond resolution), and sometimes you get standard python datetime
(for microsecond resolution).
While pandas timestamp class is a subclass of python datetime, it doesn't always behave the
same way. We convert it to normal datetime so that consumers downstream don't have to deal
with these quirks.
"""

if isinstance(ts, pandas.Timestamp):
return ts.to_pydatetime()
else:
return ts


def _convert_arrow_to_proto(
    table: Union[pyarrow.Table, pyarrow.RecordBatch],
    feature_view: FeatureView,
    join_keys: List[str],
) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]:
    """
    Convert an Arrow table (or record batch) into per-row online-store write tuples.

    Args:
        table: Arrow data containing one column per join key, one per feature,
            and the feature view's event (and optionally created) timestamp columns.
        feature_view: The feature view whose features and batch-source timestamp
            column names drive the conversion.
        join_keys: Names of the entity join-key columns in ``table``.

    Returns:
        One ``(entity_key, feature_dict, event_timestamp, created_timestamp)``
        tuple per row of ``table``; ``created_timestamp`` is ``None`` when the
        batch source declares no created-timestamp column.
    """
    # Build one EntityKeyProto per row from the join-key columns.
    # Columns are pulled once up front so each row only does list indexing.
    join_key_values = {k: table.column(k).to_pylist() for k in join_keys}
    entity_keys = [
        EntityKeyProto(
            join_keys=join_keys,
            entity_values=[
                python_value_to_proto_value(join_key_values[k][idx]) for k in join_keys
            ],
        )
        for idx in range(table.num_rows)
    ]

    # Serialize each feature column to protos in one pass per column...
    feature_columns = {
        feature.name: [
            python_value_to_proto_value(val, feature.dtype)
            for val in table.column(feature.name).to_pylist()
        ]
        for feature in feature_view.features
    }
    # ...then transpose column-wise protos into one {name: proto} dict per row.
    # (Renamed from `vars`, which shadowed the builtin.)
    features = [
        dict(zip(feature_columns, row_values))
        for row_values in zip(*feature_columns.values())
    ]

    # Coerce event timestamps to plain datetimes (see _coerce_datetime).
    event_timestamps = [
        _coerce_datetime(val)
        for val in table.column(
            feature_view.batch_source.event_timestamp_column
        ).to_pylist()
    ]

    # Created timestamps are optional on the batch source; pad with None when absent
    # so the final zip stays row-aligned.
    if feature_view.batch_source.created_timestamp_column:
        created_timestamps = [
            _coerce_datetime(val)
            for val in table.column(
                feature_view.batch_source.created_timestamp_column
            ).to_pylist()
        ]
    else:
        created_timestamps = [None] * table.num_rows

    return list(zip(entity_keys, features, event_timestamps, created_timestamps))

0 comments on commit 0356181

Please sign in to comment.