Skip to content

Commit

Permalink
fix: Update on demand feature view api (#2587)
Browse files Browse the repository at this point in the history
* Migrate over

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* temp fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix import

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix integration tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix integration tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint?

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba authored Apr 22, 2022
1 parent 4606d7c commit 38cd7f9
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 77 deletions.
10 changes: 5 additions & 5 deletions docs/tutorials/validating-historical-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pyarrow.parquet.write_table(entities_2019_table, "entities.parquet")
import pyarrow.parquet
import pandas as pd

from feast import FeatureView, Entity, FeatureStore, Field
from feast import FeatureView, Entity, FeatureStore, Field, BatchFeatureView
from feast.types import Float64, Int64
from feast.value_type import ValueType
from feast.data_format import ParquetFormat
Expand All @@ -134,7 +134,7 @@ taxi_entity = Entity(name='taxi', join_keys=['taxi_id'])


```python
trips_stats_fv = FeatureView(
trips_stats_fv = BatchFeatureView(
name='trip_stats',
entities=['taxi'],
features=[
Expand All @@ -160,9 +160,9 @@ trips_stats_fv = FeatureView(
Field("avg_trip_seconds", Float64),
Field("earned_per_hour", Float64),
],
sources={
"stats": trips_stats_fv
}
sources=[
trips_stats_fv,
]
)
def on_demand_stats(inp):
out = pd.DataFrame()
Expand Down
12 changes: 6 additions & 6 deletions examples/java-demo/feature_repo/driver_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
from google.protobuf.duration_pb2 import Duration
from feast.field import Field

from feast import Entity, Feature, FeatureView, FileSource, ValueType
from feast import Entity, Feature, BatchFeatureView, FileSource, ValueType

driver_hourly_stats = FileSource(
path="data/driver_stats_with_string.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
driver_hourly_stats_view = FeatureView(
driver_hourly_stats_view = BatchFeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400000),
Expand Down Expand Up @@ -43,10 +43,10 @@
# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
inputs={
"driver_hourly_stats": driver_hourly_stats_view,
"vals_to_add": input_request,
},
inputs=[
driver_hourly_stats_view,
input_request,
],
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from feast.infra.offline_stores.redshift_source import RedshiftSource
from feast.infra.offline_stores.snowflake_source import SnowflakeSource

from .batch_feature_view import BatchFeatureView
from .data_source import (
KafkaSource,
KinesisSource,
Expand All @@ -23,6 +24,7 @@
from .on_demand_feature_view import OnDemandFeatureView
from .repo_config import RepoConfig
from .request_feature_view import RequestFeatureView
from .stream_feature_view import StreamFeatureView
from .value_type import ValueType

logging.basicConfig(
Expand All @@ -38,6 +40,7 @@
pass

__all__ = [
"BatchFeatureView",
"Entity",
"KafkaSource",
"KinesisSource",
Expand All @@ -49,6 +52,7 @@
"OnDemandFeatureView",
"RepoConfig",
"SourceType",
"StreamFeatureView",
"ValueType",
"BigQuerySource",
"FileSource",
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
else feature_view_proto.spec.ttl.ToTimedelta()
),
source=batch_source,
stream_source=stream_source,
)
if stream_source:
feature_view.stream_source = stream_source

# FeatureViewProjections are not saved in the FeatureView proto.
# Create the default projection.
Expand Down
152 changes: 111 additions & 41 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pandas as pd

from feast.base_feature_view import BaseFeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import RequestSource
from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError
from feast.feature import Feature
Expand All @@ -25,6 +26,7 @@
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)
from feast.stream_feature_view import StreamFeatureView
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
Expand Down Expand Up @@ -66,14 +68,21 @@ class OnDemandFeatureView(BaseFeatureView):
tags: Dict[str, str]
owner: str

@log_exceptions
def __init__(
@log_exceptions # noqa: C901
def __init__( # noqa: C901
self,
*args,
name: Optional[str] = None,
features: Optional[List[Feature]] = None,
sources: Optional[
Dict[str, Union[FeatureView, FeatureViewProjection, RequestSource]]
List[
Union[
BatchFeatureView,
StreamFeatureView,
RequestSource,
FeatureViewProjection,
]
]
] = None,
udf: Optional[MethodType] = None,
inputs: Optional[
Expand All @@ -92,11 +101,11 @@ def __init__(
features (deprecated): The list of features in the output of the on demand
feature view, after the transformation has been applied.
sources (optional): A list of the actual input sources,
which may be feature views, feature view projections, or request data sources.
which may be feature views, or request data sources.
These sources serve as inputs to the udf, which will refer to them by name.
udf (optional): The user defined transformation function, which must take pandas
dataframes as inputs.
inputs (optional): A map from input source names to the actual input sources,
inputs (optional): (Deprecated) A map from input source names to the actual input sources,
which may be feature views, feature view projections, or request data sources.
These sources serve as inputs to the udf, which will refer to them by name.
schema (optional): The list of features in the output of the on demand feature
Expand All @@ -123,8 +132,7 @@ def __init__(
),
DeprecationWarning,
)

_sources = sources or inputs
_sources = sources or []
if inputs and sources:
raise ValueError("At most one of `sources` or `inputs` can be specified.")
elif inputs:
Expand All @@ -135,7 +143,17 @@ def __init__(
),
DeprecationWarning,
)

for _, source in inputs.items():
if isinstance(source, FeatureView):
_sources.append(feature_view_to_batch_feature_view(source))
elif isinstance(source, RequestSource) or isinstance(
source, FeatureViewProjection
):
_sources.append(source)
else:
raise ValueError(
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
)
_udf = udf

if args:
Expand Down Expand Up @@ -169,7 +187,18 @@ def __init__(
DeprecationWarning,
)
if len(args) >= 3:
_sources = args[2]
_inputs = args[2]
for _, source in _inputs.items():
if isinstance(source, FeatureView):
_sources.append(feature_view_to_batch_feature_view(source))
elif isinstance(source, RequestSource) or isinstance(
source, FeatureViewProjection
):
_sources.append(source)
else:
raise ValueError(
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
)
warnings.warn(
(
"The `inputs` parameter is being deprecated. Please use `sources` instead. "
Expand All @@ -195,18 +224,17 @@ def __init__(
tags=tags,
owner=owner,
)

assert _sources is not None
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
self.source_request_sources: Dict[str, RequestSource] = {}
for source_name, odfv_source in _sources.items():
for odfv_source in _sources:
if isinstance(odfv_source, RequestSource):
self.source_request_sources[source_name] = odfv_source
self.source_request_sources[odfv_source.name] = odfv_source
elif isinstance(odfv_source, FeatureViewProjection):
self.source_feature_view_projections[source_name] = odfv_source
self.source_feature_view_projections[odfv_source.name] = odfv_source
else:
self.source_feature_view_projections[
source_name
odfv_source.name
] = odfv_source.projection

if _udf is None:
Expand All @@ -219,12 +247,12 @@ def proto_class(self) -> Type[OnDemandFeatureViewProto]:
return OnDemandFeatureViewProto

def __copy__(self):

fv = OnDemandFeatureView(
name=self.name,
schema=self.features,
sources=dict(
**self.source_feature_view_projections, **self.source_request_sources,
),
sources=list(self.source_feature_view_projections.values())
+ list(self.source_request_sources.values()),
udf=self.udf,
description=self.description,
tags=self.tags,
Expand Down Expand Up @@ -302,22 +330,21 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
Returns:
A OnDemandFeatureView object based on the on-demand feature view protobuf.
"""
sources = {}
for (
source_name,
on_demand_source,
) in on_demand_feature_view_proto.spec.sources.items():
sources = []
for (_, on_demand_source,) in on_demand_feature_view_proto.spec.sources.items():
if on_demand_source.WhichOneof("source") == "feature_view":
sources[source_name] = FeatureView.from_proto(
on_demand_source.feature_view
).projection
sources.append(
FeatureView.from_proto(on_demand_source.feature_view).projection
)
elif on_demand_source.WhichOneof("source") == "feature_view_projection":
sources[source_name] = FeatureViewProjection.from_proto(
on_demand_source.feature_view_projection
sources.append(
FeatureViewProjection.from_proto(
on_demand_source.feature_view_projection
)
)
else:
sources[source_name] = RequestSource.from_proto(
on_demand_source.request_data_source
sources.append(
RequestSource.from_proto(on_demand_source.request_data_source)
)
on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
Expand Down Expand Up @@ -476,7 +503,16 @@ def get_requested_odfvs(feature_refs, project, registry):
def on_demand_feature_view(
*args,
features: Optional[List[Feature]] = None,
sources: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None,
sources: Optional[
List[
Union[
BatchFeatureView,
StreamFeatureView,
RequestSource,
FeatureViewProjection,
]
]
] = None,
inputs: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None,
schema: Optional[List[Field]] = None,
description: str = "",
Expand All @@ -490,7 +526,7 @@ def on_demand_feature_view(
features (deprecated): The list of features in the output of the on demand
feature view, after the transformation has been applied.
sources (optional): A list of the actual input sources,
which may be feature views, feature view projections, or request data sources.
which may be feature views, or request data sources.
These sources serve as inputs to the udf, which will refer to them by name.
inputs (optional): A map from input source names to the actual input sources,
which may be feature views, feature view projections, or request data sources.
Expand All @@ -517,8 +553,7 @@ def on_demand_feature_view(
),
DeprecationWarning,
)

_sources = sources or inputs
_sources = sources or []
if inputs and sources:
raise ValueError("At most one of `sources` or `inputs` can be specified.")
elif inputs:
Expand All @@ -529,6 +564,17 @@ def on_demand_feature_view(
),
DeprecationWarning,
)
for _, source in inputs.items():
if isinstance(source, FeatureView):
_sources.append(feature_view_to_batch_feature_view(source))
elif isinstance(source, RequestSource) or isinstance(
source, FeatureViewProjection
):
_sources.append(source)
else:
raise ValueError(
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
)

if args:
warnings.warn(
Expand Down Expand Up @@ -559,14 +605,25 @@ def on_demand_feature_view(
DeprecationWarning,
)
if len(args) >= 2:
_sources = args[1]
warnings.warn(
(
"The `inputs` parameter is being deprecated. Please use `sources` instead. "
"Feast 0.21 and onwards will not support the `inputs` parameter."
),
DeprecationWarning,
)
_inputs = args[1]
for _, source in _inputs.items():
if isinstance(source, FeatureView):
_sources.append(feature_view_to_batch_feature_view(source))
elif isinstance(source, RequestSource) or isinstance(
source, FeatureViewProjection
):
_sources.append(source)
else:
raise ValueError(
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
)
warnings.warn(
(
"The `inputs` parameter is being deprecated. Please use `sources` instead. "
"Feast 0.21 and onwards will not support the `inputs` parameter."
),
DeprecationWarning,
)

if not _sources:
raise ValueError("The `sources` parameter must be specified.")
Expand All @@ -587,3 +644,16 @@ def decorator(user_function):
return on_demand_feature_view_obj

return decorator


def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView:
    """Build a BatchFeatureView from the attributes of an existing FeatureView.

    The new view is constructed from ``fv``'s name, entities, ttl, tags,
    online, owner, schema and source attributes; no other attribute of ``fv``
    is passed to the ``BatchFeatureView`` constructor.

    Args:
        fv: The feature view whose attributes are read.

    Returns:
        The BatchFeatureView created from those attributes.
    """
    # Attribute names double as the constructor keyword names; they are read
    # in the order listed.
    forwarded_attrs = (
        "name",
        "entities",
        "ttl",
        "tags",
        "online",
        "owner",
        "schema",
        "source",
    )
    constructor_kwargs = {attr: getattr(fv, attr) for attr in forwarded_attrs}
    return BatchFeatureView(**constructor_kwargs)
Loading

0 comments on commit 38cd7f9

Please sign in to comment.