Skip to content

Commit

Permalink
feat: Adding support for native Python transformations on a single dictionary (#4724)
Browse files Browse the repository at this point in the history

* feat: Adding support for native Python transformations on a dictionary

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* Updated type checking and added exception handling to try basic dict...not an ideal solution

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* updated tests

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* adding protos

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* fixed unit test

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

---------

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo authored Nov 6, 2024
1 parent 84b24b5 commit 9bbc1c6
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 56 deletions.
1 change: 1 addition & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ message OnDemandFeatureViewSpec {
repeated string entities = 13;
// List of specifications for each entity defined as part of this feature view.
repeated FeatureSpecV2 entity_columns = 14;
bool singleton = 15;
}

message OnDemandFeatureViewMeta {
Expand Down
49 changes: 38 additions & 11 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class OnDemandFeatureView(BaseFeatureView):
tags: dict[str, str]
owner: str
write_to_online_store: bool
singleton: bool

def __init__( # noqa: C901
self,
Expand All @@ -98,6 +99,7 @@ def __init__( # noqa: C901
tags: Optional[dict[str, str]] = None,
owner: str = "",
write_to_online_store: bool = False,
singleton: bool = False,
):
"""
Creates an OnDemandFeatureView object.
Expand All @@ -121,6 +123,8 @@ def __init__( # noqa: C901
of the primary maintainer.
write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to
the online store for faster retrieval.
singleton (optional): A boolean that indicates whether the transformation is executed on a singleton
(only applicable when mode="python").
"""
super().__init__(
name=name,
Expand Down Expand Up @@ -204,6 +208,9 @@ def __init__( # noqa: C901
self.features = features
self.feature_transformation = feature_transformation
self.write_to_online_store = write_to_online_store
self.singleton = singleton
if self.singleton and self.mode != "python":
raise ValueError("Singleton is only supported for Python mode.")

@property
def proto_class(self) -> type[OnDemandFeatureViewProto]:
Expand All @@ -221,6 +228,7 @@ def __copy__(self):
tags=self.tags,
owner=self.owner,
write_to_online_store=self.write_to_online_store,
singleton=self.singleton,
)
fv.entities = self.entities
fv.features = self.features
Expand All @@ -247,6 +255,7 @@ def __eq__(self, other):
or self.feature_transformation != other.feature_transformation
or self.write_to_online_store != other.write_to_online_store
or sorted(self.entity_columns) != sorted(other.entity_columns)
or self.singleton != other.singleton
):
return False

Expand Down Expand Up @@ -328,6 +337,7 @@ def to_proto(self) -> OnDemandFeatureViewProto:
tags=self.tags,
owner=self.owner,
write_to_online_store=self.write_to_online_store,
singleton=self.singleton if self.singleton else False,
)

return OnDemandFeatureViewProto(spec=spec, meta=meta)
Expand Down Expand Up @@ -434,6 +444,9 @@ def from_proto(
]
else:
entity_columns = []
singleton = False
if hasattr(on_demand_feature_view_proto.spec, "singleton"):
singleton = on_demand_feature_view_proto.spec.singleton

on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
Expand All @@ -451,6 +464,7 @@ def from_proto(
tags=dict(on_demand_feature_view_proto.spec.tags),
owner=on_demand_feature_view_proto.spec.owner,
write_to_online_store=write_to_online_store,
singleton=singleton,
)

on_demand_feature_view_obj.entities = entities
Expand Down Expand Up @@ -614,17 +628,19 @@ def transform_dict(
feature_dict[full_feature_ref] = feature_dict[feature.name]
columns_to_cleanup.append(str(full_feature_ref))

output_dict: dict[str, Any] = self.feature_transformation.transform(
feature_dict
)
if self.singleton and self.mode == "python":
output_dict: dict[str, Any] = (
self.feature_transformation.transform_singleton(feature_dict)
)
else:
output_dict = self.feature_transformation.transform(feature_dict)
for feature_name in columns_to_cleanup:
del output_dict[feature_name]
return output_dict

def infer_features(self) -> None:
inferred_features = self.feature_transformation.infer_features(
self._construct_random_input()
)
random_input = self._construct_random_input(singleton=self.singleton)
inferred_features = self.feature_transformation.infer_features(random_input)

if self.features:
missing_features = []
Expand All @@ -644,8 +660,10 @@ def infer_features(self) -> None:
f"Could not infer Features for the feature view '{self.name}'.",
)

def _construct_random_input(self) -> dict[str, list[Any]]:
rand_dict_value: dict[ValueType, list[Any]] = {
def _construct_random_input(
self, singleton: bool = False
) -> dict[str, Union[list[Any], Any]]:
rand_dict_value: dict[ValueType, Union[list[Any], Any]] = {
ValueType.BYTES: [str.encode("hello world")],
ValueType.STRING: ["hello world"],
ValueType.INT32: [1],
Expand All @@ -663,20 +681,25 @@ def _construct_random_input(self) -> dict[str, list[Any]]:
ValueType.BOOL_LIST: [[True]],
ValueType.UNIX_TIMESTAMP_LIST: [[_utc_now()]],
}
if singleton:
    # Singleton mode feeds the UDF bare values, so unwrap every one-element
    # sample list into the value it contains.
    rand_dict_value = {k: rand_dict_value[k][0] for k in rand_dict_value}

# Fallback for value types that have no sample above. It must have the same
# shape as the samples: a bare None in singleton mode, a one-element list
# otherwise (the previous expression had the two cases swapped).
rand_missing_value = None if singleton else [None]
feature_dict = {}
for feature_view_projection in self.source_feature_view_projections.values():
for feature in feature_view_projection.features:
feature_dict[f"{feature_view_projection.name}__{feature.name}"] = (
rand_dict_value.get(feature.dtype.to_value_type(), [None])
rand_dict_value.get(
feature.dtype.to_value_type(), rand_missing_value
)
)
feature_dict[f"{feature.name}"] = rand_dict_value.get(
feature.dtype.to_value_type(), [None]
feature.dtype.to_value_type(), rand_missing_value
)
for request_data in self.source_request_sources.values():
for field in request_data.schema:
feature_dict[f"{field.name}"] = rand_dict_value.get(
field.dtype.to_value_type(), [None]
field.dtype.to_value_type(), rand_missing_value
)

return feature_dict
Expand Down Expand Up @@ -713,6 +736,7 @@ def on_demand_feature_view(
tags: Optional[dict[str, str]] = None,
owner: str = "",
write_to_online_store: bool = False,
singleton: bool = False,
):
"""
Creates an OnDemandFeatureView object with the given user function as udf.
Expand All @@ -731,6 +755,8 @@ def on_demand_feature_view(
of the primary maintainer.
write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to
the online store for faster retrieval.
singleton (optional): A boolean that indicates whether the transformation is executed on a singleton
(only applicable when mode="python").
"""

def mainify(obj) -> None:
Expand Down Expand Up @@ -775,6 +801,7 @@ def decorator(user_function):
owner=owner,
write_to_online_store=write_to_online_store,
entities=entities,
singleton=singleton,
)
functools.update_wrapper(
wrapper=on_demand_feature_view_obj, wrapped=user_function
Expand Down
24 changes: 12 additions & 12 deletions sdk/python/feast/protos/feast/core/OnDemandFeatureView_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message):
WRITE_TO_ONLINE_STORE_FIELD_NUMBER: builtins.int
ENTITIES_FIELD_NUMBER: builtins.int
ENTITY_COLUMNS_FIELD_NUMBER: builtins.int
SINGLETON_FIELD_NUMBER: builtins.int
name: builtins.str
"""Name of the feature view. Must be unique. Not updated."""
project: builtins.str
Expand Down Expand Up @@ -137,6 +138,7 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message):
@property
def entity_columns(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[feast.core.Feature_pb2.FeatureSpecV2]:
"""List of specifications for each entity defined as part of this feature view."""
singleton: builtins.bool
def __init__(
self,
*,
Expand All @@ -153,9 +155,10 @@ class OnDemandFeatureViewSpec(google.protobuf.message.Message):
write_to_online_store: builtins.bool = ...,
entities: collections.abc.Iterable[builtins.str] | None = ...,
entity_columns: collections.abc.Iterable[feast.core.Feature_pb2.FeatureSpecV2] | None = ...,
singleton: builtins.bool = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["feature_transformation", b"feature_transformation", "user_defined_function", b"user_defined_function"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "mode", b"mode", "name", b"name", "owner", b"owner", "project", b"project", "sources", b"sources", "tags", b"tags", "user_defined_function", b"user_defined_function", "write_to_online_store", b"write_to_online_store"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "mode", b"mode", "name", b"name", "owner", b"owner", "project", b"project", "singleton", b"singleton", "sources", b"sources", "tags", b"tags", "user_defined_function", b"user_defined_function", "write_to_online_store", b"write_to_online_store"]) -> None: ...

global___OnDemandFeatureViewSpec = OnDemandFeatureViewSpec

Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/transformation/pandas_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def transform_arrow(
def transform(self, input_df: pd.DataFrame) -> pd.DataFrame:
    """Call the stored UDF on ``input_df`` and return whatever it produces."""
    user_function = self.udf
    return user_function(input_df)

def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame:
    """Reject singleton execution: this method always raises ``ValueError``."""
    message = "PandasTransformation does not support singleton transformations."
    raise ValueError(message)

def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
df = pd.DataFrame.from_dict(random_input)
output_df: pd.DataFrame = self.transform(df)
Expand Down
33 changes: 24 additions & 9 deletions sdk/python/feast/transformation/python_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,39 @@ def transform(self, input_dict: dict) -> dict:
output_dict = self.udf.__call__(input_dict)
return {**input_dict, **output_dict}

def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
output_dict: dict[str, list[Any]] = self.transform(random_input)
def transform_singleton(self, input_dict: dict) -> dict:
    """
    Run the UDF on a single row of inputs.

    Every value in ``input_dict`` is reduced to one element before the UDF is
    called: an indexable container (a one-element list, a list of lists, ...)
    contributes its first item, while a value that is already a singleton
    (a string, bytes, or any non-subscriptable object) is passed through as is.

    Args:
        input_dict: Mapping of input name to either a container of values or
            a bare value.

    Returns:
        The unwrapped inputs merged with the UDF output; when a key appears in
        both, the UDF output wins.

    Raises:
        IndexError: If a container value is empty.
    """

    def _unwrap(value):
        # str/bytes are indexable, but value[0] would silently truncate them
        # to their first character/byte, so they count as singleton values.
        if isinstance(value, (str, bytes)):
            return value
        try:
            return value[0]
        except TypeError:
            # Not subscriptable (int, float, bool, None, datetime, ...), so
            # the value is already a singleton.
            return value

    input_dict = {k: _unwrap(v) for k, v in input_dict.items()}
    output_dict = self.udf.__call__(input_dict)
    return {**input_dict, **output_dict}

def infer_features(self, random_input: dict[str, Any]) -> list[Field]:
output_dict: dict[str, Any] = self.transform(random_input)

fields = []
for feature_name, feature_value in output_dict.items():
if len(feature_value) <= 0:
raise TypeError(
f"Failed to infer type for feature '{feature_name}' with value "
+ f"'{feature_value}' since no items were returned by the UDF."
)
if isinstance(feature_value, list):
if len(feature_value) <= 0:
raise TypeError(
f"Failed to infer type for feature '{feature_name}' with value "
+ f"'{feature_value}' since no items were returned by the UDF."
)
inferred_type = type(feature_value[0])
inferred_value = feature_value[0]
else:
inferred_type = type(feature_value)
inferred_value = feature_value

fields.append(
Field(
name=feature_name,
dtype=from_value_type(
python_type_to_feast_value_type(
feature_name,
value=feature_value[0],
type_name=type(feature_value[0]).__name__,
value=inferred_value,
type_name=inferred_type.__name__,
)
),
)
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/transformation/substrait_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def table_provider(names, schema: pyarrow.Schema):
).read_all()
return table.to_pandas()

def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame:
    """Reject singleton execution: this method always raises ``ValueError``."""
    message = "SubstraitTransform does not support singleton transformations."
    raise ValueError(message)

def transform_ibis(self, table):
    """Return the result of calling ``self.ibis_function`` on ``table``."""
    ibis_function = self.ibis_function
    return ibis_function(table)

Expand Down
Loading

0 comments on commit 9bbc1c6

Please sign in to comment.