Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CLI interface for validation of logged features #2718

Merged
merged 9 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ttl for regular feature views
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed May 20, 2022
commit acbfe844fe478cc20a2085db64b27b10332dc311
12 changes: 8 additions & 4 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,12 @@ func (s *OnlineFeatureService) StartGprcServerWithLogging(host string, port int,
go func() {
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
<-s.grpcStopCh
fmt.Println("Stopping the gRPC server...")
log.Println("Stopping the gRPC server...")
grpcServer.GracefulStop()
if loggingService != nil {
loggingService.Stop()
}
fmt.Println("gRPC server terminated")
log.Println("gRPC server terminated")
}()

err = grpcServer.Serve(lis)
Expand Down Expand Up @@ -314,11 +314,15 @@ func (s *OnlineFeatureService) StartHttpServerWithLogging(host string, port int,
go func() {
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
<-s.httpStopCh
fmt.Println("Stopping the HTTP server...")
log.Println("Stopping the HTTP server...")
err := ser.Stop()
if err != nil {
fmt.Printf("Error when stopping the HTTP server: %v\n", err)
log.Printf("Error when stopping the HTTP server: %v\n", err)
}
if loggingService != nil {
loggingService.Stop()
}
log.Println("HTTP server terminated")
}()

return ser.Serve(host, port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ def server_port(environment, server_type: str):
embedded.stop_grpc_server()
else:
embedded.stop_http_server()

# wait for graceful stop
time.sleep(2)
time.sleep(5)


@pytest.fixture
Expand Down
13 changes: 8 additions & 5 deletions sdk/python/tests/utils/logged_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
import pandas as pd
import pyarrow

from feast import FeatureService, FeatureStore
from feast import FeatureService, FeatureStore, FeatureView
from feast.errors import FeatureViewNotFoundException
from feast.feature_logging import LOG_DATE_FIELD, LOG_TIMESTAMP_FIELD, REQUEST_ID_FIELD
from feast.protos.feast.serving.ServingService_pb2 import FieldStatus
from feast.utils import make_tzaware


def prepare_logs(
Expand Down Expand Up @@ -52,13 +51,17 @@ def prepare_logs(
logs_df[f"{destination_field}__timestamp"] = source_df[
"event_timestamp"
].dt.floor("s")
if logs_df[f"{destination_field}__timestamp"].dt.tz:
logs_df[f"{destination_field}__timestamp"] = logs_df[
f"{destination_field}__timestamp"
].dt.tz_convert(None)
logs_df[f"{destination_field}__status"] = FieldStatus.PRESENT
if view.ttl:
if isinstance(view, FeatureView) and view.ttl:
logs_df[f"{destination_field}__status"] = logs_df[
f"{destination_field}__status"
].mask(
source_df["event_timestamp"]
< (make_tzaware(datetime.datetime.utcnow()) - view.ttl),
logs_df[f"{destination_field}__timestamp"]
< (datetime.datetime.utcnow() - view.ttl),
FieldStatus.OUTSIDE_MAX_AGE,
)

Expand Down