Skip to content

Commit

Permalink
Optimize retrieval performance
Browse files Browse the repository at this point in the history
  • Loading branch information
woop committed Sep 26, 2019
1 parent 5f1cd69 commit bfc3403
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 108 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ vendor
__pycache__/
*.py[cod]
*$py.class
*.prof

# C extensions
*.so
Expand Down
70 changes: 32 additions & 38 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,19 @@
GetFeatureSetsRequest,
ApplyFeatureSetResponse,
)
from feast.types.Value_pb2 import ValueType
from feast.serving.ServingService_pb2 import (
GetFeaturesRequest,
GetFeastServingVersionRequest,
GetOnlineFeaturesResponse,
)
from feast.feature_set import FeatureSet, Entity
from feast.serving.ServingService_pb2_grpc import ServingServiceStub

from typing import List
from collections import OrderedDict
from typing import Dict
from datetime import datetime
import os
import pandas as pd
from feast.type_map import (
pandas_value_to_proto_value,
dtype_to_feast_value_attr,
FEAST_VALUETYPE_TO_DTYPE,
)
from feast.type_map import pandas_value_to_proto_value, FEAST_VALUE_ATTR_TO_DTYPE

# Timeout in seconds for gRPC calls — NOTE(review): exact usage (per-call
# deadline vs. channel setup) is not visible in this hunk; confirm at call sites.
GRPC_CONNECTION_TIMEOUT = 600  # type: int
# Environment variable consulted for the Feast Serving URL when none is
# passed explicitly to the client.
FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL"  # type: str
Expand Down Expand Up @@ -206,7 +199,7 @@ def _apply_feature_set(self, feature_set: FeatureSet):
"Error while trying to apply feature set " + feature_set.name
)
applied_fs = FeatureSet.from_proto(apply_fs_response.feature_set)
feature_set.update_from_feature_set(applied_fs, is_dirty=False)
feature_set._update_from_feature_set(applied_fs, is_dirty=False)
return

def get(
Expand All @@ -222,13 +215,11 @@ def get(

entity_names = []
for column in entity_data.columns[1:]:
if column not in self.entities.keys():
raise Exception("Entity " + column + " could not be found")
entity_names.append(column)

entity_dataset_rows = entity_data.apply(
_convert_to_proto_value_fn(entity_data.dtypes), axis=1
).to_list()
)

feature_set_request = create_feature_set_request_from_feature_strings(
feature_ids
Expand All @@ -244,7 +235,6 @@ def get(
) # type: GetOnlineFeaturesResponse

feature_dataframe = feature_data_sets_to_pandas_dataframe(
feature_sets=self._feature_sets,
entity_data_set=entity_data.copy(),
feature_data_sets=list(get_online_features_response_proto.feature_datasets),
)
Expand All @@ -255,15 +245,15 @@ def _convert_to_proto_value_fn(dtypes: pd.core.generic.NDFrame):
def convert_to_proto_value(row: pd.Series):
entity_dataset_row = GetFeaturesRequest.EntityDatasetRow()
for i in range(len(row) - 1):
proto_value = pandas_value_to_proto_value(dtypes[i + 1], row[i + 1])
entity_dataset_row.value.append(proto_value)
entity_dataset_row.entity_ids.append(
pandas_value_to_proto_value(dtypes[i + 1], row[i + 1])
)
return entity_dataset_row

return convert_to_proto_value


def feature_data_sets_to_pandas_dataframe(
feature_sets: List[FeatureSet],
entity_data_set: pd.DataFrame,
feature_data_sets: List[GetOnlineFeaturesResponse.FeatureDataset],
):
Expand All @@ -280,9 +270,7 @@ def feature_data_sets_to_pandas_dataframe(

# Convert to Pandas DataFrame
feature_data_set_dataframes.append(
feature_data_set_to_pandas_dataframe(
feature_sets[feature_data_set.name], feature_data_set
)
feature_data_set_to_pandas_dataframe(feature_data_set)
)

# Join dataframes into a single feature dataframe
Expand All @@ -301,29 +289,35 @@ def join_feature_set_dataframes(


def feature_data_set_to_pandas_dataframe(
feature_set: FeatureSet, feature_data_set: GetOnlineFeaturesResponse.FeatureDataset
feature_data_set: GetOnlineFeaturesResponse.FeatureDataset
) -> pd.DataFrame:
feature_set_name = feature_data_set.name
dtypes = {}
value_attr = {}
columns = []
for field in feature_set.entities + feature_set.features:
field_proto = field.to_proto()
feature_id = feature_set_name + "." + field_proto.name
columns.append(feature_id)
feast_value_type = ValueType.Enum.Name(field_proto.value_type)
dtypes[feature_id] = FEAST_VALUETYPE_TO_DTYPE[feast_value_type]

dataframe = pd.DataFrame(columns=columns).reset_index(drop=True).astype(dtypes)

for featureRow in list(feature_data_set.feature_rows):
pandas_row = {}
for field in list(featureRow.fields):
if field.value.WhichOneof("val") is None:
feature_value = None
data = {}
first_run_done = False

for featureRow in feature_data_set.feature_rows:
for field in featureRow.fields:
feature_id = feature_set_name + "." + field.name

if not first_run_done:
columns.append(feature_id)
data[feature_id] = []
value_attr[feature_id] = field.value.WhichOneof("val")
dtypes[feature_id] = FEAST_VALUE_ATTR_TO_DTYPE[value_attr[feature_id]]

if not field.value.HasField(value_attr[feature_id]):
data[feature_id].append(None)
else:
feature_value = getattr(field.value, field.value.WhichOneof("val"))
pandas_row[feature_set_name + "." + field.name] = feature_value
dataframe = dataframe.append(pandas_row, ignore_index=True)
data[feature_id].append(getattr(field.value, value_attr[feature_id]))

first_run_done = True

dataframe = (
pd.DataFrame(columns=columns, data=data).reset_index(drop=True).astype(dtypes)
)

return dataframe

Expand All @@ -337,7 +331,7 @@ def create_feature_set_request_from_feature_strings(
if feature_set not in feature_set_request:
feature_set_name, feature_set_version = feature_set.split(":")
feature_set_request[feature_set] = GetFeaturesRequest.FeatureSet(
name=feature_set_name, version=feature_set_version
name=feature_set_name, version=int(feature_set_version)
)
feature_set_request[feature_set].feature_names.append(feature)
return list(feature_set_request.values())
6 changes: 0 additions & 6 deletions sdk/python/feast/tests/feast_serving_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ def GetFeastServingVersion(self, request, context):

def GetOnlineFeatures(self, request: GetFeaturesRequest, context):

# for feature_set_request in list(request.featureSets):
# feature_data_set = self._store.get_feature_data(
# feature_set_request=feature_set_request,
# entity_data=request.entityDataSet,
# )

response = GetOnlineFeaturesResponse(
feature_data_sets=[
GetOnlineFeaturesResponse.FeatureDataSet(
Expand Down
107 changes: 49 additions & 58 deletions sdk/python/feast/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,26 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time

import grpc
import pandas as pd
import numpy as np
import feast.core.CoreService_pb2_grpc as Core
import feast.serving.ServingService_pb2_grpc as Serving
from feast.entity import Entity
from feast.core.CoreService_pb2 import GetFeastCoreVersionResponse
from feast.core.CoreService_pb2 import GetFeatureSetsResponse
from feast.core.FeatureSet_pb2 import FeatureSetSpec
from feast.serving.ServingService_pb2 import GetFeastServingVersionResponse
from feast.serving.ServingService_pb2 import (
GetFeastServingVersionResponse,
GetOnlineFeaturesResponse,
)
from google.protobuf.timestamp_pb2 import Timestamp
import pytest
from feast.client import Client
from feast.tests import dataframes
from concurrent import futures
from feast.tests.feast_core_server import CoreServicer
from feast.tests.feast_serving_server import ServingServicer
from feast.feature import Feature
from feast.feature_set import FeatureSet
from feast.value_type import ValueType
from feast.source import KafkaSource
from feast.tests.fake_kafka import FakeKafka
from feast.types import FeatureRow_pb2 as FeatureRowProto
from feast.types import Field_pb2 as FieldProto
from feast.types import Value_pb2 as ValueProto

CORE_URL = "core.feast.example.com"
SERVING_URL = "serving.example.com"
Expand Down Expand Up @@ -94,63 +92,56 @@ def test_version(self, mock_client, mocker):
and status["serving"]["version"] == "0.3.0"
)

@pytest.mark.parametrize("dataframe", [dataframes.GOOD])
def test_ingest_then_get_one_feature_set_success(
self, core_server, dataframe: pd.DataFrame
):
# Create and register Fake Kafka
fake_kafka = FakeKafka()
def test_get_feature(self, mock_client, mocker):
ROW_COUNT = 300

# Set up Feast Serving with Fake Kafka
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Serving.add_ServingServiceServicer_to_server(
ServingServicer(kafka=fake_kafka), server
mock_client._serving_service_stub = Serving.ServingServiceStub(
grpc.insecure_channel("")
)
server.add_insecure_port("[::]:50052")
server.start()

# Create Feast client and register with Core and Serving
client = Client(core_url="localhost:50051", serving_url="localhost:50052")

# Create feature set and update based on dataframe
feature_set_1 = FeatureSet(
name="feature_set_1",
entities=[Entity(name="entity_id", dtype=ValueType.INT32)],
features=[
Feature(name="feature_1", dtype=ValueType.FLOAT),
Feature(name="feature_2", dtype=ValueType.STRING),
Feature(name="feature_3", dtype=ValueType.INT32),
],
response = GetOnlineFeaturesResponse()
feature_row = FeatureRowProto.FeatureRow(
event_timestamp=Timestamp(), feature_set="feature_set_1:1"
)
for feature_num in range(1, 10):
field = FieldProto.Field(
name="feature_" + str(feature_num),
value=ValueProto.Value(int64_val=feature_num),
)
feature_row.fields.append(field)

feature_data_set = GetOnlineFeaturesResponse.FeatureDataset(
name="feature_set_1", version=1
)

# Register feature set with Feast core
client.apply(feature_set_1)
for row_number in range(1, ROW_COUNT + 1):
feature_data_set.feature_rows.append(feature_row)

# Register Fake Kafka with feature set
feature_set_1._message_producer = fake_kafka
response.feature_datasets.append(feature_data_set)

# Ingest data into Feast using Fake Kafka
feature_set_1.ingest(dataframe)
mocker.patch.object(
mock_client._serving_service_stub,
"GetOnlineFeatures",
return_value=response,
)

time.sleep(2)
entity_data = pd.DataFrame(
{"datetime": np.repeat(4, ROW_COUNT), "entity_id": np.repeat(4, ROW_COUNT)}
)

# Retrieve feature values from Feast serving
feature_dataframe = client.get(
entity_data=dataframe[["datetime", "entity_id"]],
feature_dataframe = mock_client.get(
entity_data=entity_data,
feature_ids=[
"feature_set_1.feature_1",
"feature_set_1.feature_2",
"feature_set_1.feature_3",
"feature_set_1:1.feature_1",
"feature_set_1:1.feature_2",
"feature_set_1:1.feature_3",
"feature_set_1:1.feature_4",
"feature_set_1:1.feature_5",
"feature_set_1:1.feature_6",
"feature_set_1:1.feature_7",
"feature_set_1:1.feature_8",
"feature_set_1:1.feature_9",
],
)

assert True

# assert (
# feature_dataframe["feature_set_1.feature_1"][0] == 0.2
# and feature_dataframe["feature_set_1.feature_2"][0] == "string1"
# and feature_dataframe["feature_set_1.feature_3"][0] == 1
# )

# Stop Feast Serving server
server.stop(0)
feature_dataframe.head()
5 changes: 1 addition & 4 deletions sdk/python/feast/tests/test_feature_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime

import pytz
from unittest.mock import MagicMock
from feast.feature_set import FeatureSet, Feature
from feast.value_type import ValueType
Expand Down Expand Up @@ -109,7 +106,7 @@ def test_feature_set_ingest_success(self, dataframe, client):
dataframe,
column_mapping={"entity_id": Entity(name="entity", dtype=ValueType.INT64)},
)
driver_fs.source = KafkaSource(topic="feature-topic", brokers="fake.broker.com")
driver_fs.source = KafkaSource(topic="feature-topic", brokers="127.0.0.1")
driver_fs._message_producer = MagicMock()
driver_fs._message_producer.send = MagicMock()

Expand Down
14 changes: 12 additions & 2 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
# Mapping of feast value type to Pandas DataFrame dtypes
# Integer and floating values are all 64-bit for better integration
# with BigQuery data types
FEAST_VALUETYPE_TO_DTYPE = {
FEAST_VALUE_TYPE_TO_DTYPE = {
"BYTES": np.byte,
"STRING": np.object,
"INT32": "Int32", # Use pandas nullable int type
Expand All @@ -31,6 +31,16 @@
"BOOL": np.bool,
}

# Mapping from the Feast proto Value "val" oneof attribute name to the Pandas
# dtype used for the corresponding DataFrame column. Integer columns use the
# pandas nullable extension dtypes ("Int32"/"Int64") so missing feature values
# can be represented as NA without forcing a float upcast.
#
# Fix: np.object and np.bool were merely deprecated aliases for the builtins
# (identical objects); they were removed in NumPy 1.24 and now raise
# AttributeError at import time, so use the builtins directly.
FEAST_VALUE_ATTR_TO_DTYPE = {
    "bytes_val": np.byte,
    "string_val": object,
    "int32_val": "Int32",
    "int64_val": "Int64",
    "double_val": np.float64,
    "float_val": np.float64,
    "bool_val": bool,
}


def dtype_to_feast_value_attr(dtype):
# Mapping of Pandas dtype to attribute name in Feast Value
Expand Down Expand Up @@ -104,7 +114,7 @@ def pandas_dtype_to_feast_value_type(dtype: pd.DataFrame.dtypes) -> ValueType:
def pandas_value_to_proto_value(pandas_dtype, pandas_value) -> ProtoValue:
value = ProtoValue()
value_attr = dtype_to_feast_value_attr(pandas_dtype)
if pandas_dtype.__str__() in ["datetime64[ns]", "datetime64[ns, UTC]"]:
if pandas_dtype.__str__() in ["datetime64[ns]", "datetime64[ns, UTC]"]:
pandas_value = int(pandas_value.timestamp())
try:
value.__setattr__(value_attr, pandas_value)
Expand Down

0 comments on commit bfc3403

Please sign in to comment.