From 3cd967807dc29d513b5637cfb9baa52cb2eeba93 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Thu, 16 Dec 2021 13:04:18 -0800 Subject: [PATCH] Add DatastoreTable infra object (#2140) * Add DatastoreTable infra object Signed-off-by: Felix Wang * Switch to StringValue Signed-off-by: Felix Wang * Initialize Datastore client in __init__ Signed-off-by: Felix Wang --- protos/feast/core/DatastoreTable.proto | 39 ++++++ protos/feast/core/InfraObject.proto | 2 + .../feast/infra/online_stores/datastore.py | 119 +++++++++++++++--- 3 files changed, 143 insertions(+), 17 deletions(-) create mode 100644 protos/feast/core/DatastoreTable.proto diff --git a/protos/feast/core/DatastoreTable.proto b/protos/feast/core/DatastoreTable.proto new file mode 100644 index 0000000000..15720ad809 --- /dev/null +++ b/protos/feast/core/DatastoreTable.proto @@ -0,0 +1,39 @@ +// +// * Copyright 2021 The Feast Authors +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * https://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// + +syntax = "proto3"; + +package feast.core; +option java_package = "feast.proto.core"; +option java_outer_classname = "DatastoreTableProto"; +option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; + +import "google/protobuf/wrappers.proto"; + +// Represents a Datastore table +message DatastoreTable { + // Feast project of the table + string project = 1; + + // Name of the table + string name = 2; + + // GCP project id + google.protobuf.StringValue project_id = 3; + + // Datastore namespace + google.protobuf.StringValue namespace = 4; +} \ No newline at end of file diff --git a/protos/feast/core/InfraObject.proto b/protos/feast/core/InfraObject.proto index ded4c3ed68..a0f3541dec 100644 --- a/protos/feast/core/InfraObject.proto +++ b/protos/feast/core/InfraObject.proto @@ -22,6 +22,7 @@ option java_outer_classname = "InfraObjectProto"; option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; import "feast/core/DynamoDBTable.proto"; +import "feast/core/DatastoreTable.proto"; // Represents a set of infrastructure objects managed by Feast message Infra { @@ -37,6 +38,7 @@ message InfraObject { // The infrastructure object oneof infra_object { DynamoDBTable dynamodb_table = 2; + DatastoreTable datastore_table = 3; CustomInfra custom_infra = 100; } diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index a9bd534a50..0442eda122 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -21,8 +21,13 @@ from feast import Entity, utils from feast.feature_view import FeatureView +from feast.infra.infra_object import InfraObject from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.core.DatastoreTable_pb2 import ( + DatastoreTable as DatastoreTableProto, +) +from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -80,8 +85,6 @@ def update( entities_to_keep: Sequence[Entity], partial: bool, ): - """ - """ online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) client = self._get_client(online_config) @@ -110,9 +113,6 @@ def teardown( tables: Sequence[FeatureView], entities: Sequence[Entity], ): - """ - There's currently no teardown done for Datastore. - """ online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) client = self._get_client(online_config) @@ -128,18 +128,10 @@ def teardown( client.delete(key) def _get_client(self, online_config: DatastoreOnlineStoreConfig): - if not self._client: - try: - self._client = datastore.Client( - project=online_config.project_id, namespace=online_config.namespace, - ) - except DefaultCredentialsError as e: - raise FeastProviderLoginError( - str(e) - + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' - "local Google Cloud account " - ) + self._client = _initialize_client( + online_config.project_id, online_config.namespace + ) return self._client @log_exceptions_and_usage(online_store="datastore") @@ -267,7 +259,7 @@ def online_read( return result -def _delete_all_values(client, key) -> None: +def _delete_all_values(client, key): """ Delete all data under the key path in datastore. """ @@ -279,3 +271,96 @@ def _delete_all_values(client, key) -> None: for entity in entities: client.delete(entity.key) + + +def _initialize_client( + project_id: Optional[str], namespace: Optional[str] +) -> datastore.Client: + try: + client = datastore.Client(project=project_id, namespace=namespace,) + return client + except DefaultCredentialsError as e: + raise FeastProviderLoginError( + str(e) + + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' + "local Google Cloud account " + ) + + +class DatastoreTable(InfraObject): + """ + A Datastore table managed by Feast. + + Attributes: + project: The Feast project of the table. + name: The name of the table. + project_id (optional): The GCP project id. + namespace (optional): Datastore namespace. + client: Datastore client. + """ + + project: str + name: str + project_id: Optional[str] + namespace: Optional[str] + client: datastore.Client + + def __init__( + self, + project: str, + name: str, + project_id: Optional[str] = None, + namespace: Optional[str] = None, + ): + self.project = project + self.name = name + self.project_id = project_id + self.namespace = namespace + self.client = _initialize_client(self.project_id, self.namespace) + + def to_proto(self) -> InfraObjectProto: + datastore_table_proto = DatastoreTableProto() + datastore_table_proto.project = self.project + datastore_table_proto.name = self.name + if self.project_id: + datastore_table_proto.project_id.FromString(bytes(self.project_id, "utf-8")) + if self.namespace: + datastore_table_proto.namespace.FromString(bytes(self.namespace, "utf-8")) + + return InfraObjectProto( + infra_object_class_type="feast.infra.online_stores.datastore.DatastoreTable", + datastore_table=datastore_table_proto, + ) + + @staticmethod + def from_proto(infra_object_proto: InfraObjectProto) -> Any: + datastore_table = DatastoreTable( + project=infra_object_proto.datastore_table.project, + name=infra_object_proto.datastore_table.name, + ) + + if infra_object_proto.datastore_table.HasField("project_id"): + datastore_table.project_id = ( + infra_object_proto.datastore_table.project_id.SerializeToString() + ).decode("utf-8") + if infra_object_proto.datastore_table.HasField("namespace"): + datastore_table.namespace = ( + infra_object_proto.datastore_table.namespace.SerializeToString() + ).decode("utf-8") + + return datastore_table + + def update(self): + key = self.client.key("Project", self.project, "Table", self.name) + entity = datastore.Entity( + key=key, exclude_from_indexes=("created_ts", "event_ts", "values") + ) + entity.update({"created_ts": datetime.utcnow()}) + self.client.put(entity) + + def teardown(self): + key = self.client.key("Project", self.project, "Table", self.name) + _delete_all_values(self.client, key) + + # Delete the table metadata datastore entity + self.client.delete(key)