Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Don't prevent apply from running given duplicate empty names in data sources. Also fix repeated apply of Spark data source. #2415

Merged
merged 15 commits into from
Mar 29, 2022
1 change: 0 additions & 1 deletion protos/feast/core/SavedDataset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core";

import "google/protobuf/timestamp.proto";
import "feast/core/DataSource.proto";
import "feast/core/FeatureService.proto";

message SavedDatasetSpec {
// Name of the dataset. Must be unique, since a dataset can be overwritten by name
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/base_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ def __repr__(self):
def __str__(self):
    """Return the JSON rendering of this object's protobuf message as a ``str``."""
    proto_message = self.to_proto()
    return str(MessageToJson(proto_message))

def __lt__(self, other):
return self.name < other.name

def __hash__(self):
return hash((id(self), self.name))

Expand Down
11 changes: 11 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import logging
import warnings
from datetime import datetime
from pathlib import Path
from typing import List, Optional
Expand Down Expand Up @@ -151,6 +152,11 @@ def data_source_describe(ctx: click.Context, name: str):
print(e)
exit(1)

warnings.warn(
"Describing data sources will only work properly if all data sources have names or table names specified. "
"Starting Feast 0.21, data source unique names will be required to encourage data source discovery.",
adchia marked this conversation as resolved.
Show resolved Hide resolved
RuntimeWarning,
)
print(
yaml.dump(
yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False
Expand All @@ -173,6 +179,11 @@ def data_source_list(ctx: click.Context):

from tabulate import tabulate

warnings.warn(
"Listing data sources will only work properly if all data sources have names or table names specified. "
"Starting Feast 0.21, data source unique names will be required to encourage data source discovery",
RuntimeWarning,
)
print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain"))


Expand Down
8 changes: 8 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, Optional, Tuple

from google.protobuf.json_format import MessageToJson

from feast import type_map
from feast.data_format import StreamFormat
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
Expand Down Expand Up @@ -192,6 +194,12 @@ def __init__(
def __hash__(self):
return hash((id(self), self.name))

def __str__(self):
    """Return this data source's protobuf message serialized to JSON text."""
    as_proto = self.to_proto()
    as_json = MessageToJson(as_proto)
    return str(as_json)

def __lt__(self, other):
return str(self) < str(other)

def __eq__(self, other):
if not isinstance(other, DataSource):
raise TypeError("Comparisons should only involve DataSource class objects.")
Expand Down
23 changes: 14 additions & 9 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def to_string(self):
continue
if feast_object_diff.transition_type == TransitionType.UNCHANGED:
continue
if feast_object_diff.feast_object_type == FeastObjectType.DATA_SOURCE:
# TODO(adchia): Print statements out starting in Feast 0.21
continue
action, color = message_action_map[feast_object_diff.transition_type]
log_string += f"{action} {feast_object_diff.feast_object_type.value} {Style.BRIGHT + color}{feast_object_diff.name}{Style.RESET_ALL}\n"
if feast_object_diff.transition_type == TransitionType.UPDATE:
Expand All @@ -77,14 +80,16 @@ def to_string(self):

def tag_objects_for_keep_delete_update_add(
existing_objs: Iterable[FeastObject], desired_objs: Iterable[FeastObject]
) -> Tuple[Set[FeastObject], Set[FeastObject], Set[FeastObject], Set[FeastObject]]:
) -> Tuple[List[FeastObject], List[FeastObject], List[FeastObject], List[FeastObject]]:
existing_obj_names = {e.name for e in existing_objs}
desired_objs = list(desired_objs)
existing_objs = list(existing_objs)
desired_obj_names = {e.name for e in desired_objs}

objs_to_add = {e for e in desired_objs if e.name not in existing_obj_names}
objs_to_update = {e for e in desired_objs if e.name in existing_obj_names}
objs_to_keep = {e for e in existing_objs if e.name in desired_obj_names}
objs_to_delete = {e for e in existing_objs if e.name not in desired_obj_names}
objs_to_add = [e for e in desired_objs if e.name not in existing_obj_names]
objs_to_update = [e for e in desired_objs if e.name in existing_obj_names]
objs_to_keep = [e for e in existing_objs if e.name in desired_obj_names]
objs_to_delete = [e for e in existing_objs if e.name not in desired_obj_names]
adchia marked this conversation as resolved.
Show resolved Hide resolved

return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add

Expand Down Expand Up @@ -149,10 +154,10 @@ def diff_registry_objects(
def extract_objects_for_keep_delete_update_add(
registry: Registry, current_project: str, desired_repo_contents: RepoContents,
) -> Tuple[
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, List[FeastObject]],
Dict[FeastObjectType, List[FeastObject]],
Dict[FeastObjectType, List[FeastObject]],
Dict[FeastObjectType, List[FeastObject]],
]:
"""
Returns the objects in the registry that must be modified to achieve the desired repo state.
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def __init__(
def __hash__(self) -> int:
return hash((id(self), self.name))

def __lt__(self, other):
return self.name < other.name

def __eq__(self, other):
if not isinstance(other, Entity):
raise TypeError("Comparisons should only involve Entity class objects.")
Expand Down
19 changes: 11 additions & 8 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ def apply(
for v in odfv.input_request_data_sources.values():
data_sources_set_to_update.add(v)

data_sources_to_update = list(data_sources_set_to_update)
data_sources_to_update = sorted(list(data_sources_set_to_update))

# Validate all feature views and make inferences.
self._validate_all_feature_views(
Expand Down Expand Up @@ -2063,14 +2063,17 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]):
def _validate_data_sources(data_sources: List[DataSource]):
"""Warn when non-empty data source names are not case-insensitively unique."""
ds_names = set()
for fv in data_sources:
case_insensitive_ds_name = fv.name.lower()
for ds in data_sources:
case_insensitive_ds_name = ds.name.lower()
if case_insensitive_ds_name in ds_names:
raise ValueError(
f"More than one data source with name {case_insensitive_ds_name} found. "
f"Please ensure that all data source names are case-insensitively unique. "
f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file."
)
if case_insensitive_ds_name.strip():
warnings.warn(
f"More than one data source with name {case_insensitive_ds_name} found. "
f"Please ensure that all data source names are case-insensitively unique. "
f"It may be necessary to ignore certain files in your feature repository by using a .feastignore "
f"file. Starting in Feast 0.21, unique names (perhaps inferred from the table name) will be "
f"required in data sources to encourage data source discovery"
)
else:
ds_names.add(case_insensitive_ds_name)

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/go_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
from subprocess import Popen
from typing import Any, Dict, List, Optional, Union

import grpc
from tenacity import retry, stop_after_attempt, stop_after_delay, wait_exponential

import feast
import grpc
from feast.errors import FeatureNameCollisionError, InvalidFeaturesParameterType
from feast.feature_service import FeatureService
from feast.flags_helper import is_test
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(
else:
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`: {self.query}"
adchia marked this conversation as resolved.
Show resolved Hide resolved
),
DeprecationWarning,
)
Expand Down
16 changes: 8 additions & 8 deletions sdk/python/feast/infra/offline_stores/redshift_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ def __init__(
query (optional): The query to be executed to obtain the features.
name (optional): Name for the source. Defaults to the table_ref if not specified.
"""
# The default Redshift schema is named "public".
_schema = "public" if table and not schema else schema
self.redshift_options = RedshiftOptions(
table=table, schema=_schema, query=query
)

if table is None and query is None:
raise ValueError('No "table" argument provided.')
_name = name
Expand All @@ -50,7 +56,8 @@ def __init__(
else:
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) "
f"or `table`: {self.query}"
adchia marked this conversation as resolved.
Show resolved Hide resolved
),
DeprecationWarning,
)
Expand All @@ -63,13 +70,6 @@ def __init__(
date_partition_column,
)

# The default Redshift schema is named "public".
_schema = "public" if table and not schema else schema

self.redshift_options = RedshiftOptions(
table=table, schema=_schema, query=query
)

@staticmethod
def from_proto(data_source: DataSourceProto):
"""
Expand Down
16 changes: 8 additions & 8 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ def __init__(
"""
if table is None and query is None:
raise ValueError('No "table" argument provided.')
# The default Snowflake schema is named "PUBLIC".
_schema = "PUBLIC" if (database and table and not schema) else schema

self.snowflake_options = SnowflakeOptions(
database=database, schema=_schema, table=table, query=query
)

# If no name, use the table as the default name
_name = name
Expand All @@ -53,7 +59,8 @@ def __init__(
else:
warnings.warn(
(
"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) or `table`."
f"Starting in Feast 0.21, Feast will require either a name for a data source (if using query) "
f"or `table`: {self.query}"
),
DeprecationWarning,
)
Expand All @@ -66,13 +73,6 @@ def __init__(
date_partition_column,
)

# The default Snowflake schema is named "PUBLIC".
_schema = "PUBLIC" if (database and table and not schema) else schema

self.snowflake_options = SnowflakeOptions(
database=database, schema=_schema, table=table, query=query
)

@staticmethod
def from_proto(data_source: DataSourceProto):
"""
Expand Down
8 changes: 6 additions & 2 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ def get_objects_from_registry(
registry: "Registry", project: str
) -> Dict["FeastObjectType", List[Any]]:
return {
FeastObjectType.DATA_SOURCE: registry.list_data_sources(project=project),
FeastObjectType.DATA_SOURCE: sorted(
registry.list_data_sources(project=project)
),
FeastObjectType.ENTITY: registry.list_entities(project=project),
FeastObjectType.FEATURE_VIEW: registry.list_feature_views(project=project),
FeastObjectType.ON_DEMAND_FEATURE_VIEW: registry.list_on_demand_feature_views(
Expand Down Expand Up @@ -314,11 +316,13 @@ def apply_data_source(
commit: Whether to immediately commit to the registry
"""
registry = self._prepare_registry_for_changes()

for idx, existing_data_source_proto in enumerate(registry.data_sources):
if existing_data_source_proto.name == data_source.name:
del registry.data_sources[idx]
data_source_proto = data_source.to_proto()
data_source_proto.data_source_class_type = (
f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
)
data_source_proto.project = project
data_source_proto.data_source_class_type = (
f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
Expand Down
6 changes: 2 additions & 4 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,8 @@ def extract_objects_for_apply_delete(project, registry, repo):
return (
all_to_apply,
all_to_delete,
set(
objs_to_add[FeastObjectType.FEATURE_VIEW].union(
objs_to_update[FeastObjectType.FEATURE_VIEW]
)
set(objs_to_add[FeastObjectType.FEATURE_VIEW]).union(
set(objs_to_update[FeastObjectType.FEATURE_VIEW])
),
objs_to_delete[FeastObjectType.FEATURE_VIEW],
)
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/transformation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import sys
from concurrent import futures

import grpc
import pyarrow as pa
from grpc_reflection.v1alpha import reflection

import grpc
from feast.errors import OnDemandFeatureViewNotFoundException
from feast.feature_store import FeatureStore
from feast.protos.feast.serving.TransformationService_pb2 import (
Expand Down
12 changes: 12 additions & 0 deletions sdk/python/tests/example_repos/example_feature_repo_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@
created_timestamp_column="created_timestamp",
)

# Query-based source declared without a `name` or table reference.
# NOTE(review): presumably a fixture for applying a repo that contains
# unnamed data sources -- confirm against the tests that load this repo.
driver_locations_source_query = BigQuerySource(
query="SELECT * from feast-oss.public.drivers",
event_timestamp_column="event_timestamp",
created_timestamp_column="created_timestamp",
)

# Second unnamed query-based source; it differs from the one above only in
# its SQL text.
driver_locations_source_query_2 = BigQuerySource(
query="SELECT lat * 2 FROM feast-oss.public.drivers",
event_timestamp_column="event_timestamp",
created_timestamp_column="created_timestamp",
)

customer_profile_source = BigQuerySource(
name="customer_profile_source",
table_ref="feast-oss.public.customers",
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/integration/registration/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_universal_cli(environment: Environment):
["data-sources", "describe", "customer_profile_source"], cwd=repo_path,
)
assertpy.assert_that(result.returncode).is_equal_to(0)
assertpy.assert_that(fs.list_data_sources()).is_length(4)
assertpy.assert_that(fs.list_data_sources()).is_length(5)

# entity & feature view describe commands should fail when objects don't exist
result = runner.run(["entities", "describe", "foo"], cwd=repo_path)
Expand Down