Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the foundation of the universal feature repo and a test that uses it #1734

Merged
merged 20 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add the foundation of the universal feature repo and a test that uses it
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Aug 2, 2021
commit 2c3a7c229f6c50943f876622f7e3d0afe597bf5b
27 changes: 27 additions & 0 deletions sdk/python/tests/data/data_creator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from datetime import datetime, timedelta

import pandas as pd
from pytz import timezone, utc


def create_dataset() -> pd.DataFrame:
    """Build the small five-row frame used by the correctness tests.

    Columns are ``id``, ``value``, ``ts_1`` and ``created_ts``. All timestamps
    derive from a single "now" (UTC, rounded to milliseconds). ``ts_1`` mixes
    tz-naive values with tz-aware ones expressed in other zones.
    """
    base_ts = pd.Timestamp(datetime.utcnow()).round("ms")

    def hours_before(hours: int):
        # Naive timestamp the given number of hours before the base timestamp.
        return base_ts - timedelta(hours=hours)

    def utc_shifted_to(moment, zone_name: str):
        # Tag the naive value as UTC, then express it in the requested zone.
        return moment.replace(tzinfo=utc).astimezone(tz=timezone(zone_name))

    event_times = [
        hours_before(4),
        base_ts,
        hours_before(3),
        # Use different time zones to test tz-naive -> tz-aware conversion
        utc_shifted_to(hours_before(4), "Europe/Berlin"),
        utc_shifted_to(hours_before(1), "US/Pacific"),
    ]
    columns = {
        "id": [1, 2, 1, 3, 3],
        "value": [0.1, None, 0.3, 4, 5],
        "ts_1": event_times,
        "created_ts": [base_ts] * 5,
    }
    return pd.DataFrame.from_dict(columns)
achals marked this conversation as resolved.
Show resolved Hide resolved
134 changes: 134 additions & 0 deletions sdk/python/tests/integration/e2e/test_universal_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import math
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd
import pytest
from pytz import utc

from feast import FeatureStore, FeatureView
from tests.integration.feature_repos.universal.data_sources.bigquery import (
BigQueryDataSourceCreator,
)
from tests.integration.feature_repos.test_repo_configuration import construct_feature_store

# One tuple per parametrized case of test_e2e_consistency.
# Presumably (offline store, data source creator class, online store) - TODO confirm.
parameters = [("bigquery", BigQueryDataSourceCreator, "datastore")]


@pytest.mark.parametrize("params", parameters)
@pytest.mark.skip(reason="Still working on this test")
def test_e2e_consistency(params):
    """Run the offline/online consistency scenario against a freshly built store."""
    # NOTE(review): construct_feature_store reads `test_repo_config.provider`,
    # while each `params` entry here is a plain tuple - reconcile the two
    # before removing the skip marker.
    with construct_feature_store(params) as feature_store:
        feature_view = feature_store.get_feature_view("test_correctness")
        run_offline_online_store_consistency_test(
            feature_store, feature_view, full_feature_names=True
        )


def check_offline_and_online_features(
    fs: FeatureStore,
    fv: FeatureView,
    driver_id: int,
    event_timestamp: datetime,
    expected_value: Optional[float],
    full_feature_names: bool,
    check_offline_store: bool = True,
) -> None:
    """Assert that the ``value`` feature of ``fv`` matches ``expected_value``.

    The online store is always checked; the offline store is checked only when
    ``check_offline_store`` is true.

    :param fs: feature store to query.
    :param fv: feature view whose ``value`` feature is looked up.
    :param driver_id: entity key to look up.
    :param event_timestamp: timestamp used for the offline (historical) lookup.
    :param expected_value: value expected within 1e-6, or ``None`` when the
        feature is expected to be missing (``None`` online, NaN offline).
    :param full_feature_names: whether results are keyed ``<view>__value``
        instead of plain ``value``.
    :param check_offline_store: set to false to skip the historical lookup.
    :raises AssertionError: if either store disagrees with ``expected_value``.
    """
    feature_ref = f"{fv.name}:value"
    # The result key carries the view-name prefix only when full names are requested.
    result_key = f"{fv.name}__value" if full_feature_names else "value"

    # Check online store (rows are keyed by "driver" here, "driver_id" offline)
    response_dict = fs.get_online_features(
        [feature_ref],
        [{"driver": driver_id}],
        full_feature_names=full_feature_names,
    ).to_dict()
    online_value = response_dict[result_key][0]

    # Compare against None explicitly: a plain truthiness test would treat an
    # expected value of 0 as "no value expected".
    if expected_value is not None:
        assert abs(online_value - expected_value) < 1e-6
    else:
        assert online_value is None

    if not check_offline_store:
        return

    # Check offline store
    df = fs.get_historical_features(
        entity_df=pd.DataFrame.from_dict(
            {"driver_id": [driver_id], "event_timestamp": [event_timestamp]}
        ),
        features=[feature_ref],
        full_feature_names=full_feature_names,
    ).to_df()
    offline_value = df.to_dict()[result_key][0]

    if expected_value is not None:
        assert abs(offline_value - expected_value) < 1e-6
    else:
        assert math.isnan(offline_value)


def run_offline_online_store_consistency_test(
    fs: FeatureStore,
    fv: FeatureView,
    full_feature_names: bool,
    check_offline_store: bool = True,
) -> None:
    """Materialize ``fv`` in two steps and verify the stored values after each.

    First ``materialize()`` runs over the window [now - 5h, now - 2h] and the
    values for drivers 1, 2 and 3 are checked. Then ``materialize_incremental()``
    runs up to ``now`` and driver 3 is checked again.

    :param fs: feature store under test.
    :param fv: feature view to materialize and query.
    :param full_feature_names: forwarded to every value check.
    :param check_offline_store: forwarded to every value check.
    """
    now = datetime.utcnow()

    # use both tz-naive & tz-aware timestamps to test that they're both correctly handled
    window_end = now - timedelta(hours=2)
    window_start = (now - timedelta(hours=5)).replace(tzinfo=utc)

    def verify(driver_id, as_of, expected):
        # Thin wrapper so each scenario below reads as a single line.
        check_offline_and_online_features(
            fs=fs,
            fv=fv,
            driver_id=driver_id,
            event_timestamp=as_of,
            expected_value=expected,
            full_feature_names=full_feature_names,
            check_offline_store=check_offline_store,
        )

    # Run materialize()
    fs.materialize(
        feature_views=[fv.name], start_date=window_start, end_date=window_end
    )

    # check result of materialize(); the driver 3 entry is the prior value
    # for materialize_incremental()
    for driver_id, expected in ((1, 0.3), (2, None), (3, 4)):
        verify(driver_id, window_end, expected)

    # run materialize_incremental()
    fs.materialize_incremental(feature_views=[fv.name], end_date=now)

    # check result of materialize_incremental()
    verify(3, now, 5)
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import tempfile
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator, Optional

from feast import FeatureStore, RepoConfig
from feast.infra.offline_stores.bigquery import BigQueryOfflineStore
from feast.infra.online_stores.datastore import DatastoreOnlineStoreConfig
from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.universal.data_sources.bigquery import (
    BigQueryDataSourceCreator,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.feature_views import (
    correctness_feature_view,
)


@dataclass
class TestRepoConfig:
    """
    This class should hold all possible parameters that may need to be varied by individual tests.

    Every field has a default, so ``TestRepoConfig()`` yields the baseline configuration and a
    test overrides only what it needs, e.g. ``TestRepoConfig(online_store="datastore")``.
    """

    # The "Test" prefix matches pytest's class-collection pattern; opt out explicitly
    # because this is a configuration holder, not a test case.
    __test__ = False

    provider: str = "local"
    offline_store: str = "file"
    online_store: str = "sqlite"

    full_feature_names: bool = True


@contextmanager
def construct_feature_store(test_repo_config: TestRepoConfig) -> Iterator[FeatureStore]:
    """
    This method should take in the parameters from the test repo config and create a feature repo,
    apply it, and yield the constructed feature store object to callers.

    This feature store object can be interacted with for the purposes of tests.
    The user is *not* expected to perform any clean up actions: teardown runs when the ``with``
    block exits, including when the test body raises.

    Only ``provider`` is currently read from the config; the data source, offline store and
    online store are hard-coded below.

    :param test_repo_config: configuration
    :return: A context manager yielding a feature store built using the supplied configuration.
    """
    df = create_dataset()

    project = f"test_correctness_{uuid.uuid4().hex}"

    # TODO: Parameterize over data sources, by pulling this into individual data_source classes behind
    # TODO: an appropriate interface.
    # Keep a single creator so teardown runs on the instance that created the source.
    ds_creator = BigQueryDataSourceCreator()
    ds = ds_creator.create_data_source(project, df)
    try:
        # NOTE(review): the offline side passes a store instance while the online side
        # passes a *Config object - confirm RepoConfig accepts both.
        offline_store = BigQueryOfflineStore()
        online_store = DatastoreOnlineStoreConfig(namespace="integration_test")

        with tempfile.TemporaryDirectory() as repo_dir_name:
            config = RepoConfig(
                registry=str(Path(repo_dir_name) / "registry.db"),
                project=project,
                provider=test_repo_config.provider,
                offline_store=offline_store,
                online_store=online_store,
            )
            fs = FeatureStore(config=config)
            fv = correctness_feature_view(ds)
            entity = driver
            fs.apply([fv, entity])

            try:
                yield fs
            finally:
                # Runs even if the test body raised, while the registry
                # directory still exists.
                fs.teardown()
    finally:
        ds_creator.teardown(project)
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from abc import ABC, abstractmethod

import pandas as pd

from feast.data_source import DataSource


class DataSourceCreator(ABC):
    """Abstract interface for test helpers that create and remove data sources."""

    @abstractmethod
    def create_data_source(
        self,
        name: str,
        df: pd.DataFrame,
        event_timestamp_column="ts",
        created_timestamp_column="created_ts",
        **kwargs,
    ) -> DataSource:
        """Create a data source called ``name`` from ``df`` and return it.

        ``event_timestamp_column`` and ``created_timestamp_column`` name the
        timestamp columns; they default to ``"ts"`` and ``"created_ts"``.
        """

    @abstractmethod
    def teardown(self, name: str):
        """Release whatever was created for ``name``."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import time

import pandas as pd
from google.cloud import bigquery

from feast import BigQuerySource
from feast.data_source import DataSource
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)


class BigQueryDataSourceCreator(DataSourceCreator):
    """Uploads a DataFrame to a BigQuery table and wraps it in a ``BigQuerySource``.

    Tables are written to the ``test_ingestion`` dataset of the client's
    project. Each created table is remembered under the ``name`` it was
    created for, so ``teardown(name)`` can delete it again.
    """

    def __init__(self):
        self.client = bigquery.Client()
        # Maps the caller-supplied name to the fully-qualified ids of the
        # tables created for it by this instance.
        self._tables_by_name = {}

    def teardown(self, name: str):
        """Delete every table this instance created for ``name``.

        A name this instance never created anything for is a no-op, and
        tables that are already gone are ignored.
        """
        for table_ref in self._tables_by_name.pop(name, []):
            self.client.delete_table(table_ref, not_found_ok=True)

    def create_data_source(
        self,
        name: str,
        df: pd.DataFrame,
        event_timestamp_column="ts",
        created_timestamp_column="created_ts",
        **kwargs,
    ) -> DataSource:
        """Load ``df`` into a new, uniquely named table and return its source.

        :param name: prefix of the table name; a nanosecond timestamp is
            appended to make it unique.
        :param df: data to upload.
        :param event_timestamp_column: event timestamp column of the source.
        :param created_timestamp_column: created timestamp column of the source.
        :param kwargs: accepted for interface compatibility; currently unused.
        :return: a ``BigQuerySource`` pointing at the uploaded table, with
            ``ts_1`` mapped to ``ts`` and ``id`` mapped to ``driver_id``.
        """
        gcp_project = self.client.project
        bigquery_dataset = "test_ingestion"
        dataset = bigquery.Dataset(f"{gcp_project}.{bigquery_dataset}")
        self.client.create_dataset(dataset, exists_ok=True)
        # Safety net: tables expire on their own if teardown never runs.
        dataset.default_table_expiration_ms = (
            1000 * 60 * 60 * 24 * 14
        )  # 2 weeks in milliseconds
        self.client.update_dataset(dataset, ["default_table_expiration_ms"])

        job_config = bigquery.LoadJobConfig()
        table_ref = f"{gcp_project}.{bigquery_dataset}.{name}_{int(time.time_ns())}"
        # Record the table before loading so a failed load can still be cleaned up.
        self._tables_by_name.setdefault(name, []).append(table_ref)
        job = self.client.load_table_from_dataframe(
            df, table_ref, job_config=job_config
        )
        job.result()

        return BigQuerySource(
            table_ref=table_ref,
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            date_partition_column="",
            field_mapping={"ts_1": "ts", "id": "driver_id"},
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from feast import Entity, ValueType

# Shared "driver" entity of the universal feature repo.
driver = Entity(
    # The entity's name comes from this argument, not from the variable name.
    name="driver",
    join_key="driver_id",
    value_type=ValueType.INT64,
    description="driver id",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from datetime import timedelta

from feast import Feature, FeatureView, ValueType
from feast.data_source import DataSource


def correctness_feature_view(data_source: DataSource) -> FeatureView:
    """Build the ``test_correctness`` feature view on top of ``data_source``.

    The view is keyed by the ``driver`` entity, exposes a single float feature
    named ``value`` and uses a five-day TTL.
    """
    value_feature = Feature("value", ValueType.FLOAT)
    five_days = timedelta(days=5)
    return FeatureView(
        name="test_correctness",
        entities=["driver"],
        features=[value_feature],
        ttl=five_days,
        input=data_source,
    )
Loading