diff --git a/.gitignore b/.gitignore index afee5aa8ee..da912f6456 100644 --- a/.gitignore +++ b/.gitignore @@ -56,6 +56,7 @@ vendor __pycache__/ *.py[cod] *$py.class +*.prof # C extensions *.so diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index cdb8f4800b..662c86b1ae 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -22,7 +22,6 @@ GetFeatureSetsRequest, ApplyFeatureSetResponse, ) -from feast.types.Value_pb2 import ValueType from feast.serving.ServingService_pb2 import ( GetFeaturesRequest, GetFeastServingVersionRequest, @@ -30,18 +29,12 @@ ) from feast.feature_set import FeatureSet, Entity from feast.serving.ServingService_pb2_grpc import ServingServiceStub - from typing import List from collections import OrderedDict from typing import Dict -from datetime import datetime import os import pandas as pd -from feast.type_map import ( - pandas_value_to_proto_value, - dtype_to_feast_value_attr, - FEAST_VALUETYPE_TO_DTYPE, -) +from feast.type_map import pandas_value_to_proto_value, FEAST_VALUE_ATTR_TO_DTYPE GRPC_CONNECTION_TIMEOUT = 600 # type: int FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" # type: str @@ -206,7 +199,7 @@ def _apply_feature_set(self, feature_set: FeatureSet): "Error while trying to apply feature set " + feature_set.name ) applied_fs = FeatureSet.from_proto(apply_fs_response.feature_set) - feature_set.update_from_feature_set(applied_fs, is_dirty=False) + feature_set._update_from_feature_set(applied_fs, is_dirty=False) return def get( @@ -222,13 +215,11 @@ def get( entity_names = [] for column in entity_data.columns[1:]: - if column not in self.entities.keys(): - raise Exception("Entity " + column + " could not be found") entity_names.append(column) entity_dataset_rows = entity_data.apply( _convert_to_proto_value_fn(entity_data.dtypes), axis=1 - ).to_list() + ) feature_set_request = create_feature_set_request_from_feature_strings( feature_ids @@ -244,7 +235,6 @@ def get( ) # type: GetOnlineFeaturesResponse feature_dataframe = feature_data_sets_to_pandas_dataframe( - feature_sets=self._feature_sets, entity_data_set=entity_data.copy(), feature_data_sets=list(get_online_features_response_proto.feature_datasets), ) @@ -255,15 +245,15 @@ def _convert_to_proto_value_fn(dtypes: pd.core.generic.NDFrame): def convert_to_proto_value(row: pd.Series): entity_dataset_row = GetFeaturesRequest.EntityDatasetRow() for i in range(len(row) - 1): - proto_value = pandas_value_to_proto_value(dtypes[i + 1], row[i + 1]) - entity_dataset_row.value.append(proto_value) + entity_dataset_row.entity_ids.append( + pandas_value_to_proto_value(dtypes[i + 1], row[i + 1]) + ) return entity_dataset_row return convert_to_proto_value def feature_data_sets_to_pandas_dataframe( - feature_sets: List[FeatureSet], entity_data_set: pd.DataFrame, feature_data_sets: List[GetOnlineFeaturesResponse.FeatureDataset], ): @@ -280,9 +270,7 @@ def feature_data_sets_to_pandas_dataframe( # Convert to Pandas DataFrame feature_data_set_dataframes.append( - feature_data_set_to_pandas_dataframe( - feature_sets[feature_data_set.name], feature_data_set - ) + feature_data_set_to_pandas_dataframe(feature_data_set) ) # Join dataframes into a single feature dataframe @@ -301,29 +289,35 @@ def join_feature_set_dataframes( def feature_data_set_to_pandas_dataframe( - feature_set: FeatureSet, feature_data_set: GetOnlineFeaturesResponse.FeatureDataset + feature_data_set: GetOnlineFeaturesResponse.FeatureDataset ) -> pd.DataFrame: feature_set_name = feature_data_set.name dtypes = {} + value_attr = {} columns = [] - for field in feature_set.entities + feature_set.features: - field_proto = field.to_proto() - feature_id = feature_set_name + "." + field_proto.name - columns.append(feature_id) - feast_value_type = ValueType.Enum.Name(field_proto.value_type) - dtypes[feature_id] = FEAST_VALUETYPE_TO_DTYPE[feast_value_type] - - dataframe = pd.DataFrame(columns=columns).reset_index(drop=True).astype(dtypes) - - for featureRow in list(feature_data_set.feature_rows): - pandas_row = {} - for field in list(featureRow.fields): - if field.value.WhichOneof("val") is None: - feature_value = None + data = {} + first_run_done = False + + for featureRow in feature_data_set.feature_rows: + for field in featureRow.fields: + feature_id = feature_set_name + "." + field.name + + if not first_run_done: + columns.append(feature_id) + data[feature_id] = [] + value_attr[feature_id] = field.value.WhichOneof("val") + dtypes[feature_id] = FEAST_VALUE_ATTR_TO_DTYPE[value_attr[feature_id]] + + if not field.value.HasField(value_attr[feature_id]): + data[feature_id].append(None) else: - feature_value = getattr(field.value, field.value.WhichOneof("val")) - pandas_row[feature_set_name + "." + field.name] = feature_value - dataframe = dataframe.append(pandas_row, ignore_index=True) + data[feature_id].append(getattr(field.value, value_attr[feature_id])) + + first_run_done = True + + dataframe = ( + pd.DataFrame(columns=columns, data=data).reset_index(drop=True).astype(dtypes) + ) return dataframe @@ -337,7 +331,7 @@ def create_feature_set_request_from_feature_strings( if feature_set not in feature_set_request: feature_set_name, feature_set_version = feature_set.split(":") feature_set_request[feature_set] = GetFeaturesRequest.FeatureSet( - name=feature_set_name, version=feature_set_version + name=feature_set_name, version=int(feature_set_version) ) feature_set_request[feature_set].feature_names.append(feature) return list(feature_set_request.values()) diff --git a/sdk/python/feast/tests/feast_serving_server.py b/sdk/python/feast/tests/feast_serving_server.py index a303784922..7bb07be796 100644 --- a/sdk/python/feast/tests/feast_serving_server.py +++ b/sdk/python/feast/tests/feast_serving_server.py @@ -96,12 +96,6 @@ def GetFeastServingVersion(self, request, context): def GetOnlineFeatures(self, request: GetFeaturesRequest, context): - # for feature_set_request in list(request.featureSets): - # feature_data_set = self._store.get_feature_data( - # feature_set_request=feature_set_request, - # entity_data=request.entityDataSet, - # ) - response = GetOnlineFeaturesResponse( feature_data_sets=[ GetOnlineFeaturesResponse.FeatureDataSet( diff --git a/sdk/python/feast/tests/test_client.py b/sdk/python/feast/tests/test_client.py index 6dbaf18adb..b68488b9c6 100644 --- a/sdk/python/feast/tests/test_client.py +++ b/sdk/python/feast/tests/test_client.py @@ -11,28 +11,26 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import time import grpc import pandas as pd +import numpy as np import feast.core.CoreService_pb2_grpc as Core import feast.serving.ServingService_pb2_grpc as Serving -from feast.entity import Entity from feast.core.CoreService_pb2 import GetFeastCoreVersionResponse -from feast.core.CoreService_pb2 import GetFeatureSetsResponse -from feast.core.FeatureSet_pb2 import FeatureSetSpec -from feast.serving.ServingService_pb2 import GetFeastServingVersionResponse +from feast.serving.ServingService_pb2 import ( + GetFeastServingVersionResponse, + GetOnlineFeaturesResponse, +) +from google.protobuf.timestamp_pb2 import Timestamp import pytest from feast.client import Client -from feast.tests import dataframes from concurrent import futures from feast.tests.feast_core_server import CoreServicer from feast.tests.feast_serving_server import ServingServicer -from feast.feature import Feature -from feast.feature_set import FeatureSet -from feast.value_type import ValueType -from feast.source import KafkaSource -from feast.tests.fake_kafka import FakeKafka +from feast.types import FeatureRow_pb2 as FeatureRowProto +from feast.types import Field_pb2 as FieldProto +from feast.types import Value_pb2 as ValueProto CORE_URL = "core.feast.example.com" SERVING_URL = "serving.example.com" @@ -94,63 +92,56 @@ def test_version(self, mock_client, mocker): and status["serving"]["version"] == "0.3.0" ) - @pytest.mark.parametrize("dataframe", [dataframes.GOOD]) - def test_ingest_then_get_one_feature_set_success( - self, core_server, dataframe: pd.DataFrame - ): - # Create and register Fake Kafka - fake_kafka = FakeKafka() + def test_get_feature(self, mock_client, mocker): + ROW_COUNT = 300 - # Set up Feast Serving with Fake Kafka - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - Serving.add_ServingServiceServicer_to_server( - ServingServicer(kafka=fake_kafka), server + mock_client._serving_service_stub = Serving.ServingServiceStub( + grpc.insecure_channel("") ) - server.add_insecure_port("[::]:50052") - server.start() - # Create Feast client and register with Core and Serving - client = Client(core_url="localhost:50051", serving_url="localhost:50052") - - # Create feature set and update based on dataframe - feature_set_1 = FeatureSet( - name="feature_set_1", - entities=[Entity(name="entity_id", dtype=ValueType.INT32)], - features=[ - Feature(name="feature_1", dtype=ValueType.FLOAT), - Feature(name="feature_2", dtype=ValueType.STRING), - Feature(name="feature_3", dtype=ValueType.INT32), - ], + response = GetOnlineFeaturesResponse() + feature_row = FeatureRowProto.FeatureRow( + event_timestamp=Timestamp(), feature_set="feature_set_1:1" + ) + for feature_num in range(1, 10): + field = FieldProto.Field( + name="feature_" + str(feature_num), + value=ValueProto.Value(int64_val=feature_num), + ) + feature_row.fields.append(field) + + feature_data_set = GetOnlineFeaturesResponse.FeatureDataset( + name="feature_set_1", version=1 ) - # Register feature set with Feast core - client.apply(feature_set_1) + for row_number in range(1, ROW_COUNT + 1): + feature_data_set.feature_rows.append(feature_row) - # Register Fake Kafka with feature set - feature_set_1._message_producer = fake_kafka + response.feature_datasets.append(feature_data_set) - # Ingest data into Feast using Fake Kafka - feature_set_1.ingest(dataframe) + mocker.patch.object( + mock_client._serving_service_stub, + "GetOnlineFeatures", + return_value=response, + ) - time.sleep(2) + entity_data = pd.DataFrame( + {"datetime": np.repeat(4, ROW_COUNT), "entity_id": np.repeat(4, ROW_COUNT)} + ) - # Retrieve feature values from Feast serving - feature_dataframe = client.get( - entity_data=dataframe[["datetime", "entity_id"]], + feature_dataframe = mock_client.get( + entity_data=entity_data, feature_ids=[ - "feature_set_1.feature_1", - "feature_set_1.feature_2", - "feature_set_1.feature_3", + "feature_set_1:1.feature_1", + "feature_set_1:1.feature_2", + "feature_set_1:1.feature_3", + "feature_set_1:1.feature_4", + "feature_set_1:1.feature_5", + "feature_set_1:1.feature_6", + "feature_set_1:1.feature_7", + "feature_set_1:1.feature_8", + "feature_set_1:1.feature_9", ], ) - assert True - - # assert ( - # feature_dataframe["feature_set_1.feature_1"][0] == 0.2 - # and feature_dataframe["feature_set_1.feature_2"][0] == "string1" - # and feature_dataframe["feature_set_1.feature_3"][0] == 1 - # ) - - # Stop Feast Serving server - server.stop(0) + feature_dataframe.head() diff --git a/sdk/python/feast/tests/test_feature_set.py b/sdk/python/feast/tests/test_feature_set.py index b8062da4d4..fe43fa3648 100644 --- a/sdk/python/feast/tests/test_feature_set.py +++ b/sdk/python/feast/tests/test_feature_set.py @@ -12,9 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime - -import pytz from unittest.mock import MagicMock from feast.feature_set import FeatureSet, Feature from feast.value_type import ValueType @@ -109,7 +106,7 @@ def test_feature_set_ingest_success(self, dataframe, client): dataframe, column_mapping={"entity_id": Entity(name="entity", dtype=ValueType.INT64)}, ) - driver_fs.source = KafkaSource(topic="feature-topic", brokers="fake.broker.com") + driver_fs.source = KafkaSource(topic="feature-topic", brokers="127.0.0.1") driver_fs._message_producer = MagicMock() driver_fs._message_producer.send = MagicMock() diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index d2339ed75f..0854c680f5 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -21,7 +21,7 @@ # Mapping of feast value type to Pandas DataFrame dtypes # Integer and floating values are all 64-bit for better integration # with BigQuery data types -FEAST_VALUETYPE_TO_DTYPE = { +FEAST_VALUE_TYPE_TO_DTYPE = { "BYTES": np.byte, "STRING": np.object, "INT32": "Int32", # Use pandas nullable int type @@ -31,6 +31,16 @@ "BOOL": np.bool, } +FEAST_VALUE_ATTR_TO_DTYPE = { + "bytes_val": np.byte, + "string_val": np.object, + "int32_val": "Int32", + "int64_val": "Int64", + "double_val": np.float64, + "float_val": np.float64, + "bool_val": np.bool, +} + def dtype_to_feast_value_attr(dtype): # Mapping of Pandas dtype to attribute name in Feast Value @@ -104,7 +114,7 @@ def pandas_dtype_to_feast_value_type(dtype: pd.DataFrame.dtypes) -> ValueType: def pandas_value_to_proto_value(pandas_dtype, pandas_value) -> ProtoValue: value = ProtoValue() value_attr = dtype_to_feast_value_attr(pandas_dtype) - if pandas_dtype.__str__() in ["datetime64[ns]", "datetime64[ns, UTC]"]: + if pandas_dtype.__str__() in ["datetime64[ns]", "datetime64[ns, UTC]"]: pandas_value = int(pandas_value.timestamp()) try: value.__setattr__(value_attr, pandas_value)