Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CLI interface for validation of logged features #2718

Merged
merged 9 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,12 @@ func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int,
go func() {
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
<-s.grpcStopCh
fmt.Println("Stopping the gRPC server...")
log.Println("Stopping the gRPC server...")
grpcServer.GracefulStop()
if loggingService != nil {
loggingService.Stop()
}
fmt.Println("gRPC server terminated")
log.Println("gRPC server terminated")
}()

err = grpcServer.Serve(lis)
Expand Down Expand Up @@ -314,11 +314,15 @@ func (s *OnlineFeatureService) StartHttpServerWithLogging(host string, port int,
go func() {
// As soon as these signals are received from OS, try to gracefully stop the HTTP server
<-s.httpStopCh
fmt.Println("Stopping the HTTP server...")
log.Println("Stopping the HTTP server...")
err := ser.Stop()
if err != nil {
fmt.Printf("Error when stopping the HTTP server: %v\n", err)
log.Printf("Error when stopping the HTTP server: %v\n", err)
}
if loggingService != nil {
loggingService.Stop()
}
log.Println("HTTP server terminated")
}()

return ser.Serve(host, port)
Expand Down
1 change: 0 additions & 1 deletion protos/feast/core/FeatureService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ message FeatureServiceMeta {

message LoggingConfig {
float sample_rate = 1;
google.protobuf.Duration partition_interval = 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to reserve #2 or does it not matter since it's never been used by anyone?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's never been used.


oneof destination {
FileDestination file_destination = 3;
Expand Down
2 changes: 2 additions & 0 deletions protos/feast/core/Registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import "feast/core/OnDemandFeatureView.proto";
import "feast/core/RequestFeatureView.proto";
import "feast/core/DataSource.proto";
import "feast/core/SavedDataset.proto";
import "feast/core/ValidationProfile.proto";
import "google/protobuf/timestamp.proto";

// Next id: 14
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update 13 -> 14

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Expand All @@ -42,6 +43,7 @@ message Registry {
repeated RequestFeatureView request_feature_views = 9;
repeated FeatureService feature_services = 7;
repeated SavedDataset saved_datasets = 11;
repeated ValidationReference validation_references = 13;
Infra infra = 10;

string registry_schema_version = 3; // to support migrations; incremented when schema is changed
Expand Down
17 changes: 14 additions & 3 deletions protos/feast/core/ValidationProfile.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,20 @@ message GEValidationProfile {
}

message ValidationReference {
SavedDataset dataset = 1;

// Unique name of validation reference within the project
string name = 1;
// Name of saved dataset used as reference dataset
string reference_dataset_name = 2;
// Name of Feast project that this object source belongs to
string project = 3;

// validation profiler
oneof profiler {
GEValidationProfiler ge_profiler = 2;
GEValidationProfiler ge_profiler = 4;
}

// (optional) cached validation profile (to avoid constant recalculation)
oneof cached_profile {
GEValidationProfile ge_profile = 5;
}
}
59 changes: 58 additions & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import warnings
from datetime import datetime
Expand All @@ -23,6 +23,7 @@
import yaml
from colorama import Fore, Style
from dateutil import parser
from pygments import formatters, highlight, lexers

from feast import flags, flags_helper, utils
from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT
Expand Down Expand Up @@ -758,5 +759,61 @@ def disable_alpha_features(ctx: click.Context):
store.config.write_to_path(Path(repo_path))


@cli.command("validate")
@click.option(
    "--feature-service", "-f", help="Specify a feature service name",
)
@click.option(
    "--reference", "-r", help="Specify a validation reference name",
)
@click.option(
    "--no-profile-cache", is_flag=True, help="Do not store cached profile in registry",
)
@click.argument("start_ts")
@click.argument("end_ts")
@click.pass_context
def validate(
    ctx: click.Context,
    feature_service: str,
    reference: str,
    start_ts: str,
    end_ts: str,
    no_profile_cache: bool,
):
    """
    Perform validation of logged features (produced by a given feature service) against provided reference.

    START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
    """
    # NOTE: the docstring above is rendered by click as the command's help text,
    # so implementation notes live in comments instead.
    repo = ctx.obj["CHDIR"]
    cli_check_repo(repo)
    store = FeatureStore(repo_path=str(repo))

    # Resolve the names given on the command line into objects. They are bound
    # to new names so the `str`-typed parameters are not rebound to a different type.
    service_obj = store.get_feature_service(name=feature_service)
    reference_obj = store.get_validation_reference(reference)

    # throw_exception=False makes the store return the failure (if any)
    # instead of raising it, so it can be pretty-printed below.
    result = store.validate_logged_features(
        source=service_obj,
        reference=reference_obj,
        start=datetime.fromisoformat(start_ts),
        end=datetime.fromisoformat(end_ts),
        throw_exception=False,
        cache_profile=not no_profile_cache,
    )

    if not result:
        print(f"{Style.BRIGHT + Fore.GREEN}Validation successful!{Style.RESET_ALL}")
        return

    # Render every validation error as syntax-highlighted JSON.
    errors = [e.to_dict() for e in result.report.errors]
    formatted_json = json.dumps(errors, indent=4)
    colorful_json = highlight(
        formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter()
    )
    print(f"{Style.BRIGHT + Fore.RED}Validation failed!{Style.RESET_ALL}")
    print(colorful_json)

    # Signal failure through the process exit status. The bare `exit()` helper is
    # injected by the `site` module for interactive use and is not guaranteed to
    # exist (e.g. `python -S`, frozen builds); raising SystemExit always works.
    raise SystemExit(1)


if __name__ == "__main__":
cli()
10 changes: 7 additions & 3 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from feast.protos.feast.core.RequestFeatureView_pb2 import (
RequestFeatureView as RequestFeatureViewProto,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
ValidationReference as ValidationReferenceProto,
)
from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry
from feast.repo_contents import RepoContents

Expand Down Expand Up @@ -103,6 +106,7 @@ def tag_objects_for_keep_delete_update_add(
FeatureServiceProto,
OnDemandFeatureViewProto,
RequestFeatureViewProto,
ValidationReferenceProto,
)


Expand All @@ -120,9 +124,9 @@ def diff_registry_objects(

current_spec: FeastObjectSpecProto
new_spec: FeastObjectSpecProto
if isinstance(current_proto, DataSourceProto) or isinstance(
new_proto, DataSourceProto
):
if isinstance(
current_proto, (DataSourceProto, ValidationReferenceProto)
) or isinstance(new_proto, (DataSourceProto, ValidationReferenceProto)):
assert type(current_proto) == type(new_proto)
current_spec = cast(DataSourceProto, current_proto)
new_spec = cast(DataSourceProto, new_proto)
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/dqm/profilers/ge_profiler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from types import FunctionType
from typing import Any, Callable, Dict, List

import dill
Expand Down Expand Up @@ -140,9 +141,12 @@ def analyze_dataset(self, df: pd.DataFrame) -> Profile:
return GEProfile(expectation_suite=self.user_defined_profiler(dataset))

def to_proto(self):
# keep only the code and drop context for now
# ToDo (pyalex): include some context, but not all (dill tries to pull too much)
Comment on lines +144 to +145
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious what this means?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So right now, for easier deserialization, I drop the whole context of the user-defined function and keep only its code. The opposite of that (what dill does by default) is to include all globals and refer to the original module, which won't be available at runtime. But maybe in the future somebody would want to use at least the function closure, so we might want to support that.

udp = FunctionType(self.user_defined_profiler.__code__, {})
return GEValidationProfilerProto(
profiler=GEValidationProfilerProto.UserDefinedProfiler(
body=dill.dumps(self.user_defined_profiler, recurse=True)
body=dill.dumps(udp, recurse=False)
)
)

Expand Down
13 changes: 13 additions & 0 deletions sdk/python/feast/dqm/profilers/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class ValidationError:

missing_count: Optional[int]
missing_percent: Optional[float]
observed_value: Optional[float]

def __init__(
self,
Expand All @@ -77,12 +78,24 @@ def __init__(
check_config: Optional[Any] = None,
missing_count: Optional[int] = None,
missing_percent: Optional[float] = None,
observed_value: Optional[float] = None,
):
self.check_name = check_name
self.column_name = column_name
self.check_config = check_config
self.missing_count = missing_count
self.missing_percent = missing_percent
self.observed_value = observed_value

def __repr__(self):
return f"<ValidationError {self.check_name}:{self.column_name}>"

def to_dict(self):
    """Return this error's fields as a plain dict keyed by attribute name."""
    # Order matters for readers of the serialized output: keep the same
    # key order in which the attributes are declared.
    exported_fields = (
        "check_name",
        "column_name",
        "check_config",
        "missing_count",
        "missing_percent",
        "observed_value",
    )
    return {field: getattr(self, field) for field in exported_fields}
7 changes: 7 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ def __init__(self, name: str, project: str):
super().__init__(f"Saved dataset {name} does not exist in project {project}")


class ValidationReferenceNotFound(FeastObjectNotFoundException):
    """Error for a validation reference name that does not exist in a project."""

    def __init__(self, name: str, project: str):
        message = f"Validation reference {name} does not exist in project {project}"
        super().__init__(message)


class FeastProviderLoginError(Exception):
"""Error class that indicates a user has not authenticated with their provider."""

Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/feast_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from .protos.feast.core.FeatureView_pb2 import FeatureViewSpec
from .protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec
from .protos.feast.core.RequestFeatureView_pb2 import RequestFeatureViewSpec
from .protos.feast.core.ValidationProfile_pb2 import (
ValidationReference as ValidationReferenceProto,
)
from .request_feature_view import RequestFeatureView
from .saved_dataset import ValidationReference

# Convenience type representing all Feast objects
FeastObject = Union[
Expand All @@ -21,6 +25,7 @@
Entity,
FeatureService,
DataSource,
ValidationReference,
]

FeastObjectSpecProto = Union[
Expand All @@ -30,4 +35,5 @@
EntitySpecV2,
FeatureServiceSpec,
DataSourceProto,
ValidationReferenceProto,
]
38 changes: 38 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ def apply(
OnDemandFeatureView,
RequestFeatureView,
FeatureService,
ValidationReference,
List[FeastObject],
],
objects_to_delete: Optional[List[FeastObject]] = None,
Expand Down Expand Up @@ -669,6 +670,9 @@ def apply(
data_sources_set_to_update = {
ob for ob in objects if isinstance(ob, DataSource)
}
validation_references_to_update = [
ob for ob in objects if isinstance(ob, ValidationReference)
]

for fv in views_to_update:
data_sources_set_to_update.add(fv.batch_source)
Expand Down Expand Up @@ -719,6 +723,10 @@ def apply(
self._registry.apply_feature_service(
feature_service, project=self.project, commit=False
)
for validation_references in validation_references_to_update:
self._registry.apply_validation_reference(
validation_references, project=self.project, commit=False
)

if not partial:
# Delete all registry objects that should not exist.
Expand All @@ -740,6 +748,9 @@ def apply(
data_sources_to_delete = [
ob for ob in objects_to_delete if isinstance(ob, DataSource)
]
validation_references_to_delete = [
ob for ob in objects_to_delete if isinstance(ob, ValidationReference)
]

for data_source in data_sources_to_delete:
self._registry.delete_data_source(
Expand All @@ -765,6 +776,10 @@ def apply(
self._registry.delete_feature_service(
service.name, project=self.project, commit=False
)
for validation_references in validation_references_to_delete:
self._registry.delete_validation_reference(
validation_references.name, project=self.project, commit=False
)

self._get_provider().update_infra(
project=self.project,
Expand Down Expand Up @@ -2039,6 +2054,7 @@ def serve_transformations(self, port: int) -> None:
def _teardown_go_server(self):
self._go_server = None

@log_exceptions_and_usage
def write_logged_features(
self, logs: Union[pa.Table, Path], source: Union[FeatureService]
):
Expand Down Expand Up @@ -2066,13 +2082,15 @@ def write_logged_features(
registry=self._registry,
)

@log_exceptions_and_usage
def validate_logged_features(
self,
source: Union[FeatureService],
start: datetime,
end: datetime,
reference: ValidationReference,
throw_exception: bool = True,
cache_profile: bool = True,
) -> Optional[ValidationFailed]:
"""
Load logged features from an offline store and validate them against provided validation reference.
Expand All @@ -2083,6 +2101,7 @@ def validate_logged_features(
end: upper bound for loading logged features
reference: validation reference
throw_exception: throw exception or return it as a result
cache_profile: store cached profile in Feast registry

Returns:
Throw or return (depends on parameter) ValidationFailed exception if validation was not successful
Expand Down Expand Up @@ -2116,8 +2135,27 @@ def validate_logged_features(

return exc

if cache_profile:
self.apply(reference)

return None

@log_exceptions_and_usage
def get_validation_reference(
    self, name: str, allow_cache: bool = False
) -> ValidationReference:
    """
    Retrieves a validation reference.

    Args:
        name: Name of the validation reference to look up in this store's project.
        allow_cache: Passed through unchanged to the registry lookup.

    Returns:
        The validation reference from the registry, with its ``_dataset``
        attribute set to the saved dataset named by ``ref.dataset_name``.

    Raises:
        ValidationReferenceNotFound: The validation reference could not be found.
    """
    ref = self._registry.get_validation_reference(
        name, project=self.project, allow_cache=allow_cache
    )
    # Resolve the saved dataset by name and attach it to the reference
    # before handing it back to the caller.
    ref._dataset = self.get_saved_dataset(ref.dataset_name)
    return ref


def _validate_entity_values(join_key_values: Dict[str, List[Value]]):
set_of_row_lengths = {len(v) for v in join_key_values.values()}
Expand Down
Loading