Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CLI interface for validation of logged features #2718

Merged
merged 9 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lazy import & correct feature status in logs
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed May 19, 2022
commit cf622ea5852965a4054a0af3cf1d625e596af048
7 changes: 6 additions & 1 deletion sdk/python/feast/saved_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from google.protobuf.json_format import MessageToJson

from feast.data_source import DataSource
from feast.dqm.profilers.ge_profiler import GEProfile, GEProfiler
from feast.dqm.profilers.profiler import Profile, Profiler
from feast.protos.feast.core.SavedDataset_pb2 import SavedDataset as SavedDatasetProto
from feast.protos.feast.core.SavedDataset_pb2 import SavedDatasetMeta, SavedDatasetSpec
Expand Down Expand Up @@ -252,12 +251,16 @@ def profile(self) -> Profile:
def from_proto(cls, proto: ValidationReferenceProto) -> "ValidationReference":
profiler_attr = proto.WhichOneof("profiler")
if profiler_attr == "ge_profiler":
from feast.dqm.profilers.ge_profiler import GEProfiler

profiler = GEProfiler.from_proto(proto.ge_profiler)
else:
raise RuntimeError("Unrecognized profiler")

profile_attr = proto.WhichOneof("cached_profile")
if profile_attr == "ge_profile":
from feast.dqm.profilers.ge_profiler import GEProfile

profile = GEProfile.from_proto(proto.ge_profile)
elif not profile_attr:
profile = None
Expand All @@ -274,6 +277,8 @@ def from_proto(cls, proto: ValidationReferenceProto) -> "ValidationReference":
return ref

def to_proto(self) -> ValidationReferenceProto:
from feast.dqm.profilers.ge_profiler import GEProfile, GEProfiler

proto = ValidationReferenceProto(
name=self.name,
reference_dataset_name=self.dataset_name,
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/tests/integration/e2e/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
LoggingConfig,
)
from feast.protos.feast.serving.ServingService_pb2 import FieldStatus
from feast.utils import make_tzaware
from feast.wait import wait_retry_backoff
from tests.integration.feature_repos.repo_configuration import (
construct_universal_feature_views,
Expand Down Expand Up @@ -334,7 +335,7 @@ def test_e2e_validation_via_cli(environment, universal_data_sources):
"current_balance": [0],
"avg_passenger_count": [0],
"lifetime_trip_count": [0],
"event_timestamp": [datetime.datetime.utcnow()],
"event_timestamp": [make_tzaware(datetime.datetime.utcnow())],
}
)
invalid_logs = prepare_logs(invalid_data, feature_service, store)
Expand Down
25 changes: 16 additions & 9 deletions sdk/python/tests/utils/logged_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from feast.errors import FeatureViewNotFoundException
from feast.feature_logging import LOG_DATE_FIELD, LOG_TIMESTAMP_FIELD, REQUEST_ID_FIELD
from feast.protos.feast.serving.ServingService_pb2 import FieldStatus
from feast.utils import make_tzaware


def prepare_logs(
Expand All @@ -28,21 +29,27 @@ def prepare_logs(
logs_df[LOG_DATE_FIELD] = logs_df[LOG_TIMESTAMP_FIELD].dt.date

for projection in feature_service.feature_view_projections:
feature_view = store.get_feature_view(projection.name)
for feature in projection.features:
source_field = (
feature.name
if feature.name in source_df.columns
else f"{projection.name_to_use()}__{feature.name}"
)
logs_df[f"{projection.name_to_use()}__{feature.name}"] = source_df[
source_field
]
logs_df[
f"{projection.name_to_use()}__{feature.name}__timestamp"
] = source_df["event_timestamp"].dt.floor("s")
logs_df[
f"{projection.name_to_use()}__{feature.name}__status"
] = FieldStatus.PRESENT
destination_field = f"{projection.name_to_use()}__{feature.name}"
logs_df[destination_field] = source_df[source_field]
logs_df[f"{destination_field}__timestamp"] = source_df[
"event_timestamp"
].dt.floor("s")
logs_df[f"{destination_field}__status"] = FieldStatus.PRESENT
if feature_view.ttl:
logs_df[f"{destination_field}__status"] = logs_df[
f"{destination_field}__status"
].mask(
source_df["event_timestamp"]
< (make_tzaware(datetime.datetime.utcnow()) - feature_view.ttl),
FieldStatus.OUTSIDE_MAX_AGE,
)

try:
view = store.get_feature_view(projection.name)
Expand Down