-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Stream Feature View FCOS (#2750)
* Fix working version Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Working commit Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fixes Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix stuffs Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix things Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lihnt Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix stuff Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix unit tests Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Address review comments Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix lint Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fix Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Fixed Signed-off-by: Kevin Zhang <kzhang@tecton.ai> * Unsaved changes Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
- Loading branch information
Showing
18 changed files
with
761 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
syntax = "proto3"; | ||
package feast.core; | ||
|
||
option go_package = "github.com/feast-dev/feast/go/protos/feast/core"; | ||
option java_outer_classname = "AggregationProto"; | ||
option java_package = "feast.proto.core"; | ||
|
||
import "google/protobuf/duration.proto"; | ||
|
||
message Aggregation { | ||
string column = 1; | ||
string function = 2; | ||
google.protobuf.Duration time_window = 3; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
// | ||
// Copyright 2020 The Feast Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// | ||
|
||
|
||
syntax = "proto3"; | ||
package feast.core; | ||
|
||
option go_package = "github.com/feast-dev/feast/go/protos/feast/core"; | ||
option java_outer_classname = "StreamFeatureViewProto"; | ||
option java_package = "feast.proto.core"; | ||
|
||
|
||
import "google/protobuf/duration.proto"; | ||
import "google/protobuf/timestamp.proto"; | ||
import "feast/core/OnDemandFeatureView.proto"; | ||
import "feast/core/Feature.proto"; | ||
import "feast/core/DataSource.proto"; | ||
import "feast/core/Aggregation.proto"; | ||
|
||
message StreamFeatureView { | ||
// User-specified specifications of this feature view. | ||
StreamFeatureViewSpec spec = 1; | ||
StreamFeatureViewMeta meta = 2; | ||
} | ||
|
||
// Next available id: 17 | ||
message StreamFeatureViewSpec { | ||
// Name of the feature view. Must be unique. Not updated. | ||
string name = 1; | ||
|
||
// Name of Feast project that this feature view belongs to. | ||
string project = 2; | ||
|
||
// List of names of entities associated with this feature view. | ||
repeated string entities = 3; | ||
|
||
// List of specifications for each feature defined as part of this feature view. | ||
repeated FeatureSpecV2 features = 4; | ||
|
||
// List of specifications for each entity defined as part of this feature view. | ||
repeated FeatureSpecV2 entity_columns = 5; | ||
|
||
// Description of the feature view. | ||
string description = 6; | ||
|
||
// User defined metadata | ||
map<string,string> tags = 7; | ||
|
||
// Owner of the feature view. | ||
string owner = 8; | ||
|
||
// Features in this feature view can only be retrieved from online serving | ||
// younger than ttl. Ttl is measured as the duration of time between | ||
// the feature's event timestamp and when the feature is retrieved | ||
// Feature values outside ttl will be returned as unset values and indicated to end user | ||
google.protobuf.Duration ttl = 9; | ||
|
||
// Batch/Offline DataSource where this view can retrieve offline feature data. | ||
DataSource batch_source = 10; | ||
// Streaming DataSource from where this view can consume "online" feature data. | ||
DataSource stream_source = 11; | ||
|
||
// Whether these features should be served online or not | ||
bool online = 12; | ||
|
||
// Serialized function that is encoded in the streamfeatureview | ||
UserDefinedFunction user_defined_function = 13; | ||
|
||
// Mode of execution | ||
string mode = 14; | ||
|
||
// Aggregation definitions | ||
repeated Aggregation aggregations = 15; | ||
|
||
// Timestamp field for aggregation | ||
string timestamp_field = 16; | ||
} | ||
|
||
message StreamFeatureViewMeta { | ||
// Time where this Feature View is created | ||
google.protobuf.Timestamp created_timestamp = 1; | ||
|
||
// Time where this Feature View is last updated | ||
google.protobuf.Timestamp last_updated_timestamp = 2; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
from datetime import timedelta | ||
from typing import Optional | ||
|
||
from google.protobuf.duration_pb2 import Duration | ||
|
||
from feast.protos.feast.core.Aggregation_pb2 import Aggregation as AggregationProto | ||
|
||
|
||
class Aggregation: | ||
""" | ||
NOTE: Feast-handled aggregations are not yet supported. This class provides a way to register user-defined aggregations. | ||
Attributes: | ||
column: str # Column name of the feature we are aggregating. | ||
function: str # Provided built in aggregations sum, max, min, count mean | ||
time_window: timedelta # The time window for this aggregation. | ||
""" | ||
|
||
column: str | ||
function: str | ||
time_window: Optional[timedelta] | ||
|
||
def __init__( | ||
self, | ||
column: Optional[str] = "", | ||
function: Optional[str] = "", | ||
time_window: Optional[timedelta] = None, | ||
): | ||
self.column = column or "" | ||
self.function = function or "" | ||
self.time_window = time_window | ||
|
||
def to_proto(self) -> AggregationProto: | ||
window_duration = None | ||
if self.time_window is not None: | ||
window_duration = Duration() | ||
window_duration.FromTimedelta(self.time_window) | ||
|
||
return AggregationProto( | ||
column=self.column, function=self.function, time_window=window_duration | ||
) | ||
|
||
@classmethod | ||
def from_proto(cls, agg_proto: AggregationProto): | ||
time_window = ( | ||
timedelta(days=0) | ||
if agg_proto.time_window.ToNanoseconds() == 0 | ||
else agg_proto.time_window.ToTimedelta() | ||
) | ||
|
||
aggregation = cls( | ||
column=agg_proto.column, | ||
function=agg_proto.function, | ||
time_window=time_window, | ||
) | ||
return aggregation | ||
|
||
def __eq__(self, other): | ||
if not isinstance(other, Aggregation): | ||
raise TypeError("Comparisons should only involve Aggregations.") | ||
|
||
if ( | ||
self.column != other.column | ||
or self.function != other.function | ||
or self.time_window != other.time_window | ||
): | ||
return False | ||
|
||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.