Persisting results of historical retrieval (feast-dev#2197)
* persisting results of historical retrieval

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fix after rebase

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
pyalex authored Jan 26, 2022
1 parent d7707c1 commit 6f1174a
Showing 25 changed files with 1,262 additions and 262 deletions.
1 change: 1 addition & 0 deletions docs/SUMMARY.md
@@ -16,6 +16,7 @@
  * [Feature service](getting-started/concepts/feature-service.md)
  * [Feature retrieval](getting-started/concepts/feature-retrieval.md)
  * [Point-in-time joins](getting-started/concepts/point-in-time-joins.md)
  * [Dataset](getting-started/concepts/dataset.md)
* [Architecture](getting-started/architecture-and-components/README.md)
  * [Overview](getting-started/architecture-and-components/overview.md)
  * [Feature repository](getting-started/architecture-and-components/feature-repository.md)
1 change: 1 addition & 0 deletions docs/getting-started/concepts/README.md
@@ -14,3 +14,4 @@

{% page-ref page="point-in-time-joins.md" %}

{% page-ref page="dataset.md" %}
46 changes: 46 additions & 0 deletions docs/getting-started/concepts/dataset.md
@@ -0,0 +1,46 @@
# Dataset

Feast datasets allow for conveniently saving dataframes that include both features and entities, to be subsequently used for data analysis and model training.
[Data Quality Monitoring](https://docs.google.com/document/d/110F72d4NTv80p35wDSONxhhPBqWRwbZXG4f9mNEMd98) was the primary motivation for creating the dataset concept.

A dataset's metadata is stored in the Feast registry, and the raw data (features, entities, additional input keys, and timestamps) is stored in the [offline store](../architecture-and-components/offline-store.md).

A dataset can be created from:
1. Results of historical retrieval
2. [planned] Logging requests (including inputs for [on demand transformations](../../reference/alpha-on-demand-feature-view.md)) and responses during feature serving
3. [planned] Logging features while writing to the online store (from a batch source or stream)


### Creating a Saved Dataset from Historical Retrieval

To create a saved dataset from historical features for later retrieval or analysis, a user needs to call the `get_historical_features` method first and then pass the returned retrieval job to the `create_saved_dataset` method.
`create_saved_dataset` will trigger the provided retrieval job (by calling `.persist()` on it) to store the data in the specified `storage`.
The storage type must match the globally configured offline store (e.g., it is impossible to persist data to Redshift when BigQuery is configured as the offline store).
`create_saved_dataset` will also create a `SavedDataset` object with all related metadata and write it to the registry.

```python
from feast import FeatureStore
from feast.infra.offline_stores.bigquery_source import SavedDatasetBigQueryStorage

store = FeatureStore(repo_path=".")  # path to the feature repository

historical_job = store.get_historical_features(
    features=["driver:avg_trip"],
    entity_df=...,
)

dataset = store.create_saved_dataset(
    from_=historical_job,
    name='my_training_dataset',
    storage=SavedDatasetBigQueryStorage(table_ref='<gcp-project>.<gcp-dataset>.my_training_dataset'),
    tags={'author': 'oleksii'}
)

dataset.to_df()
```

The saved dataset can later be retrieved using the `get_saved_dataset` method:
```python
dataset = store.get_saved_dataset('my_training_dataset')
dataset.to_df()
```
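
For illustration, the retrieved dataset drops straight into a training workflow. A minimal sketch follows; the column names ("avg_trip", "driver_id", "event_timestamp") are assumptions that depend on your feature views and entity dataframe, not part of the Feast API:

```python
import pandas as pd

from feast import FeatureStore

store = FeatureStore(repo_path=".")  # assumes a feature repo in the current directory
training_df: pd.DataFrame = store.get_saved_dataset("my_training_dataset").to_df()

# Hypothetical columns: "avg_trip" as the label, with the entity key and
# event timestamp dropped before training.
y = training_df["avg_trip"]
X = training_df.drop(columns=["avg_trip", "driver_id", "event_timestamp"])
```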
2 changes: 2 additions & 0 deletions protos/feast/core/Registry.proto
@@ -28,6 +28,7 @@ import "feast/core/FeatureView.proto";
import "feast/core/InfraObject.proto";
import "feast/core/OnDemandFeatureView.proto";
import "feast/core/RequestFeatureView.proto";
import "feast/core/SavedDataset.proto";
import "google/protobuf/timestamp.proto";

message Registry {
@@ -37,6 +38,7 @@ message Registry {
  repeated OnDemandFeatureView on_demand_feature_views = 8;
  repeated RequestFeatureView request_feature_views = 9;
  repeated FeatureService feature_services = 7;
  repeated SavedDataset saved_datasets = 11;
  Infra infra = 10;

  string registry_schema_version = 3; // to support migrations; incremented when schema is changed
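Because the file-based registry is just this `Registry` message serialized to disk, the new `saved_datasets` field can be inspected directly. A hedged sketch (the generated-module path and the registry file location are assumptions):

```python
from feast.protos.feast.core.Registry_pb2 import Registry

registry = Registry()
with open("data/registry.db", "rb") as f:  # hypothetical local registry path
    registry.ParseFromString(f.read())

# SavedDataset messages are defined in SavedDataset.proto (added below).
for sd in registry.saved_datasets:
    print(sd.spec.name, list(sd.spec.features))
```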
76 changes: 76 additions & 0 deletions protos/feast/core/SavedDataset.proto
@@ -0,0 +1,76 @@
//
// Copyright 2021 The Feast Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//


syntax = "proto3";

package feast.core;
option java_package = "feast.proto.core";
option java_outer_classname = "SavedDatasetProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "google/protobuf/timestamp.proto";
import "feast/core/FeatureViewProjection.proto";
import "feast/core/DataSource.proto";

message SavedDatasetSpec {
  // Name of the dataset. Must be unique, since it's possible to overwrite a dataset by name.
  string name = 1;

  // Name of the Feast project that this dataset belongs to.
  string project = 2;

  // List of feature references with the format "<view name>:<feature name>".
  repeated string features = 3;

  // Entity columns + request columns from all feature views used during retrieval.
  repeated string join_keys = 4;

  // Whether full feature names are used in stored data.
  bool full_feature_names = 5;

  SavedDatasetStorage storage = 6;

  // User defined metadata.
  map<string, string> tags = 7;
}

message SavedDatasetStorage {
  oneof kind {
    DataSource.FileOptions file_storage = 4;
    DataSource.BigQueryOptions bigquery_storage = 5;
    DataSource.RedshiftOptions redshift_storage = 6;
  }
}

message SavedDatasetMeta {
  // Time when this saved dataset was created
  google.protobuf.Timestamp created_timestamp = 1;

  // Time when this saved dataset was last updated
  google.protobuf.Timestamp last_updated_timestamp = 2;

  // Min timestamp in the dataset (needed for retrieval)
  google.protobuf.Timestamp min_event_timestamp = 3;

  // Max timestamp in the dataset (needed for retrieval)
  google.protobuf.Timestamp max_event_timestamp = 4;
}

message SavedDataset {
  SavedDatasetSpec spec = 1;
  SavedDatasetMeta meta = 2;
}
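
To make the shape of these messages concrete, here is a hedged sketch constructing a `SavedDataset` with the Python classes protoc generates from this file (the `feast.protos...` module paths, the project name, and the `table_ref` value are assumptions for illustration):

```python
from feast.protos.feast.core.DataSource_pb2 import DataSource
from feast.protos.feast.core.SavedDataset_pb2 import (
    SavedDataset,
    SavedDatasetMeta,
    SavedDatasetSpec,
    SavedDatasetStorage,
)

# The spec mirrors what create_saved_dataset writes to the registry.
spec = SavedDatasetSpec(
    name="my_training_dataset",
    project="my_project",  # hypothetical project name
    features=["driver:avg_trip"],
    join_keys=["driver_id"],
    full_feature_names=False,
    storage=SavedDatasetStorage(
        bigquery_storage=DataSource.BigQueryOptions(
            table_ref="<gcp-project>.<gcp-dataset>.my_training_dataset"
        )
    ),
    tags={"author": "oleksii"},
)

dataset = SavedDataset(spec=spec, meta=SavedDatasetMeta())
print(dataset.spec.storage.WhichOneof("kind"))  # -> "bigquery_storage"
```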
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
@@ -74,6 +74,11 @@ def __init__(self, bucket):
super().__init__(f"S3 bucket {bucket} for the Feast registry can't be accessed")


class SavedDatasetNotFound(FeastObjectNotFoundException):
def __init__(self, name: str, project: str):
super().__init__(f"Saved dataset {name} does not exist in project {project}")


class FeastProviderLoginError(Exception):
"""Error class that indicates a user has not authenticated with their provider."""

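Callers of the new `get_saved_dataset` API (added in `feature_store.py` below) can handle this error explicitly. A minimal sketch, assuming a feature repository in the current directory:

```python
from feast import FeatureStore
from feast.errors import SavedDatasetNotFound

store = FeatureStore(repo_path=".")

try:
    dataset = store.get_saved_dataset("my_training_dataset")
except SavedDatasetNotFound:
    # The dataset was never saved (or belongs to a different project); fall back.
    dataset = None
```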
88 changes: 88 additions & 0 deletions sdk/python/feast/feature_store.py
@@ -77,6 +77,7 @@
from feast.repo_config import RepoConfig, load_repo_config
from feast.repo_contents import RepoContents
from feast.request_feature_view import RequestFeatureView
from feast.saved_dataset import SavedDataset, SavedDatasetStorage
from feast.type_map import python_values_to_proto_values
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
from feast.value_type import ValueType
@@ -764,6 +765,93 @@ def get_historical_features(

        return job

    @log_exceptions_and_usage
    def create_saved_dataset(
        self,
        from_: RetrievalJob,
        name: str,
        storage: SavedDatasetStorage,
        tags: Optional[Dict[str, str]] = None,
    ) -> SavedDataset:
        """
        Execute the provided retrieval job and persist its outcome in the given storage.

        The storage type (e.g., BigQuery or Redshift) must match the globally configured offline store.
        After the data is successfully persisted, a saved dataset object with dataset metadata is committed to the registry.
        The name of the saved dataset should be unique within a project, since it's possible to overwrite
        a previously stored dataset with the same name.

        Returns:
            SavedDataset object with attached RetrievalJob

        Raises:
            ValueError if the given retrieval job doesn't have metadata
        """
        warnings.warn(
            "Saving dataset is an experimental feature. "
            "This API is unstable and it could and most probably will be changed in the future. "
            "We do not guarantee that future changes will maintain backward compatibility.",
            RuntimeWarning,
        )

        if not from_.metadata:
            raise ValueError(
                "RetrievalJob must contain metadata. "
                "Use a RetrievalJob produced by get_historical_features."
            )

        dataset = SavedDataset(
            name=name,
            features=from_.metadata.features,
            join_keys=from_.metadata.keys,
            full_feature_names=from_.full_feature_names,
            storage=storage,
            tags=tags,
        )

        dataset.min_event_timestamp = from_.metadata.min_event_timestamp
        dataset.max_event_timestamp = from_.metadata.max_event_timestamp

        from_.persist(storage)

        self._registry.apply_saved_dataset(dataset, self.project, commit=True)

        return dataset.with_retrieval_job(
            self._get_provider().retrieve_saved_dataset(
                config=self.config, dataset=dataset
            )
        )

    @log_exceptions_and_usage
    def get_saved_dataset(self, name: str) -> SavedDataset:
        """
        Find a saved dataset in the registry by the provided name and
        create a retrieval job to pull the whole dataset from storage (offline store).

        If a dataset can't be found by the provided name, a SavedDatasetNotFound exception will be raised.

        Data will be retrieved from the globally configured offline store.

        Returns:
            SavedDataset with RetrievalJob attached

        Raises:
            SavedDatasetNotFound
        """
        warnings.warn(
            "Retrieving datasets is an experimental feature. "
            "This API is unstable and it could and most probably will be changed in the future. "
            "We do not guarantee that future changes will maintain backward compatibility.",
            RuntimeWarning,
        )

        dataset = self._registry.get_saved_dataset(name, self.project)
        provider = self._get_provider()

        retrieval_job = provider.retrieve_saved_dataset(
            config=self.config, dataset=dataset
        )
        return dataset.with_retrieval_job(retrieval_job)

    @log_exceptions_and_usage
    def materialize_incremental(
        self, end_date: datetime, feature_views: Optional[List[str]] = None,