Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Decouple transformation types from ODFVs #3949

Merged
merged 5 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ message OnDemandFeatureViewSpec {
// Map of sources for this feature view.
map<string, OnDemandSource> sources = 4;

UserDefinedFunction user_defined_function = 5;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't be backwards compatible right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure it is. The binary format will be the same and there would have been no way for multiple fields to have been set with previous version, as it's just a single field. Googled a bit just now.. the very last sentence in this blog post seems to also indicate this should be fine.

oneof transformation {
UserDefinedFunction user_defined_function = 5;
}

// Description of the on demand feature view.
string description = 6;
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:

odfv_dict["spec"]["userDefinedFunction"][
"body"
] = on_demand_feature_view.udf_string
] = on_demand_feature_view.transformation.udf_string
registry_dict["onDemandFeatureViews"].append(odfv_dict)
for request_feature_view in sorted(
self.list_request_feature_views(project=project),
Expand Down
69 changes: 34 additions & 35 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import functools
import warnings
from datetime import datetime
from types import FunctionType
from typing import Any, Dict, List, Optional, Type, Union

import dill
Expand All @@ -16,6 +15,7 @@
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field, from_value_type
from feast.on_demand_pandas_transformation import OnDemandPandasTransformation
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
Expand All @@ -24,9 +24,6 @@
OnDemandFeatureViewSpec,
OnDemandSource,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
Expand All @@ -51,8 +48,7 @@ class OnDemandFeatureView(BaseFeatureView):
sources with type FeatureViewProjection.
source_request_sources: A map from input source names to the actual input
sources with type RequestSource.
udf: The user defined transformation function, which must take pandas dataframes
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we still keep the udf and add deprecated to make it still backward compatible? I like the new field transformation but just in case some teams still use udf in the code base

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that's a good idea

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. nit, maybe keep the udf in the comment as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in the comment for the init method. Do you mean the comment that lists the attributes? it's no longer an attribute, so probably shouldn't be there. It's being converted to transformation in init method and discarded.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah that make sense

as inputs.
transformation: The user defined transformation.
description: A human-readable description.
tags: A dictionary of key-value pairs to store arbitrary metadata.
owner: The owner of the on demand feature view, typically the email of the primary
Expand All @@ -63,8 +59,7 @@ class OnDemandFeatureView(BaseFeatureView):
features: List[Field]
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_sources: Dict[str, RequestSource]
udf: FunctionType
udf_string: str
transformation: Union[OnDemandPandasTransformation]
description: str
tags: Dict[str, str]
owner: str
Expand All @@ -82,8 +77,7 @@ def __init__( # noqa: C901
FeatureViewProjection,
]
],
udf: FunctionType,
udf_string: str = "",
transformation: Union[OnDemandPandasTransformation],
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
Expand All @@ -98,9 +92,7 @@ def __init__( # noqa: C901
sources: A map from input source names to the actual input sources, which may be
feature views, or request data sources. These sources serve as inputs to the udf,
which will refer to them by name.
udf: The user defined transformation function, which must take pandas
dataframes as inputs.
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
transformation: The user defined transformation.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the on demand feature view, typically the email
Expand All @@ -126,8 +118,7 @@ def __init__( # noqa: C901
odfv_source.name
] = odfv_source.projection

self.udf = udf # type: ignore
self.udf_string = udf_string
self.transformation = transformation

@property
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
Expand All @@ -139,8 +130,7 @@ def __copy__(self):
schema=self.features,
sources=list(self.source_feature_view_projections.values())
+ list(self.source_request_sources.values()),
udf=self.udf,
udf_string=self.udf_string,
transformation=self.transformation,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand All @@ -161,8 +151,7 @@ def __eq__(self, other):
self.source_feature_view_projections
!= other.source_feature_view_projections
or self.source_request_sources != other.source_request_sources
or self.udf_string != other.udf_string
or self.udf.__code__.co_code != other.udf.__code__.co_code
or self.transformation != other.transformation
):
return False

Expand Down Expand Up @@ -200,11 +189,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:
name=self.name,
features=[feature.to_proto() for feature in self.features],
sources=sources,
user_defined_function=UserDefinedFunctionProto(
name=self.udf.__name__,
body=dill.dumps(self.udf, recurse=True),
body_text=self.udf_string,
),
user_defined_function=self.transformation.to_proto()
if type(self.transformation) == OnDemandPandasTransformation
else None,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand Down Expand Up @@ -243,6 +230,16 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
RequestSource.from_proto(on_demand_source.request_data_source)
)

if (
on_demand_feature_view_proto.spec.WhichOneof("transformation")
== "user_defined_function"
):
transformation = OnDemandPandasTransformation.from_proto(
on_demand_feature_view_proto.spec.user_defined_function
)
else:
raise Exception("At least one transformation type needs to be provided")

on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
schema=[
Expand All @@ -253,10 +250,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
for feature in on_demand_feature_view_proto.spec.features
],
sources=sources,
udf=dill.loads(
on_demand_feature_view_proto.spec.user_defined_function.body
),
udf_string=on_demand_feature_view_proto.spec.user_defined_function.body_text,
transformation=transformation,
description=on_demand_feature_view_proto.spec.description,
tags=dict(on_demand_feature_view_proto.spec.tags),
owner=on_demand_feature_view_proto.spec.owner,
Expand Down Expand Up @@ -315,7 +309,8 @@ def get_transformed_features_df(
columns_to_cleanup.append(full_feature_ref)

# Compute transformed values and apply to each result row
df_with_transformed_features = self.udf.__call__(df_with_features)

df_with_transformed_features = self.transformation.transform(df_with_features)

# Work out whether the correct columns names are used.
rename_columns: Dict[str, str] = {}
Expand All @@ -335,7 +330,7 @@ def get_transformed_features_df(
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
return df_with_transformed_features.rename(columns=rename_columns)

def infer_features(self):
def infer_features(self) -> None:
"""
Infers the set of features associated to this feature view from the input source.

Expand Down Expand Up @@ -365,7 +360,7 @@ def infer_features(self):
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
sample_val = rand_df_value[dtype] if dtype in rand_df_value else None
df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype)
output_df: pd.DataFrame = self.udf.__call__(df)
output_df: pd.DataFrame = self.transformation.transform(df)
inferred_features = []
for f, dt in zip(output_df.columns, output_df.dtypes):
inferred_features.append(
Expand Down Expand Up @@ -396,7 +391,9 @@ def infer_features(self):
)

@staticmethod
def get_requested_odfvs(feature_refs, project, registry):
def get_requested_odfvs(
feature_refs, project, registry
) -> List["OnDemandFeatureView"]:
all_on_demand_feature_views = registry.list_on_demand_feature_views(
project, allow_cache=True
)
Expand Down Expand Up @@ -438,7 +435,7 @@ def on_demand_feature_view(
of the primary maintainer.
"""

def mainify(obj):
def mainify(obj) -> None:
# Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same
# name as the original file defining the ODFV.
if obj.__module__ != "__main__":
Expand All @@ -447,15 +444,17 @@ def mainify(obj):
def decorator(user_function):
udf_string = dill.source.getsource(user_function)
mainify(user_function)

transformation = OnDemandPandasTransformation(user_function, udf_string)

on_demand_feature_view_obj = OnDemandFeatureView(
name=user_function.__name__,
sources=sources,
schema=schema,
udf=user_function,
transformation=transformation,
description=description,
tags=tags,
owner=owner,
udf_string=udf_string,
)
functools.update_wrapper(
wrapper=on_demand_feature_view_obj, wrapped=user_function
Expand Down
56 changes: 56 additions & 0 deletions sdk/python/feast/on_demand_pandas_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from types import FunctionType

import dill
import pandas as pd

from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)


class OnDemandPandasTransformation:
def __init__(self, udf: FunctionType, udf_string: str = ""):
"""
Creates an OnDemandPandasTransformation object.

Args:
udf: The user defined transformation function, which must take pandas
dataframes as inputs.
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
"""
self.udf = udf
self.udf_string = udf_string

def transform(self, df: pd.DataFrame) -> pd.DataFrame:
return self.udf.__call__(df)

def __eq__(self, other):
if not isinstance(other, OnDemandPandasTransformation):
raise TypeError(
"Comparisons should only involve OnDemandPandasTransformation class objects."
)

if not super().__eq__(other):
return False

if (
self.udf_string != other.udf_string
or self.udf.__code__.co_code != other.udf.__code__.co_code
):
return False

return True

def to_proto(self) -> UserDefinedFunctionProto:
return UserDefinedFunctionProto(
name=self.udf.__name__,
body=dill.dumps(self.udf, recurse=True),
body_text=self.udf_string,
)

@classmethod
def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto):
return OnDemandPandasTransformation(
udf=dill.loads(user_defined_function_proto.body),
udf_string=user_defined_function_proto.body_text,
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
StreamFeatureView,
)
from feast.data_source import DataSource, RequestSource
from feast.on_demand_feature_view import OnDemandPandasTransformation
from feast.types import Array, FeastType, Float32, Float64, Int32, Int64
from tests.integration.feature_repos.universal.entities import (
customer,
Expand Down Expand Up @@ -69,8 +70,9 @@ def conv_rate_plus_100_feature_view(
name=conv_rate_plus_100.__name__,
schema=[] if infer_features else _features,
sources=sources,
udf=conv_rate_plus_100,
udf_string="raw udf source",
transformation=OnDemandPandasTransformation(
udf=conv_rate_plus_100, udf_string="raw udf source"
),
)


Expand Down Expand Up @@ -107,8 +109,9 @@ def similarity_feature_view(
name=similarity.__name__,
sources=sources,
schema=[] if infer_features else _fields,
udf=similarity,
udf_string="similarity raw udf",
transformation=OnDemandPandasTransformation(
udf=similarity, udf_string="similarity raw udf"
),
)


Expand Down
37 changes: 28 additions & 9 deletions sdk/python/tests/unit/test_on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
from feast.feature_view import FeatureView
from feast.field import Field
from feast.infra.offline_stores.file_source import FileSource
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.on_demand_feature_view import (
OnDemandFeatureView,
OnDemandPandasTransformation,
)
from feast.types import Float32


Expand Down Expand Up @@ -54,8 +57,9 @@ def test_hash():
Field(name="output1", dtype=Float32),
Field(name="output2", dtype=Float32),
],
udf=udf1,
udf_string="udf1 source code",
transformation=OnDemandPandasTransformation(
udf=udf1, udf_string="udf1 source code"
),
)
on_demand_feature_view_2 = OnDemandFeatureView(
name="my-on-demand-feature-view",
Expand All @@ -64,8 +68,9 @@ def test_hash():
Field(name="output1", dtype=Float32),
Field(name="output2", dtype=Float32),
],
udf=udf1,
udf_string="udf1 source code",
transformation=OnDemandPandasTransformation(
udf=udf1, udf_string="udf1 source code"
),
)
on_demand_feature_view_3 = OnDemandFeatureView(
name="my-on-demand-feature-view",
Expand All @@ -74,8 +79,9 @@ def test_hash():
Field(name="output1", dtype=Float32),
Field(name="output2", dtype=Float32),
],
udf=udf2,
udf_string="udf2 source code",
transformation=OnDemandPandasTransformation(
udf=udf2, udf_string="udf2 source code"
),
)
on_demand_feature_view_4 = OnDemandFeatureView(
name="my-on-demand-feature-view",
Expand All @@ -84,8 +90,21 @@ def test_hash():
Field(name="output1", dtype=Float32),
Field(name="output2", dtype=Float32),
],
udf=udf2,
udf_string="udf2 source code",
transformation=OnDemandPandasTransformation(
udf=udf2, udf_string="udf2 source code"
),
description="test",
)
on_demand_feature_view_4 = OnDemandFeatureView(
name="my-on-demand-feature-view",
sources=sources,
schema=[
Field(name="output1", dtype=Float32),
Field(name="output2", dtype=Float32),
],
transformation=OnDemandPandasTransformation(
udf=udf2, udf_string="udf2 source code"
),
description="test",
)

Expand Down
Loading