diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index de8fac9451..30f04e9c06 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -58,6 +58,7 @@ def __init__( *, name: str, join_keys: Optional[List[str]] = None, + value_type: Optional[ValueType] = None, description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -70,6 +71,8 @@ def __init__( join_keys (optional): A list of properties that uniquely identifies different entities within the collection. This currently only supports a list of size one, but is intended to eventually support multiple join keys. + value_type (optional): The type of the entity, such as string or float. If not specified, + it will be inferred from the schema of the underlying data source. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the entity, typically the email of the primary maintainer. @@ -78,7 +81,7 @@ def __init__( ValueError: Parameters are specified incorrectly. """ self.name = name - self.value_type = ValueType.UNKNOWN + self.value_type = value_type or ValueType.UNKNOWN if join_keys and len(join_keys) > 1: # TODO(felixwang9817): When multiple join keys are supported, add a `join_keys` attribute diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 3125f319a9..cdd507868d 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -35,7 +35,9 @@ from feast.protos.feast.core.FeatureView_pb2 import ( MaterializationInterval as MaterializationIntervalProto, ) +from feast.types import from_value_type from feast.usage import log_exceptions +from feast.value_type import ValueType warnings.simplefilter("once", DeprecationWarning) @@ -115,6 +117,7 @@ def __init__( If a stream source, the source should contain a batch_source for backfills & batch materialization. 
schema (optional): The schema of the feature view, including feature, timestamp, and entity columns. + # TODO: clarify that schema is only useful here... entities (optional): The list of entities with which this group of features is associated. ttl (optional): The amount of time this group of features lives. A ttl of 0 indicates that this group of features lives forever. Note that large ttl's or a ttl of 0 @@ -160,9 +163,29 @@ def __init__( for entity in entities: join_keys.append(entity.join_key) + # Ensure that entities have unique join keys. + if len(set(join_keys)) < len(join_keys): + raise ValueError( + "A feature view should not have entities that share a join key." + ) + for field in self.schema: if field.name in join_keys: self.entity_columns.append(field) + + # Confirm that the inferred type matches the specified entity type, if it exists. + matching_entities = ( + [e for e in entities if e.join_key == field.name] + if entities + else [] + ) + assert len(matching_entities) == 1 + entity = matching_entities[0] + if entity.value_type != ValueType.UNKNOWN: + if from_value_type(entity.value_type) != field.dtype: + raise ValueError( + f"Entity {entity.name} has type {entity.value_type}, which does not match the inferred type {field.dtype}." + ) else: features.append(field) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 267d0d5026..d416763bd3 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -135,8 +135,7 @@ def update_feature_views_with_inferred_features_and_entities( if field.name not in [feature.name for feature in fv.features]: fv.features.append(field) - # Since the `value_type` parameter has not yet been fully deprecated for - # entities, we respect the `value_type` attribute if it still exists. + # Respect the `value_type` attribute of the entity, if it is specified. 
for entity_name in fv.entities: entity = entity_name_to_entity_map[entity_name] if ( diff --git a/sdk/python/tests/unit/infra/test_inference_unit_tests.py b/sdk/python/tests/unit/infra/test_inference_unit_tests.py index c5ed83c12f..b4a0da7e47 100644 --- a/sdk/python/tests/unit/infra/test_inference_unit_tests.py +++ b/sdk/python/tests/unit/infra/test_inference_unit_tests.py @@ -15,6 +15,7 @@ from feast.on_demand_feature_view import on_demand_feature_view from feast.repo_config import RepoConfig from feast.types import Float32, Float64, Int64, String, UnixTimestamp +from feast.value_type import ValueType from tests.utils.data_source_test_creator import prep_file_source @@ -216,6 +217,78 @@ def test_feature_view_inference_respects_basic_inference(): assert len(feature_view_2.entity_columns) == 2 +def test_feature_view_inference_on_entity_value_types(): + """ + Tests that feature view inference correctly uses the entity `value_type` attribute. + """ + entity1 = Entity( + name="test1", join_keys=["id_join_key"], value_type=ValueType.INT64 + ) + file_source = FileSource(path="some path") + feature_view_1 = FeatureView( + name="test1", + entities=[entity1], + schema=[Field(name="int64_col", dtype=Int64)], + source=file_source, + ) + + assert len(feature_view_1.schema) == 1 + assert len(feature_view_1.features) == 1 + assert len(feature_view_1.entity_columns) == 0 + + update_feature_views_with_inferred_features_and_entities( + [feature_view_1], + [entity1], + RepoConfig( + provider="local", project="test", entity_key_serialization_version=2 + ), + ) + + # The schema is only used as a parameter, and is therefore not updated during inference. + assert len(feature_view_1.schema) == 1 + + # Since there is already a feature specified, additional features are not inferred. + assert len(feature_view_1.features) == 1 + + # The single entity column is inferred correctly and has the expected type. 
+ assert len(feature_view_1.entity_columns) == 1 + assert feature_view_1.entity_columns[0].dtype == Int64 + + +def test_conflicting_entity_value_types(): + """ + Tests that an error is thrown when the entity value types conflict. + """ + entity1 = Entity( + name="test1", join_keys=["id_join_key"], value_type=ValueType.INT64 + ) + file_source = FileSource(path="some path") + + with pytest.raises(ValueError): + _ = FeatureView( + name="test1", + entities=[entity1], + schema=[ + Field(name="int64_col", dtype=Int64), + Field( + name="id_join_key", dtype=Float64 + ), # Conflicts with the defined entity + ], + source=file_source, + ) + + # There should be no error here. + _ = FeatureView( + name="test1", + entities=[entity1], + schema=[ + Field(name="int64_col", dtype=Int64), + Field(name="id_join_key", dtype=Int64), # Matches the defined entity + ], + source=file_source, + ) + + def test_feature_view_inference_on_entity_columns(simple_dataset_1): """ Tests that feature view inference correctly infers entity columns. diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 0fe3f839e1..379396e5c6 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -15,6 +15,20 @@ from feast.types import Float32 +def test_create_feature_view_with_conflicting_entities(): + user1 = Entity(name="user1", join_keys=["user_id"]) + user2 = Entity(name="user2", join_keys=["user_id"]) + batch_source = FileSource(path="some path") + + with pytest.raises(ValueError): + _ = FeatureView( + name="test", + entities=[user1, user2], + ttl=timedelta(days=30), + source=batch_source, + ) + + def test_create_batch_feature_view(): batch_source = FileSource(path="some path") BatchFeatureView(