Skip to content

Commit

Permalink
Add DatastoreTable infra object (feast-dev#2140)
Browse files Browse the repository at this point in the history
* Add DatastoreTable infra object

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Switch to StringValue

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Initialize Datastore client in __init__

Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 authored Dec 16, 2021
1 parent 107eddc commit 3cd9678
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 17 deletions.
39 changes: 39 additions & 0 deletions protos/feast/core/DatastoreTable.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// * Copyright 2021 The Feast Authors
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * https://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
//

syntax = "proto3";

package feast.core;
option java_package = "feast.proto.core";
option java_outer_classname = "DatastoreTableProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "google/protobuf/wrappers.proto";

// Represents a Datastore table
message DatastoreTable {
// Feast project of the table
string project = 1;

// Name of the table
string name = 2;

// GCP project id
google.protobuf.StringValue project_id = 3;

// Datastore namespace
google.protobuf.StringValue namespace = 4;
}
2 changes: 2 additions & 0 deletions protos/feast/core/InfraObject.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ option java_outer_classname = "InfraObjectProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "feast/core/DynamoDBTable.proto";
import "feast/core/DatastoreTable.proto";

// Represents a set of infrastructure objects managed by Feast
message Infra {
Expand All @@ -37,6 +38,7 @@ message InfraObject {
// The infrastructure object
oneof infra_object {
DynamoDBTable dynamodb_table = 2;
DatastoreTable datastore_table = 3;
CustomInfra custom_infra = 100;
}

Expand Down
119 changes: 102 additions & 17 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@

from feast import Entity, utils
from feast.feature_view import FeatureView
from feast.infra.infra_object import InfraObject
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.DatastoreTable_pb2 import (
DatastoreTable as DatastoreTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
Expand Down Expand Up @@ -80,8 +85,6 @@ def update(
entities_to_keep: Sequence[Entity],
partial: bool,
):
"""
"""
online_config = config.online_store
assert isinstance(online_config, DatastoreOnlineStoreConfig)
client = self._get_client(online_config)
Expand Down Expand Up @@ -110,9 +113,6 @@ def teardown(
tables: Sequence[FeatureView],
entities: Sequence[Entity],
):
"""
There's currently no teardown done for Datastore.
"""
online_config = config.online_store
assert isinstance(online_config, DatastoreOnlineStoreConfig)
client = self._get_client(online_config)
Expand All @@ -128,18 +128,10 @@ def teardown(
client.delete(key)

def _get_client(self, online_config: DatastoreOnlineStoreConfig):

if not self._client:
try:
self._client = datastore.Client(
project=online_config.project_id, namespace=online_config.namespace,
)
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
str(e)
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your '
"local Google Cloud account "
)
self._client = _initialize_client(
online_config.project_id, online_config.namespace
)
return self._client

@log_exceptions_and_usage(online_store="datastore")
Expand Down Expand Up @@ -267,7 +259,7 @@ def online_read(
return result


def _delete_all_values(client, key) -> None:
def _delete_all_values(client, key):
"""
Delete all data under the key path in datastore.
"""
Expand All @@ -279,3 +271,96 @@ def _delete_all_values(client, key) -> None:

for entity in entities:
client.delete(entity.key)


def _initialize_client(
project_id: Optional[str], namespace: Optional[str]
) -> datastore.Client:
try:
client = datastore.Client(project=project_id, namespace=namespace,)
return client
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
str(e)
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your '
"local Google Cloud account "
)


class DatastoreTable(InfraObject):
"""
A Datastore table managed by Feast.
Attributes:
project: The Feast project of the table.
name: The name of the table.
project_id (optional): The GCP project id.
namespace (optional): Datastore namespace.
client: Datastore client.
"""

project: str
name: str
project_id: Optional[str]
namespace: Optional[str]
client: datastore.Client

def __init__(
self,
project: str,
name: str,
project_id: Optional[str] = None,
namespace: Optional[str] = None,
):
self.project = project
self.name = name
self.project_id = project_id
self.namespace = namespace
self.client = _initialize_client(self.project_id, self.namespace)

def to_proto(self) -> InfraObjectProto:
datastore_table_proto = DatastoreTableProto()
datastore_table_proto.project = self.project
datastore_table_proto.name = self.name
if self.project_id:
datastore_table_proto.project_id.FromString(bytes(self.project_id, "utf-8"))
if self.namespace:
datastore_table_proto.namespace.FromString(bytes(self.namespace, "utf-8"))

return InfraObjectProto(
infra_object_class_type="feast.infra.online_stores.datastore.DatastoreTable",
datastore_table=datastore_table_proto,
)

@staticmethod
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
datastore_table = DatastoreTable(
project=infra_object_proto.datastore_table.project,
name=infra_object_proto.datastore_table.name,
)

if infra_object_proto.datastore_table.HasField("project_id"):
datastore_table.project_id = (
infra_object_proto.datastore_table.project_id.SerializeToString()
).decode("utf-8")
if infra_object_proto.datastore_table.HasField("namespace"):
datastore_table.namespace = (
infra_object_proto.datastore_table.namespace.SerializeToString()
).decode("utf-8")

return datastore_table

def update(self):
key = self.client.key("Project", self.project, "Table", self.name)
entity = datastore.Entity(
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
)
entity.update({"created_ts": datetime.utcnow()})
self.client.put(entity)

def teardown(self):
key = self.client.key("Project", self.project, "Table", self.name)
_delete_all_values(self.client, key)

# Delete the table metadata datastore entity
self.client.delete(key)

0 comments on commit 3cd9678

Please sign in to comment.