-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Create stream and batch feature view abstractions (#2559)
* feat: Create stream and batch feature view abstractions
  Signed-off-by: Achal Shah <achals@gmail.com>
* CR
  Signed-off-by: Achal Shah <achals@gmail.com>
* CR
  Signed-off-by: Achal Shah <achals@gmail.com>
- Loading branch information
Showing 4 changed files with 183 additions and 0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
from datetime import timedelta | ||
from typing import Dict, List, Optional, Union | ||
|
||
from feast.data_source import DataSource | ||
from feast.entity import Entity | ||
from feast.feature_view import FeatureView | ||
from feast.field import Field | ||
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto | ||
|
||
# Class names, compared against ``type(source).__name__``, of the data sources
# that a batch feature view accepts without falling back to the custom-source
# check.
SUPPORTED_BATCH_SOURCES = {
    "BigQuerySource",
    "FileSource",
    "RedshiftSource",
    "SnowflakeSource",
    "SparkSource",
    "TrinoSource",
}
|
||
|
||
class BatchFeatureView(FeatureView):
    """A feature view whose source must be a batch (or custom) data source.

    The constructor validates ``source`` and then forwards every argument to
    ``FeatureView.__init__``, passing ``batch_source`` and ``stream_source``
    as ``None``.
    """

    def __init__(
        self,
        *,
        name: Optional[str] = None,
        entities: Optional[Union[List[Entity], List[str]]] = None,
        ttl: Optional[timedelta] = None,
        tags: Optional[Dict[str, str]] = None,
        online: bool = True,
        description: str = "",
        owner: str = "",
        schema: Optional[List[Field]] = None,
        source: Optional[DataSource] = None,
    ):
        """Validate ``source`` and initialise the underlying feature view.

        Args:
            name: Forwarded unchanged to ``FeatureView.__init__``.
            entities: Forwarded unchanged to ``FeatureView.__init__``.
            ttl: Forwarded unchanged to ``FeatureView.__init__``.
            tags: Forwarded unchanged to ``FeatureView.__init__``.
            online: Forwarded unchanged to ``FeatureView.__init__``.
            description: Forwarded unchanged to ``FeatureView.__init__``.
            owner: Forwarded unchanged to ``FeatureView.__init__``.
            schema: Forwarded unchanged to ``FeatureView.__init__``.
            source: Required despite the ``None`` default. Its class name
                must be listed in ``SUPPORTED_BATCH_SOURCES`` or its proto
                type must be ``CUSTOM_SOURCE``.

        Raises:
            ValueError: If ``source`` is ``None``, or if it is neither a
                supported batch source nor a custom source.
        """
        if source is None:
            raise ValueError("Feature views need a source specified")

        source_type = type(source).__name__
        if (
            source_type not in SUPPORTED_BATCH_SOURCES
            and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE
        ):
            # Sets of strings iterate in a per-process order, so sort the names
            # to keep the error message stable across runs.
            raise ValueError(
                f"Batch feature views need a batch source, expected one of "
                f"{sorted(SUPPORTED_BATCH_SOURCES)} or CUSTOM_SOURCE, "
                f"got {source_type}: {source.name} instead"
            )

        super().__init__(
            name=name,
            entities=entities,
            ttl=ttl,
            batch_source=None,
            stream_source=None,
            tags=tags,
            online=online,
            description=description,
            owner=owner,
            schema=schema,
            source=source,
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
from datetime import timedelta | ||
from typing import Dict, List, Optional, Union | ||
|
||
from feast.data_source import DataSource | ||
from feast.entity import Entity | ||
from feast.feature_view import FeatureView | ||
from feast.field import Field | ||
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto | ||
|
||
# Class names, compared against ``type(source).__name__``, of the data sources
# that a stream feature view accepts without falling back to the custom-source
# check.
SUPPORTED_STREAM_SOURCES = {"KafkaSource", "KinesisSource"}
|
||
|
||
class StreamFeatureView(FeatureView):
    """A feature view whose source must be a stream (or custom) data source.

    The constructor validates ``source`` and then forwards every argument to
    ``FeatureView.__init__``, passing ``batch_source`` and ``stream_source``
    as ``None``.
    """

    def __init__(
        self,
        *,
        name: Optional[str] = None,
        entities: Optional[Union[List[Entity], List[str]]] = None,
        ttl: Optional[timedelta] = None,
        tags: Optional[Dict[str, str]] = None,
        online: bool = True,
        description: str = "",
        owner: str = "",
        schema: Optional[List[Field]] = None,
        source: Optional[DataSource] = None,
    ):
        """Validate ``source`` and initialise the underlying feature view.

        Args:
            name: Forwarded unchanged to ``FeatureView.__init__``.
            entities: Forwarded unchanged to ``FeatureView.__init__``.
            ttl: Forwarded unchanged to ``FeatureView.__init__``.
            tags: Forwarded unchanged to ``FeatureView.__init__``.
            online: Forwarded unchanged to ``FeatureView.__init__``.
            description: Forwarded unchanged to ``FeatureView.__init__``.
            owner: Forwarded unchanged to ``FeatureView.__init__``.
            schema: Forwarded unchanged to ``FeatureView.__init__``.
            source: Required despite the ``None`` default. Its class name
                must be listed in ``SUPPORTED_STREAM_SOURCES`` or its proto
                type must be ``CUSTOM_SOURCE``.

        Raises:
            ValueError: If ``source`` is ``None``, or if it is neither a
                supported stream source nor a custom source.
        """
        if source is None:
            raise ValueError("Feature views need a source specified")

        source_type = type(source).__name__
        if (
            source_type not in SUPPORTED_STREAM_SOURCES
            and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE
        ):
            # Sets of strings iterate in a per-process order, so sort the names
            # to keep the error message stable across runs.
            raise ValueError(
                f"Stream feature views need a stream source, expected one of "
                f"{sorted(SUPPORTED_STREAM_SOURCES)} or CUSTOM_SOURCE, "
                f"got {source_type}: {source.name} instead"
            )

        super().__init__(
            name=name,
            entities=entities,
            ttl=ttl,
            batch_source=None,
            stream_source=None,
            tags=tags,
            online=online,
            description=description,
            owner=owner,
            schema=schema,
            source=source,
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
from datetime import timedelta | ||
|
||
import pytest | ||
|
||
from feast.batch_feature_view import BatchFeatureView | ||
from feast.data_format import AvroFormat | ||
from feast.data_source import KafkaSource | ||
from feast.infra.offline_stores.file_source import FileSource | ||
from feast.stream_feature_view import StreamFeatureView | ||
|
||
|
||
def test_create_batch_feature_view():
    """BatchFeatureView accepts a file source and rejects missing/stream sources."""
    batch_source = FileSource(path="some path")
    BatchFeatureView(
        name="test batch feature view",
        entities=[],
        ttl=timedelta(days=30),
        source=batch_source,
    )

    # ``match`` pins the failure to the missing-source check rather than any
    # other ValueError raised during construction.
    with pytest.raises(ValueError, match="need a source specified"):
        BatchFeatureView(
            name="test batch feature view", entities=[], ttl=timedelta(days=30)
        )

    stream_source = KafkaSource(
        name="kafka",
        event_timestamp_column="",
        bootstrap_servers="",
        message_format=AvroFormat(""),
        topic="topic",
        batch_source=FileSource(path="some path"),
    )
    # A Kafka source must be rejected by the batch-source type check.
    with pytest.raises(ValueError, match="need a batch source"):
        BatchFeatureView(
            name="test batch feature view",
            entities=[],
            ttl=timedelta(days=30),
            source=stream_source,
        )
|
||
|
||
def test_create_stream_feature_view():
    """StreamFeatureView accepts a Kafka source and rejects missing/batch sources."""
    stream_source = KafkaSource(
        name="kafka",
        event_timestamp_column="",
        bootstrap_servers="",
        message_format=AvroFormat(""),
        topic="topic",
        batch_source=FileSource(path="some path"),
    )
    StreamFeatureView(
        name="test stream feature view",
        entities=[],
        ttl=timedelta(days=30),
        source=stream_source,
    )

    # ``match`` pins the failure to the missing-source check rather than any
    # other ValueError raised during construction.
    with pytest.raises(ValueError, match="need a source specified"):
        StreamFeatureView(
            name="test stream feature view", entities=[], ttl=timedelta(days=30)
        )

    # A file source must be rejected by the stream-source type check.
    with pytest.raises(ValueError, match="need a stream source"):
        StreamFeatureView(
            name="test stream feature view",
            entities=[],
            ttl=timedelta(days=30),
            source=FileSource(path="some path"),
        )