Skip to content

Commit

Permalink
fix: Fix feature service inference logic (feast-dev#3089)
Browse files Browse the repository at this point in the history
* Add __init__.py files to allow test files to share names

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Add feature service tests

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Fix feature service inference logic

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Remove stray `__init__.py` file

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Fix comments

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Add check

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Temp

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Address comments

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Fix

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Address comments

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Fix

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 authored Aug 19, 2022
1 parent 7e887e4 commit 4310ed7
Show file tree
Hide file tree
Showing 22 changed files with 342 additions and 59 deletions.
12 changes: 9 additions & 3 deletions sdk/python/feast/base_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,16 @@ def __getitem__(self, item):

cp = self.__copy__()
if self.features:
feature_name_to_feature = {
feature.name: feature for feature in self.features
}
referenced_features = []
for feature in self.features:
if feature.name in item:
referenced_features.append(feature)
for feature in item:
if feature not in feature_name_to_feature:
raise ValueError(
f"Feature {feature} does not exist in this feature view."
)
referenced_features.append(feature_name_to_feature[feature])
cp.projection.features = referenced_features
else:
cp.projection.desired_features = item
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ def __init__(self, expected_column_name: str):
)


class FeatureViewMissingDuringFeatureServiceInference(Exception):
    """Raised when feature service inference needs a feature view that was not supplied.

    Args:
        feature_view_name: Name of the feature view that could not be found.
        feature_service_name: Name of the feature service whose inference was running.
    """

    def __init__(self, feature_view_name: str, feature_service_name: str):
        super().__init__(
            f"Missing {feature_view_name} feature view during inference for {feature_service_name} feature service."
        )


class InvalidEntityType(Exception):
def __init__(self, entity_type: type):
super().__init__(
Expand Down
65 changes: 51 additions & 14 deletions sdk/python/feast/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typeguard import typechecked

from feast.base_feature_view import BaseFeatureView
from feast.errors import FeatureViewMissingDuringFeatureServiceInference
from feast.feature_logging import LoggingConfig
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
Expand Down Expand Up @@ -85,32 +86,68 @@ def __init__(
if isinstance(feature_grouping, BaseFeatureView):
self.feature_view_projections.append(feature_grouping.projection)

def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None):
def infer_features(self, fvs_to_update: Dict[str, FeatureView]):
"""
Infers the features for the projections of this feature service, and updates this feature
service in place.
This method is necessary since feature services may rely on feature views which require
feature inference.
Args:
fvs_to_update: A mapping of feature view names to corresponding feature views that
contains all the feature views necessary to run inference.
"""
for feature_grouping in self._features:
if isinstance(feature_grouping, BaseFeatureView):
# For feature services that depend on an unspecified feature view, apply inferred schema
if fvs_to_update and feature_grouping.name in fvs_to_update:
if feature_grouping.projection.desired_features:
desired_features = set(
feature_grouping.projection.desired_features
)
projection = feature_grouping.projection

if projection.desired_features:
# The projection wants to select a specific set of inferred features.
# Example: FeatureService(features=[fv[["inferred_feature"]]]), where
# 'fv' is a feature view that was defined without a schema.
if feature_grouping.name in fvs_to_update:
# First we validate that the selected features have actually been inferred.
desired_features = set(projection.desired_features)
actual_features = set(
[
f.name
for f in fvs_to_update[feature_grouping.name].features
]
)
assert desired_features.issubset(actual_features)
# We need to set the features for the projection at this point so we ensure we're starting with
# an empty list.
feature_grouping.projection.features = []

# Then we extract the selected features and add them to the projection.
projection.features = []
for f in fvs_to_update[feature_grouping.name].features:
if f.name in desired_features:
feature_grouping.projection.features.append(f)
projection.features.append(f)
else:
feature_grouping.projection.features = fvs_to_update[
feature_grouping.name
].features
raise FeatureViewMissingDuringFeatureServiceInference(
feature_view_name=feature_grouping.name,
feature_service_name=self.name,
)

continue

if projection.features:
# The projection has already selected features from a feature view with a
# known schema, so no action needs to be taken.
# Example: FeatureService(features=[fv[["existing_feature"]]]), where
# 'existing_feature' was defined as part of the schema of 'fv'.
# Example: FeatureService(features=[fv]), where 'fv' was defined with a schema.
continue

# The projection wants to select all possible inferred features.
# Example: FeatureService(features=[fv]), where 'fv' is a feature view that
# was defined without a schema.
if feature_grouping.name in fvs_to_update:
projection.features = fvs_to_update[feature_grouping.name].features
else:
raise FeatureViewMissingDuringFeatureServiceInference(
feature_view_name=feature_grouping.name,
feature_service_name=self.name,
)
else:
raise ValueError(
f"The feature service {self.name} has been provided with an invalid type "
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/feature_view_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class FeatureViewProjection:
name: The unique name of the feature view from which this projection is created.
name_alias: An optional alias for the name.
features: The list of features represented by the feature view projection.
desired_features: The list of features that this feature view projection intends to select.
If empty, the projection intends to select all features. This attribute is only used
for feature service inference. It should only be set if the underlying feature view
is not ready to be projected, i.e. still needs to go through feature inference.
join_key_map: A map to modify join key columns during retrieval of this feature
view projection.
"""
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from datetime import timedelta

from feast import Entity, FeatureService, FeatureView, Field, FileSource
from feast.types import Float32, Int32, Int64

# Example feature repo in which every feature view declares an explicit schema.

# Entity used by the driver-level feature view.
driver = Entity(name="driver_id")

# File-backed sources; the parquet paths are fake and are not expected to exist.
driver_hourly_stats = FileSource(
    path="data/driver_stats.parquet",  # Fake path
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)
global_daily_stats = FileSource(
    path="data/global_stats.parquet",  # Fake path
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# Driver-level feature view with its features (and the driver_id column) listed in the schema.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
        Field(name="driver_id", dtype=Int32),
    ],
    online=True,
    source=driver_hourly_stats,
    tags={},
)

# Entity-less feature view (entities=[]) with two declared features.
global_stats_feature_view = FeatureView(
    name="global_daily_stats",
    entities=[],
    ttl=timedelta(days=1),
    schema=[
        Field(name="num_rides", dtype=Int32),
        Field(name="avg_ride_length", dtype=Float32),
    ],
    online=True,
    source=global_daily_stats,
    tags={},
)

# Feature service that references both feature views in full.
all_stats_service = FeatureService(
    name="all_stats",
    features=[driver_hourly_stats_view, global_stats_feature_view],
    tags={"release": "production"},
)

# Feature service that selects a single named feature from each feature view.
some_stats_service = FeatureService(
    name="some_stats",
    features=[
        driver_hourly_stats_view[["conv_rate"]],
        global_stats_feature_view[["num_rides"]],
    ],
    tags={"release": "production"},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from datetime import timedelta

from feast import Entity, FeatureService, FeatureView, FileSource

# Example feature repo in which no feature view declares a schema.
# NOTE(review): the features selected by name further down ("conv_rate", "num_rides")
# are not declared in this file — presumably they are resolved through feature
# inference against the parquet data; confirm against the test that loads this repo.

# Entity used by the driver-level feature view.
driver = Entity(name="driver_id")

# File-backed sources; the path values are placeholders that the test substitutes.
driver_hourly_stats = FileSource(
    path="%PARQUET_PATH%",  # placeholder to be replaced by the test
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)
global_daily_stats = FileSource(
    path="%PARQUET_PATH_GLOBAL%",  # placeholder to be replaced by the test
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# Driver-level feature view defined without a schema argument.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    online=True,
    source=driver_hourly_stats,
    tags={},
)

# Entity-less feature view (entities=[]) defined without a schema argument.
global_stats_feature_view = FeatureView(
    name="global_daily_stats",
    entities=[],
    ttl=timedelta(days=1),
    online=True,
    source=global_daily_stats,
    tags={},
)

# Feature service that references both feature views in full.
all_stats_service = FeatureService(
    name="all_stats",
    features=[driver_hourly_stats_view, global_stats_feature_view],
    tags={"release": "production"},
)

# Feature service that selects a single named feature from each feature view.
some_stats_service = FeatureService(
    name="some_stats",
    features=[
        driver_hourly_stats_view[["conv_rate"]],
        global_stats_feature_view[["num_rides"]],
    ],
    tags={"release": "production"},
)
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
60 changes: 60 additions & 0 deletions sdk/python/tests/unit/infra/test_inference_unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ def test_feature_view_inference_on_feature_columns(simple_dataset_1):


def test_update_feature_services_with_inferred_features(simple_dataset_1):
"""
Tests that a feature service that references feature views without specified features will
be updated with the correct projections after feature inference.
"""
with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source:
entity1 = Entity(name="test1", join_keys=["id_join_key"])
feature_view_1 = FeatureView(
Expand Down Expand Up @@ -338,4 +342,60 @@ def test_update_feature_services_with_inferred_features(simple_dataset_1):
assert len(feature_service.feature_view_projections[1].features) == 3


def test_update_feature_services_with_specified_features(simple_dataset_1):
    """
    Tests that a feature service that references feature views with specified features will
    have the correct projections both before and after feature inference.
    """
    with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source:
        entity1 = Entity(name="test1", join_keys=["id_join_key"])
        feature_view_1 = FeatureView(
            name="test1",
            entities=[entity1],
            schema=[
                Field(name="float_col", dtype=Float32),
                Field(name="id_join_key", dtype=Int64),
            ],
            source=file_source,
        )
        feature_view_2 = FeatureView(
            name="test2",
            entities=[entity1],
            schema=[
                Field(name="int64_col", dtype=Int64),
                Field(name="id_join_key", dtype=Int64),
            ],
            source=file_source,
        )

        # The first grouping selects a feature by name; the second passes the whole view.
        feature_service = FeatureService(
            name="fs_1", features=[feature_view_1[["float_col"]], feature_view_2]
        )

        # Before inference: both projections already carry their single feature and
        # have nothing pending in desired_features.
        projections = feature_service.feature_view_projections
        assert len(projections) == 2
        assert len(projections[0].features) == 1
        assert len(projections[0].desired_features) == 0
        assert len(projections[1].features) == 1
        assert len(projections[1].desired_features) == 0
        # Pin the feature names as well, so a projection holding the wrong feature fails.
        assert [f.name for f in projections[0].features] == ["float_col"]
        assert [f.name for f in projections[1].features] == ["int64_col"]

        update_feature_views_with_inferred_features_and_entities(
            [feature_view_1, feature_view_2],
            [entity1],
            RepoConfig(
                provider="local", project="test", entity_key_serialization_version=2
            ),
        )
        assert len(feature_view_1.features) == 1
        assert len(feature_view_2.features) == 1

        feature_service.infer_features(
            fvs_to_update={
                feature_view_1.name: feature_view_1,
                feature_view_2.name: feature_view_2,
            }
        )

        # After inference: re-read the projections from the service and check that
        # they are unchanged.
        projections = feature_service.feature_view_projections
        assert len(projections[0].features) == 1
        assert len(projections[1].features) == 1
        assert [f.name for f in projections[0].features] == ["float_col"]
        assert [f.name for f in projections[1].features] == ["int64_col"]


# TODO(felixwang9817): Add tests that interact with field mapping.
Empty file.
Loading

0 comments on commit 4310ed7

Please sign in to comment.