Skip to content

Commit

Permalink
feat: Rename OnDemandTransformations to Transformations (feast-dev#4038)
Browse files Browse the repository at this point in the history
* feat: updating protos to separate transformation

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* fixed stuff...i think

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated tests and registry diff function

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated base registry

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated react component

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* formatted

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated stream feature view proto

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* making the proto changes backwards compatible

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* trying to make this backwards compatible

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* caught a bug and fixed the linter

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* actually linted

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated ui component

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* accidentally commented out fixtures

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* Updated

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* incrementing protos

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated tests

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* fixed linting issue and made backwards compatible

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* feat: Renaming OnDemandTransformations to Transformations

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated proto name

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* renamed substrait proto

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* renamed substrait proto

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated

* updated

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated integration test

* missed one

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

* updated to include Substrait type

* linter

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>

---------

Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>
  • Loading branch information
franciscojavierarceo authored Mar 25, 2024
1 parent c58ef74 commit 9b98eaf
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 97 deletions.
12 changes: 2 additions & 10 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ message OnDemandFeatureViewSpec {
// Map of sources for this feature view.
map<string, OnDemandSource> sources = 4;

oneof transformation {
UserDefinedFunction user_defined_function = 5 [deprecated = true];
OnDemandSubstraitTransformation on_demand_substrait_transformation = 9 [deprecated = true];
}
UserDefinedFunction user_defined_function = 5 [deprecated = true];

// Oneof with {user_defined_function, substrait_transformation}
FeatureTransformationV2 feature_transformation = 10;

Expand Down Expand Up @@ -96,9 +94,3 @@ message UserDefinedFunction {
// The string representation of the udf
string body_text = 3;
}

message OnDemandSubstraitTransformation {
option deprecated = true;

bytes substrait_plan = 1;
}
5 changes: 2 additions & 3 deletions protos/feast/core/Transformation.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ message UserDefinedFunctionV2 {

// A feature transformation executed as a user-defined function
message FeatureTransformationV2 {
// Note this Transformation starts at 5 for backwards compatibility
oneof transformation {
UserDefinedFunctionV2 user_defined_function = 1;
OnDemandSubstraitTransformationV2 on_demand_substrait_transformation = 2;
SubstraitTransformationV2 substrait_transformation = 2;
}
}

message OnDemandSubstraitTransformationV2 {
message SubstraitTransformationV2 {
bytes substrait_plan = 1;
}
18 changes: 9 additions & 9 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import warnings
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, cast

Expand Down Expand Up @@ -145,22 +144,23 @@ def diff_registry_objects(
if _field.name in FIELDS_TO_IGNORE:
continue
elif getattr(current_spec, _field.name) != getattr(new_spec, _field.name):
# TODO: Delete "transformation" after we've safely deprecated it from the proto
if _field.name in ["transformation", "feature_transformation"]:
warnings.warn(
"transformation will be deprecated in the future please use feature_transformation instead.",
DeprecationWarning,
)
if _field.name == "feature_transformation":
current_spec = cast(OnDemandFeatureViewSpec, current_spec)
new_spec = cast(OnDemandFeatureViewSpec, new_spec)
# Check if the old proto is populated and use that if it is
deprecated_udf = current_spec.user_defined_function
feature_transformation_udf = (
current_spec.feature_transformation.user_defined_function
)
if (
current_spec.HasField("user_defined_function")
and not feature_transformation_udf
):
deprecated_udf = current_spec.user_defined_function
else:
deprecated_udf = None
current_udf = (
deprecated_udf
if deprecated_udf.body_text != ""
if deprecated_udf is not None
else feature_transformation_udf
)
new_udf = new_spec.feature_transformation.user_defined_function
Expand Down
33 changes: 29 additions & 4 deletions sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
from feast.request_feature_view import RequestFeatureView
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
from feast.transformation.pandas_transformation import PandasTransformation
from feast.transformation.substrait_transformation import SubstraitTransformation


class BaseRegistry(ABC):
Expand Down Expand Up @@ -670,10 +672,33 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
"We will be deprecating the usage of spec.userDefinedFunction in a future release please upgrade cautiously.",
DeprecationWarning,
)
odfv_dict["spec"]["featureTransformation"]["userDefinedFunction"][
"body"
] = on_demand_feature_view.feature_transformation.udf_string
registry_dict["onDemandFeatureViews"].append(odfv_dict)
if on_demand_feature_view.feature_transformation:
if isinstance(
on_demand_feature_view.feature_transformation, PandasTransformation
):
if "userDefinedFunction" not in odfv_dict["spec"]:
odfv_dict["spec"]["userDefinedFunction"] = {}
odfv_dict["spec"]["userDefinedFunction"][
"body"
] = on_demand_feature_view.feature_transformation.udf_string
odfv_dict["spec"]["featureTransformation"]["userDefinedFunction"][
"body"
] = on_demand_feature_view.feature_transformation.udf_string
elif isinstance(
on_demand_feature_view.feature_transformation,
SubstraitTransformation,
):
odfv_dict["spec"]["featureTransformation"]["substraitPlan"][
"body"
] = on_demand_feature_view.feature_transformation.substrait_plan
else:
odfv_dict["spec"]["featureTransformation"]["userDefinedFunction"][
"body"
] = None
odfv_dict["spec"]["featureTransformation"]["substraitPlan"][
"body"
] = None
registry_dict["onDemandFeatureViews"].append(odfv_dict)
for request_feature_view in sorted(
self.list_request_feature_views(project=project),
key=lambda request_feature_view: request_feature_view.name,
Expand Down
62 changes: 30 additions & 32 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field, from_value_type
from feast.on_demand_pandas_transformation import OnDemandPandasTransformation
from feast.on_demand_substrait_transformation import OnDemandSubstraitTransformation
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
Expand All @@ -33,6 +31,8 @@
from feast.protos.feast.core.Transformation_pb2 import (
UserDefinedFunctionV2 as UserDefinedFunctionProto,
)
from feast.transformation.pandas_transformation import PandasTransformation
from feast.transformation.substrait_transformation import SubstraitTransformation
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
Expand All @@ -57,7 +57,7 @@ class OnDemandFeatureView(BaseFeatureView):
sources with type FeatureViewProjection.
source_request_sources: A map from input source names to the actual input
sources with type RequestSource.
transformation: The user defined transformation.
feature_transformation: The user defined transformation.
description: A human-readable description.
tags: A dictionary of key-value pairs to store arbitrary metadata.
owner: The owner of the on demand feature view, typically the email of the primary
Expand All @@ -68,8 +68,7 @@ class OnDemandFeatureView(BaseFeatureView):
features: List[Field]
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_sources: Dict[str, RequestSource]
transformation: Union[OnDemandPandasTransformation]
feature_transformation: Union[OnDemandPandasTransformation]
feature_transformation: Union[PandasTransformation, SubstraitTransformation]
description: str
tags: Dict[str, str]
owner: str
Expand All @@ -89,8 +88,9 @@ def __init__( # noqa: C901
],
udf: Optional[FunctionType] = None,
udf_string: str = "",
transformation: Optional[Union[OnDemandPandasTransformation]] = None,
feature_transformation: Optional[Union[OnDemandPandasTransformation]] = None,
feature_transformation: Optional[
Union[PandasTransformation, SubstraitTransformation]
] = None,
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
Expand All @@ -108,7 +108,6 @@ def __init__( # noqa: C901
udf (deprecated): The user defined transformation function, which must take pandas
dataframes as inputs.
udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI)
transformation: The user defined transformation.
feature_transformation: The user defined transformation.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
Expand All @@ -123,13 +122,13 @@ def __init__( # noqa: C901
owner=owner,
)

if not transformation:
if not feature_transformation:
if udf:
warnings.warn(
"udf and udf_string parameters are deprecated. Please use transformation=OnDemandPandasTransformation(udf, udf_string) instead.",
DeprecationWarning,
)
transformation = OnDemandPandasTransformation(udf, udf_string)
feature_transformation = PandasTransformation(udf, udf_string)
else:
raise Exception(
"OnDemandFeatureView needs to be initialized with either transformation or udf arguments"
Expand All @@ -147,8 +146,7 @@ def __init__( # noqa: C901
odfv_source.name
] = odfv_source.projection

self.transformation = transformation
self.feature_transformation = self.transformation
self.feature_transformation = feature_transformation

@property
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
Expand All @@ -160,8 +158,7 @@ def __copy__(self):
schema=self.features,
sources=list(self.source_feature_view_projections.values())
+ list(self.source_request_sources.values()),
transformation=self.transformation,
feature_transformation=self.transformation,
feature_transformation=self.feature_transformation,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand All @@ -182,7 +179,6 @@ def __eq__(self, other):
self.source_feature_view_projections
!= other.source_feature_view_projections
or self.source_request_sources != other.source_request_sources
or self.transformation != other.transformation
or self.feature_transformation != other.feature_transformation
):
return False
Expand Down Expand Up @@ -218,12 +214,12 @@ def to_proto(self) -> OnDemandFeatureViewProto:
)

feature_transformation = FeatureTransformationProto(
user_defined_function=self.transformation.to_proto()
if type(self.transformation) == OnDemandPandasTransformation
user_defined_function=self.feature_transformation.to_proto()
if isinstance(self.feature_transformation, PandasTransformation)
else None,
substrait_transformation=self.feature_transformation.to_proto()
if isinstance(self.feature_transformation, SubstraitTransformation)
else None,
on_demand_substrait_transformation=self.transformation.to_proto()
if type(self.transformation) == OnDemandSubstraitTransformation
else None, # type: ignore
)
spec = OnDemandFeatureViewSpec(
name=self.name,
Expand Down Expand Up @@ -276,17 +272,17 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text
!= ""
):
transformation = OnDemandPandasTransformation.from_proto(
transformation = PandasTransformation.from_proto(
on_demand_feature_view_proto.spec.feature_transformation.user_defined_function
)
elif (
on_demand_feature_view_proto.spec.feature_transformation.WhichOneof(
"transformation"
)
== "on_demand_substrait_transformation"
== "substrait_transformation"
):
transformation = OnDemandSubstraitTransformation.from_proto(
on_demand_feature_view_proto.spec.feature_transformation.on_demand_substrait_transformation
transformation = SubstraitTransformation.from_proto(
on_demand_feature_view_proto.spec.feature_transformation.substrait_transformation
)
elif (
hasattr(on_demand_feature_view_proto.spec, "user_defined_function")
Expand All @@ -298,7 +294,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
body=on_demand_feature_view_proto.spec.user_defined_function.body,
body_text=on_demand_feature_view_proto.spec.user_defined_function.body_text,
)
transformation = OnDemandPandasTransformation.from_proto(
transformation = PandasTransformation.from_proto(
user_defined_function_proto=backwards_compatible_udf,
)
else:
Expand All @@ -314,7 +310,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
for feature in on_demand_feature_view_proto.spec.features
],
sources=sources,
transformation=transformation,
feature_transformation=transformation,
description=on_demand_feature_view_proto.spec.description,
tags=dict(on_demand_feature_view_proto.spec.tags),
owner=on_demand_feature_view_proto.spec.owner,
Expand Down Expand Up @@ -374,7 +370,9 @@ def get_transformed_features_df(

# Compute transformed values and apply to each result row

df_with_transformed_features = self.transformation.transform(df_with_features)
df_with_transformed_features = self.feature_transformation.transform(
df_with_features
)

# Work out whether the correct columns names are used.
rename_columns: Dict[str, str] = {}
Expand Down Expand Up @@ -424,7 +422,7 @@ def infer_features(self) -> None:
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
sample_val = rand_df_value[dtype] if dtype in rand_df_value else None
df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype)
output_df: pd.DataFrame = self.transformation.transform(df)
output_df: pd.DataFrame = self.feature_transformation.transform(df)
inferred_features = []
for f, dt in zip(output_df.columns, output_df.dtypes):
inferred_features.append(
Expand Down Expand Up @@ -521,7 +519,7 @@ def decorator(user_function):
input_fields: Field = []

for s in sources:
if type(s) == FeatureView:
if isinstance(s, FeatureView):
fields = s.projection.features
else:
fields = s.features
Expand All @@ -540,19 +538,19 @@ def decorator(user_function):

expr = user_function(ibis.table(input_fields, "t"))

transformation = OnDemandSubstraitTransformation(
transformation = SubstraitTransformation(
substrait_plan=compiler.compile(expr).SerializeToString()
)
else:
udf_string = dill.source.getsource(user_function)
mainify(user_function)
transformation = OnDemandPandasTransformation(user_function, udf_string)
transformation = PandasTransformation(user_function, udf_string)

on_demand_feature_view_obj = OnDemandFeatureView(
name=user_function.__name__,
sources=sources,
schema=schema,
transformation=transformation,
feature_transformation=transformation,
description=description,
tags=tags,
owner=owner,
Expand Down
10 changes: 4 additions & 6 deletions sdk/python/feast/stream_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.field import Field
from feast.on_demand_pandas_transformation import OnDemandPandasTransformation
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
Expand All @@ -32,6 +31,7 @@
from feast.protos.feast.core.Transformation_pb2 import (
UserDefinedFunctionV2 as UserDefinedFunctionProtoV2,
)
from feast.transformation.pandas_transformation import PandasTransformation

warnings.simplefilter("once", RuntimeWarning)

Expand Down Expand Up @@ -80,7 +80,7 @@ class StreamFeatureView(FeatureView):
materialization_intervals: List[Tuple[datetime, datetime]]
udf: Optional[FunctionType]
udf_string: Optional[str]
feature_transformation: Optional[OnDemandPandasTransformation]
feature_transformation: Optional[PandasTransformation]

def __init__(
self,
Expand All @@ -99,7 +99,7 @@ def __init__(
timestamp_field: Optional[str] = "",
udf: Optional[FunctionType] = None,
udf_string: Optional[str] = "",
feature_transformation: Optional[Union[OnDemandPandasTransformation]] = None,
feature_transformation: Optional[Union[PandasTransformation]] = None,
):
if not flags_helper.is_test():
warnings.warn(
Expand Down Expand Up @@ -371,9 +371,7 @@ def decorator(user_function):
schema=schema,
udf=user_function,
udf_string=udf_string,
feature_transformation=OnDemandPandasTransformation(
user_function, udf_string
),
feature_transformation=PandasTransformation(user_function, udf_string),
description=description,
tags=tags,
online=online,
Expand Down
Empty file.
Loading

0 comments on commit 9b98eaf

Please sign in to comment.