Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Create stream and batch feature view abstractions #2559

Merged
merged 3 commits into from
Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Create stream and batch feature view abstractions
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Apr 18, 2022
commit 63ab5757063023b1391f88083a4a318d96d57d85
61 changes: 61 additions & 0 deletions sdk/python/feast/batch_feature_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from datetime import timedelta
from typing import Dict, List, Optional, Union

from google.protobuf.duration_pb2 import Duration
achals marked this conversation as resolved.
Show resolved Hide resolved

from feast import Entity, Feature, Field
achals marked this conversation as resolved.
Show resolved Hide resolved
from feast.data_source import DataSource
from feast.feature_view import FeatureView
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

SUPPORTED_BATCH_SOURCES = {
"BigQuerySource",
"FileSource",
"RedshiftSource",
"SnowflakeSource",
"SparkSource",
"TrinoSource",
}


class BatchFeatureView(FeatureView):
def __init__(
self,
*,
achals marked this conversation as resolved.
Show resolved Hide resolved
name: Optional[str] = None,
entities: Optional[Union[List[Entity], List[str]]] = None,
ttl: Optional[Union[Duration, timedelta]] = None,
features: Optional[List[Feature]] = None,
achals marked this conversation as resolved.
Show resolved Hide resolved
tags: Optional[Dict[str, str]] = None,
online: bool = True,
description: str = "",
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
):

if source is None:
raise ValueError("Feature views need a source specified")
if (
type(source).__name__ not in SUPPORTED_BATCH_SOURCES
and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE
):
raise ValueError(
f"Batch feature views need a batch source, expected one of {SUPPORTED_BATCH_SOURCES} "
f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
)

super().__init__(
name=name,
entities=entities,
ttl=ttl,
batch_source=None,
stream_source=None,
features=features,
tags=tags,
online=online,
description=description,
owner=owner,
schema=schema,
source=source,
)
57 changes: 57 additions & 0 deletions sdk/python/feast/stream_feature_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from datetime import timedelta
from typing import Dict, List, Optional, Union

from google.protobuf.duration_pb2 import Duration

from feast import Entity, Feature, Field
achals marked this conversation as resolved.
Show resolved Hide resolved
from feast.data_source import DataSource
from feast.feature_view import FeatureView
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

SUPPORTED_STREAM_SOURCES = {
"KafkaSource",
"KinesisSource",
}


class StreamFeatureView(FeatureView):
def __init__(
self,
*,
name: Optional[str] = None,
entities: Optional[Union[List[Entity], List[str]]] = None,
ttl: Optional[Union[Duration, timedelta]] = None,
features: Optional[List[Feature]] = None,
tags: Optional[Dict[str, str]] = None,
online: bool = True,
description: str = "",
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
):

if source is None:
raise ValueError("Feature views need a source specified")
if (
type(source).__name__ not in SUPPORTED_STREAM_SOURCES
and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE
):
raise ValueError(
f"Stream feature views need a stream source, expected one of {SUPPORTED_STREAM_SOURCES} "
f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
)

super().__init__(
name=name,
entities=entities,
ttl=ttl,
batch_source=None,
stream_source=None,
features=features,
tags=tags,
online=online,
description=description,
owner=owner,
schema=schema,
source=source,
)
70 changes: 70 additions & 0 deletions sdk/python/tests/unit/test_feature_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from datetime import timedelta

import pytest

from feast import KafkaSource
achals marked this conversation as resolved.
Show resolved Hide resolved
from feast.batch_feature_view import BatchFeatureView
from feast.data_format import AvroFormat
from feast.infra.offline_stores.file_source import FileSource
from feast.stream_feature_view import StreamFeatureView


def test_create_batch_feature_view():
batch_source = FileSource("some path")
achals marked this conversation as resolved.
Show resolved Hide resolved
BatchFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=batch_source,
)

with pytest.raises(ValueError):
BatchFeatureView(
name="test batch feature view", entities=[], ttl=timedelta(days=30)
)

stream_source = KafkaSource(
name="kafka",
event_timestamp_column="",
bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource("some path"),
)
with pytest.raises(ValueError):
BatchFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=stream_source,
)


def test_create_stream_feature_view():
stream_source = KafkaSource(
name="kafka",
event_timestamp_column="",
bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource("some path"),
)
StreamFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=stream_source,
)

with pytest.raises(ValueError):
StreamFeatureView(
name="test batch feature view", entities=[], ttl=timedelta(days=30)
)

with pytest.raises(ValueError):
StreamFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=FileSource("some path"),
)