Skip to content

Always pass full and partial feature names to ODFV #2003

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ def _augment_response_with_on_demand_transforms(
for odfv_name, _feature_refs in odfv_feature_refs.items():
odfv = all_on_demand_feature_views[odfv_name]
transformed_features_df = odfv.get_transformed_features_df(
full_feature_names, initial_response_df
initial_response_df
)
for row_idx in range(len(result_rows)):
result_row = result_rows[row_idx]
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def to_df(self) -> pd.DataFrame:
# TODO(adchia): Fix requirement to specify dependent feature views in feature_refs
for odfv in self.on_demand_feature_views:
features_df = features_df.join(
odfv.get_transformed_features_df(self.full_feature_names, features_df)
odfv.get_transformed_features_df(features_df)
)
return features_df

Expand All @@ -69,7 +69,7 @@ def to_arrow(self) -> pyarrow.Table:
features_df = self._to_df_internal()
for odfv in self.on_demand_feature_views:
features_df = features_df.join(
odfv.get_transformed_features_df(self.full_feature_names, features_df)
odfv.get_transformed_features_df(features_df)
)
return pyarrow.Table.from_pandas(features_df)

Expand Down
24 changes: 12 additions & 12 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,21 @@ def get_request_data_schema(self) -> Dict[str, ValueType]:
return schema

def get_transformed_features_df(
self, full_feature_names: bool, df_with_features: pd.DataFrame
self, df_with_features: pd.DataFrame
) -> pd.DataFrame:
# Apply on demand transformations
# TODO(adchia): Include only the feature values from the specified input FVs in the ODFV.
# Copy over un-prefixed features even if not requested since transform may need it
columns_to_cleanup = []
if full_feature_names:
for input_fv in self.input_feature_views.values():
for feature in input_fv.features:
full_feature_ref = f"{input_fv.name}__{feature.name}"
if full_feature_ref in df_with_features.keys():
df_with_features[feature.name] = df_with_features[
full_feature_ref
]
columns_to_cleanup.append(feature.name)
for input_fv in self.input_feature_views.values():
for feature in input_fv.features:
full_feature_ref = f"{input_fv.name}__{feature.name}"
if full_feature_ref in df_with_features.keys():
# Make sure the partial feature name is always present
df_with_features[feature.name] = df_with_features[full_feature_ref]
columns_to_cleanup.append(feature.name)
elif feature.name in df_with_features.keys():
# Make sure the full feature name is always present
df_with_features[full_feature_ref] = df_with_features[feature.name]
columns_to_cleanup.append(full_feature_ref)

# Compute transformed values and apply to each result row
df_with_transformed_features = self.udf.__call__(df_with_features)
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/transformation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def TransformFeatures(self, request, context):

df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas()

result_df = odfv.get_transformed_features_df(True, df)
result_df = odfv.get_transformed_features_df(df)
result_arrow = pa.Table.from_pandas(result_df)
sink = pa.BufferOutputStream()
writer = pa.ipc.new_file(sink, result_arrow.schema)
Expand Down