Skip to content

Commit

Permalink
feat: Write logged features to an offline store (Python API) (#2574)
Browse files Browse the repository at this point in the history
* write logs to offline store

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* format

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fix after rebase

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fix tests

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* handle table not found in tests

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* some api docs

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fix import

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* use predefined schema in tests

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* address pr comments

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* more api docs

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* add proto attr to snowflake dest

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* add prefixes to system fields

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* add custom destination

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* move partition columns to destination config

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* after rebase

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* allow data source creator implementations w/o logging destination

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex authored Apr 26, 2022
1 parent 689d20b commit 134dc5f
Show file tree
Hide file tree
Showing 23 changed files with 936 additions and 44 deletions.
46 changes: 46 additions & 0 deletions protos/feast/core/FeatureService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "FeatureServiceProto";
option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "feast/core/FeatureViewProjection.proto";

Expand Down Expand Up @@ -35,6 +36,9 @@ message FeatureServiceSpec {

// Owner of the feature service.
string owner = 6;

// (optional) if provided logging will be enabled for this feature service.
LoggingConfig logging_config = 7;
}


Expand All @@ -46,3 +50,45 @@ message FeatureServiceMeta {
google.protobuf.Timestamp last_updated_timestamp = 2;

}


message LoggingConfig {
float sample_rate = 1;
google.protobuf.Duration partition_interval = 2;

oneof destination {
FileDestination file_destination = 3;
BigQueryDestination bigquery_destination = 4;
RedshiftDestination redshift_destination = 5;
SnowflakeDestination snowflake_destination = 6;
CustomDestination custom_destination = 7;
}

message FileDestination {
string path = 1;
string s3_endpoint_override = 2;

// column names to use for partitioning
repeated string partition_by = 3;
}

message BigQueryDestination {
// Full table reference in the form of [project:dataset.table]
string table_ref = 1;
}

message RedshiftDestination {
// Destination table name. ClusterId and database will be taken from an offline store config
string table_name = 1;
}

message SnowflakeDestination {
// Destination table name. Schema and database will be taken from an offline store config
string table_name = 1;
}

message CustomDestination {
string kind = 1;
map<string, string> config = 2;
}
}
164 changes: 164 additions & 0 deletions sdk/python/feast/feature_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import abc
from typing import TYPE_CHECKING, Dict, Optional, Type, cast

import pyarrow as pa
from pytz import UTC

from feast.data_source import DataSource
from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE, PA_TIMESTAMP_TYPE
from feast.errors import (
FeastObjectNotFoundException,
FeatureViewNotFoundException,
OnDemandFeatureViewNotFoundException,
)
from feast.protos.feast.core.FeatureService_pb2 import (
LoggingConfig as LoggingConfigProto,
)
from feast.types import from_value_type

if TYPE_CHECKING:
from feast import FeatureService
from feast.registry import Registry


REQUEST_ID_FIELD = "__request_id"
LOG_TIMESTAMP_FIELD = "__log_timestamp"
LOG_DATE_FIELD = "__log_date"


class LoggingSource:
"""
Logging source describes object that produces logs (eg, feature service produces logs of served features).
It should be able to provide schema of produced logs table and additional metadata that describes logs data.
"""

@abc.abstractmethod
def get_schema(self, registry: "Registry") -> pa.Schema:
""" Generate schema for logs destination. """
raise NotImplementedError

@abc.abstractmethod
def get_log_timestamp_column(self) -> str:
""" Return timestamp column that must exist in generated schema. """
raise NotImplementedError


class FeatureServiceLoggingSource(LoggingSource):
def __init__(self, feature_service: "FeatureService", project: str):
self._feature_service = feature_service
self._project = project

def get_schema(self, registry: "Registry") -> pa.Schema:
fields: Dict[str, pa.DataType] = {}

for projection in self._feature_service.feature_view_projections:
for feature in projection.features:
fields[
f"{projection.name_to_use()}__{feature.name}"
] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
fields[
f"{projection.name_to_use()}__{feature.name}__timestamp"
] = PA_TIMESTAMP_TYPE
fields[
f"{projection.name_to_use()}__{feature.name}__status"
] = pa.int32()

try:
feature_view = registry.get_feature_view(projection.name, self._project)
except FeatureViewNotFoundException:
try:
on_demand_feature_view = registry.get_on_demand_feature_view(
projection.name, self._project
)
except OnDemandFeatureViewNotFoundException:
raise FeastObjectNotFoundException(
f"Can't recognize feature view with a name {projection.name}"
)

for (
request_source
) in on_demand_feature_view.source_request_sources.values():
for field in request_source.schema:
fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype]

else:
for entity_name in feature_view.entities:
entity = registry.get_entity(entity_name, self._project)
join_key = projection.join_key_map.get(
entity.join_key, entity.join_key
)
fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[
from_value_type(entity.value_type)
]

# system columns
fields[REQUEST_ID_FIELD] = pa.string()
fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC)

return pa.schema(
[pa.field(name, data_type) for name, data_type in fields.items()]
)

def get_log_timestamp_column(self) -> str:
return LOG_TIMESTAMP_FIELD


class _DestinationRegistry(type):
classes_by_proto_attr_name: Dict[str, Type["LoggingDestination"]] = {}

def __new__(cls, name, bases, dct):
kls = type.__new__(cls, name, bases, dct)
if dct.get("_proto_attr_name"):
cls.classes_by_proto_attr_name[dct["_proto_attr_name"]] = kls
return kls


class LoggingDestination:
"""
Logging destination contains details about where exactly logs should be written inside an offline store.
It is implementation specific - each offline store must implement LoggingDestination subclass.
Kind of logging destination will be determined by matching attribute name in LoggingConfig protobuf message
and "_proto_kind" property of each subclass.
"""

_proto_kind: str

@classmethod
@abc.abstractmethod
def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
raise NotImplementedError

@abc.abstractmethod
def to_proto(self) -> LoggingConfigProto:
raise NotImplementedError

@abc.abstractmethod
def to_data_source(self) -> DataSource:
"""
Convert this object into a data source to read logs from an offline store.
"""
raise NotImplementedError


class LoggingConfig:
destination: LoggingDestination

def __init__(self, destination: LoggingDestination):
self.destination = destination

@classmethod
def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]:
proto_kind = cast(str, config_proto.WhichOneof("destination"))
if proto_kind is None:
return

if proto_kind == "custom_destination":
proto_kind = config_proto.custom_destination.kind

destination_class = _DestinationRegistry.classes_by_proto_attr_name[proto_kind]
return LoggingConfig(destination=destination_class.from_proto(config_proto))

def to_proto(self) -> LoggingConfigProto:
proto = self.destination.to_proto()
return proto
10 changes: 10 additions & 0 deletions sdk/python/feast/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from google.protobuf.json_format import MessageToJson

from feast.base_feature_view import BaseFeatureView
from feast.feature_logging import LoggingConfig
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.on_demand_feature_view import OnDemandFeatureView
Expand Down Expand Up @@ -44,6 +45,7 @@ class FeatureService:
owner: str
created_timestamp: Optional[datetime] = None
last_updated_timestamp: Optional[datetime] = None
logging_config: Optional[LoggingConfig] = None

@log_exceptions
def __init__(
Expand All @@ -54,6 +56,7 @@ def __init__(
tags: Dict[str, str] = None,
description: str = "",
owner: str = "",
logging_config: Optional[LoggingConfig] = None,
):
"""
Creates a FeatureService object.
Expand Down Expand Up @@ -106,6 +109,7 @@ def __init__(
self.owner = owner
self.created_timestamp = None
self.last_updated_timestamp = None
self.logging_config = logging_config

def __repr__(self):
items = (f"{k} = {v}" for k, v in self.__dict__.items())
Expand Down Expand Up @@ -152,6 +156,9 @@ def from_proto(cls, feature_service_proto: FeatureServiceProto):
tags=dict(feature_service_proto.spec.tags),
description=feature_service_proto.spec.description,
owner=feature_service_proto.spec.owner,
logging_config=LoggingConfig.from_proto(
feature_service_proto.spec.logging_config
),
)
fs.feature_view_projections.extend(
[
Expand Down Expand Up @@ -192,6 +199,9 @@ def to_proto(self) -> FeatureServiceProto:
tags=self.tags,
description=self.description,
owner=self.owner,
logging_config=self.logging_config.to_proto()
if self.logging_config
else None,
)

return FeatureServiceProto(spec=spec, meta=meta)
Expand Down
20 changes: 20 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)

import pandas as pd
import pyarrow as pa
from colorama import Fore, Style
from google.protobuf.timestamp_pb2 import Timestamp
from tqdm import tqdm
Expand Down Expand Up @@ -1976,6 +1977,25 @@ def serve_transformations(self, port: int) -> None:
def _teardown_go_server(self):
self._go_server = None

def write_logged_features(self, logs: pa.Table, source: Union[FeatureService]):
"""
Write logs produced by a source (currently only feature service is supported as a source)
to an offline store.
"""
if not isinstance(source, FeatureService):
raise ValueError("Only feature service is currently supported as a source")

assert (
source.logging_config is not None
), "Feature service must be configured with logging config in order to use this functionality"

self._get_provider().write_feature_service_logs(
feature_service=source,
logs=logs,
config=self.config,
registry=self._registry,
)


def _validate_entity_values(join_key_values: Dict[str, List[Value]]):
set_of_row_lengths = {len(v) for v in join_key_values.values()}
Expand Down
Loading

0 comments on commit 134dc5f

Please sign in to comment.