Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add DQM Logging on GRPC Server with FileLogStorage for Testing #2403

Merged
merged 97 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
65880a9
Make a proof of concept
kevjumba Mar 9, 2022
b32b157
Update
kevjumba Mar 11, 2022
1996fd9
revert feature store
kevjumba Mar 17, 2022
c6656d4
refactor
kevjumba Mar 17, 2022
17c8168
Add time
kevjumba Mar 17, 2022
a4efbce
Add time
kevjumba Mar 17, 2022
79d8ba7
clean up
kevjumba Mar 17, 2022
cd652bc
Add comment
kevjumba Mar 17, 2022
4a13e8e
Add pseudocode
kevjumba Mar 17, 2022
a3f8384
Refactor logging functionality to hide internals
kevjumba Mar 20, 2022
706ee80
Refactor
kevjumba Mar 20, 2022
1831003
Revert changes
kevjumba Mar 20, 2022
9e669c3
Add tests
kevjumba Mar 20, 2022
b972634
Add new timeout test
kevjumba Mar 21, 2022
42bdc8b
Fix python ci for m1 mac
kevjumba Mar 25, 2022
ab8d1e0
Fix lint
kevjumba Mar 29, 2022
f6c2c87
Working state
kevjumba Mar 30, 2022
b6b412d
Move offline log store
kevjumba Mar 30, 2022
cb1da99
refactor
kevjumba Mar 30, 2022
155fc77
Update logs
kevjumba Mar 30, 2022
86f2208
Update log storage
kevjumba Mar 30, 2022
746dae9
WOrking state
kevjumba Mar 30, 2022
514c2f6
Work
kevjumba Mar 31, 2022
161d81d
Add tests for filestorage
kevjumba Mar 31, 2022
beec6b2
Fix logging
kevjumba Mar 31, 2022
2a1d2ec
Add more tests
kevjumba Mar 31, 2022
9446e05
Fix
kevjumba Mar 31, 2022
31e0de8
Fix
kevjumba Mar 31, 2022
137d700
Clean up
kevjumba Mar 31, 2022
ee847cd
Update error
kevjumba Mar 31, 2022
ce12eee
semi working state
kevjumba Apr 1, 2022
79c700c
b state
kevjumba Apr 4, 2022
6cbf2c2
Update types to be public
kevjumba Apr 6, 2022
aefda92
Update structs to make fields public
kevjumba Apr 6, 2022
29f72b0
Fix
kevjumba Apr 6, 2022
c4dbb84
clean up
kevjumba Apr 6, 2022
5d7978f
Fix
kevjumba Apr 7, 2022
686c583
Fix go
kevjumba Apr 7, 2022
884b014
Fix issues
kevjumba Apr 7, 2022
c02cf83
Fix
kevjumba Apr 7, 2022
e89abc1
Fix tests
kevjumba Apr 7, 2022
559bb83
Fix
kevjumba Apr 7, 2022
0f2aca9
Working state
kevjumba Apr 8, 2022
c631ef8
Fix
kevjumba Apr 8, 2022
b0a2166
fix
kevjumba Apr 8, 2022
9ed79e8
fix
kevjumba Apr 8, 2022
e472104
Clean up code a bit
kevjumba Apr 8, 2022
629be18
Fixes
kevjumba Apr 8, 2022
8a789fb
Fix
kevjumba Apr 8, 2022
6357efd
Fix tests
kevjumba Apr 8, 2022
3168473
Fix
kevjumba Apr 8, 2022
00228f7
Clean up
kevjumba Apr 8, 2022
b692e81
Update schema functionality
kevjumba Apr 9, 2022
49cd91a
Remove xitongsys parquet reader
kevjumba Apr 9, 2022
4890116
Clean up
kevjumba Apr 11, 2022
71e1e56
Fix go mode
kevjumba Apr 11, 2022
050db82
Fix tests and errors and everything
kevjumba Apr 11, 2022
dd235ca
Fix tests
kevjumba Apr 11, 2022
7c92d93
Fix
kevjumba Apr 11, 2022
4265efd
Remove unused code
kevjumba Apr 11, 2022
79cbe42
Fix
kevjumba Apr 11, 2022
21e99bd
Last working commit
kevjumba Apr 11, 2022
1b173da
work
kevjumba Apr 11, 2022
b5484c3
Address some changes
kevjumba Apr 11, 2022
5458034
More addresses.
kevjumba Apr 11, 2022
17b2bf4
Fix more review comments
kevjumba Apr 11, 2022
0a1802d
Fix
kevjumba Apr 11, 2022
18c7d60
Fix
kevjumba Apr 12, 2022
a39c9b5
Rename
kevjumba Apr 12, 2022
a4587c3
Fix
kevjumba Apr 12, 2022
83c5f89
Add request id
kevjumba Apr 12, 2022
86e605d
More fixes
kevjumba Apr 12, 2022
fa14dae
Fix odfv
kevjumba Apr 12, 2022
9f81d04
Fix
kevjumba Apr 12, 2022
e87fe22
Fix
kevjumba Apr 12, 2022
bdf0c2b
Address other changes
kevjumba Apr 12, 2022
f59f98d
Reorder for optimization
kevjumba Apr 12, 2022
d185ccd
Fix
kevjumba Apr 12, 2022
3747a5d
Add more shcema tests
kevjumba Apr 12, 2022
e9bd35b
Fix tests
kevjumba Apr 12, 2022
89974b3
refactor to clean
kevjumba Apr 12, 2022
d51544b
Add initialized repo for testing
kevjumba Apr 13, 2022
e0a4ec6
Fix
kevjumba Apr 13, 2022
5f9a50e
Remove
kevjumba Apr 13, 2022
1a5d98e
Fix tests
kevjumba Apr 13, 2022
b4d94dc
Fix
kevjumba Apr 13, 2022
8d9d0f9
Fix tests
kevjumba Apr 13, 2022
4e65987
Fix
kevjumba Apr 13, 2022
9d4effa
Text
kevjumba Apr 13, 2022
e604750
Fix
kevjumba Apr 13, 2022
49fb8bc
Fix?
kevjumba Apr 13, 2022
72b1700
Fix
kevjumba Apr 13, 2022
b4f7f41
Fix
kevjumba Apr 13, 2022
f19d19d
remove entity map
pyalex Apr 13, 2022
0145e3d
remove Cache method from registry
pyalex Apr 13, 2022
cad1156
clean up pre-initialized repo
pyalex Apr 13, 2022
89960b2
git ignore full data directory in tests
pyalex Apr 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba committed Apr 13, 2022
commit 79cbe42d2698519191baa60ecf30e6cc389ebff3
14 changes: 6 additions & 8 deletions go/cmd/server/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type LoggingService struct {
}

func NewLoggingService(fs *feast.FeatureStore, logChannelCapacity int, featureServiceName string, enableLogging bool) (*LoggingService, error) {
// start handler processes?
var featureService *model.FeatureService = nil
var err error
if enableLogging {
Expand Down Expand Up @@ -94,16 +93,20 @@ func (s *LoggingService) processLogs() {
}
}

// ProcessMemoryBuffer either ingests one new log from the logging channel into
// the in-memory buffer, or — on each ticker interval — flushes all buffered
// logs synchronously to the OfflineStorage.
func (s *LoggingService) ProcessMemoryBuffer(t *time.Ticker) {
	select {
	case tickTime := <-t.C:
		// Flush everything accumulated so far; surface the error instead of silently dropping it.
		if err := s.flushLogsToOfflineStorage(tickTime); err != nil {
			log.Printf("Error flushing logs to offline storage: %v\n", err)
		}
	case newLog := <-s.logChannel:
		log.Printf("Adding %s to memory.\n", newLog.FeatureValues)
		s.memoryBuffer.logs = append(s.memoryBuffer.logs, newLog)
	}
}

// Acquires the logging schema from the feature service, converts the memory buffer array of rows of logs and flushes
// them to the offline storage.
func (s *LoggingService) flushLogsToOfflineStorage(t time.Time) error {
log.Printf("Flushing buffer to offline storage with channel length: %d\n at time: "+t.String(), len(s.memoryBuffer.logs))
offlineStoreType, ok := getOfflineStoreType(s.fs.GetRepoConfig().OfflineStore)
Expand Down Expand Up @@ -158,9 +161,8 @@ func ConvertMemoryBufferToArrowTable(memoryBuffer *MemoryBuffer, fcoSchema *Sche
entityNameToEntityValues[entityName] = append(entityNameToEntityValues[entityName], l.EntityValue[idx])
}

// Contains both fv and odfv feature value types => add them in order of how the appear in the featureService
// Contains both fv and odfv feature value types => they are processed in the order in which they appear in the featureService
for idx, featureName := range fcoSchema.Features {
// for featureName, idAndType := range fcoSchema.FeaturesTypes {
// populate the proto value arrays with values from memory buffer in separate columns one for each feature name
if _, ok := columnNameToProtoValueArray[featureName]; !ok {
columnNameToProtoValueArray[featureName] = make([]*types.Value, 0)
Expand Down Expand Up @@ -223,10 +225,8 @@ func ConvertMemoryBufferToArrowTable(memoryBuffer *MemoryBuffer, fcoSchema *Sche
)

result := array.Record(array.NewRecord(schema, columns, int64(len(memoryBuffer.logs))))
// create an arrow table -> write this to parquet.

tbl := array.NewTableFromRecords(schema, []array.Record{result})
// arrow table -> write this to parquet
return array.Table(tbl), nil
}

Expand All @@ -245,7 +245,6 @@ func GetSchemaFromFeatureService(featureService *model.FeatureService, entities
for _, entity := range entities {
entityNames = append(entityNames, entity.JoinKey)
}
//featureViews, err := fs.listFeatureViews(hideDummyEntity)

for _, featureView := range featureViews {
fvs[featureView.Base.Name] = featureView
Expand All @@ -261,7 +260,6 @@ func GetSchemaFromFeatureService(featureService *model.FeatureService, entities
}

allFeatureTypes := make(map[string]types.ValueType_Enum)
//allRequestDataTypes := make(map[string]*types.ValueType_Enum)
features := make([]string, 0)
for _, featureProjection := range featureService.Projections {
// Create copies of FeatureView that may contains the same *FeatureView but
Expand Down
2 changes: 2 additions & 0 deletions go/cmd/server/logging/logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func TestSerializeToArrowTable(t *testing.T) {
}
}

// Initialize a dummy feature service, entities, and feature views / on-demand feature views for testing.
func InitializeFeatureRepoVariablesForTest() (*model.FeatureService, []*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView) {
f1 := model.NewFeature(
"int64",
Expand Down Expand Up @@ -153,6 +154,7 @@ func InitializeFeatureRepoVariablesForTest() (*model.FeatureService, []*model.En
return featureService, []*model.Entity{entity1}, []*model.FeatureView{featureView1, featureView2}, []*model.OnDemandFeatureView{}
}

// Create dummy FeatureService, Entities, and FeatureViews, add them to the logger, and convert the logs to an Arrow table.
func GenerateLogsAndConvertToArrowTable() (array.Table, error) {
featureService, entities, featureViews, odfvs := InitializeFeatureRepoVariablesForTest()
schema, err := GetSchemaFromFeatureService(featureService, entities, featureViews, odfvs)
Expand Down
6 changes: 3 additions & 3 deletions go/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s
}
// Entities are currently part of the features as a value and the order that we add it to the resp MetaData
// Need to figure out a way to map the correct entities to the correct ordering
entityValues := make(map[string][]*prototypes.Value, 0)
entityValuesMap := make(map[string][]*prototypes.Value, 0)
featureNames := make([]string, len(featureVectors))
for idx, vector := range featureVectors {

Expand All @@ -65,7 +65,7 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s
return nil, err
}
if _, ok := request.Entities[vector.Name]; ok {
entityValues[vector.Name] = values
entityValuesMap[vector.Name] = values
}
resp.Results = append(resp.Results, &serving.GetOnlineFeaturesResponse_FeatureVector{
Values: values,
Expand All @@ -74,7 +74,7 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s
})
}

go generateLogs(s, entityValues, featureNames, resp.Results, request.RequestContext)
go generateLogs(s, entityValuesMap, featureNames, resp.Results, request.RequestContext)
return resp, nil
}

Expand Down
11 changes: 8 additions & 3 deletions go/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,18 @@ func TestGetOnlineFeaturesSqliteWithLogging(t *testing.T) {
Entities: entities,
}
response, err := client.GetOnlineFeatures(ctx, request)
// Wait for logger to flush.
time.Sleep(1 * time.Second)
assert.Nil(t, err)
assert.NotNil(t, response)
// Wait for logger to flush.
// Get the featurenames without the entity order

// Get the featurenames without the entity names that are appended at the front.
featureNames := response.Metadata.FeatureNames.Val[len(request.Entities):]
// Generated expected log rows and values
// TODO(kevjumba): implement for timestamp and status
expectedLogValues, _, _ := GetExpectedLogRows(featureNames, response.Results)
expectedLogValues["driver_id"] = entities["driver_id"]
logPath, err := filepath.Abs(filepath.Join(".", "log.parquet"))
logPath, err := filepath.Abs(filepath.Join(dir, "log.parquet"))
assert.Nil(t, err)
w, err := logging.CreateOrOpenLogFile(logPath)
assert.Nil(t, err)
Expand All @@ -225,6 +228,7 @@ func TestGetOnlineFeaturesSqliteWithLogging(t *testing.T) {

assert.Nil(t, err)
assert.Equal(t, len(values), len(expectedLogValues))
// Need to iterate through and compare because certain values in types.RepeatedValues aren't accurately being compared.
for name, val := range values {
assert.Equal(t, len(val.Val), len(expectedLogValues[name].Val))
for idx, featureVal := range val.Val {
Expand All @@ -236,6 +240,7 @@ func TestGetOnlineFeaturesSqliteWithLogging(t *testing.T) {
assert.Nil(t, err)
}

// Generate the expected log rows based on the resulting feature vector returned from GetOnlineFeatures.
func GetExpectedLogRows(featureNames []string, results []*serving.GetOnlineFeaturesResponse_FeatureVector) (map[string]*types.RepeatedValue, [][]int32, [][]int64) {
numFeatures := len(featureNames)
numRows := len(results[0].Values)
Expand Down
55 changes: 3 additions & 52 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ import (
"context"
"errors"

"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/arrow/memory"
"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/internal/feast/onlineserving"
"github.com/feast-dev/feast/go/internal/feast/onlinestore"
"github.com/feast-dev/feast/go/internal/feast/registry"
"github.com/feast-dev/feast/go/internal/feast/transformation"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"google.golang.org/protobuf/types/known/timestamppb"
)

type FeatureStore struct {
Expand All @@ -32,60 +29,14 @@ type Features struct {
FeatureService *model.FeatureService
}

/*
FeatureVector represents the result of retrieving a single feature for multiple rows.
It can be imagined as a column in the output dataframe / table.
It consists of the feature name, a list of values (across all rows),
a list of statuses and a list of timestamps. All these lists have equal length,
and this length is also equal to the number of entity rows received in the request.
*/
type FeatureVector struct {
	Name string
	Values array.Interface
	Statuses []serving.FieldStatus
	Timestamps []*timestamppb.Timestamp
}

// featureViewAndRefs pairs a feature view with the feature references
// requested from it.
type featureViewAndRefs struct {
	view *model.FeatureView
	featureRefs []string
}

// Registry returns the registry backing this feature store.
func (fs *FeatureStore) Registry() *registry.Registry {
	return fs.registry
}

// View returns the feature view of this pair.
func (f *featureViewAndRefs) View() *model.FeatureView {
	return f.view
}

// FeatureRefs returns the feature references requested from this pair's view.
func (f *featureViewAndRefs) FeatureRefs() []string {
	return f.featureRefs
}

// GetRepoConfig returns the repo config this feature store was built from.
func (fs *FeatureStore) GetRepoConfig() *registry.RepoConfig {
	return fs.config
}

/*
We group all features from a single request by the entities they are attached to.
Thus, we will be able to call online retrieval per entity set and not per each feature view.
In this struct we collect all features and views that belong to a group.
We also store here projected entity keys (only the ones needed to retrieve these features)
and indexes to map the result of retrieval into the output response.
*/
type GroupedFeaturesPerEntitySet struct {
	// A list of requested feature references of the form featureViewName:featureName that share this entity set
	featureNames []string
	featureViewNames []string
	// full feature references as they are supposed to appear in the response
	aliasedFeatureNames []string
	// Entity set as a list of EntityKeys to pass to OnlineRead
	entityKeys []*prototypes.EntityKey
	// Reversed mapping to project the result of retrieval from storage into the response
	indices [][]int
}

// NewFeatureStore constructs a feature store fat client using the
// repo config (contents of feature_store.yaml converted to JSON map).
func NewFeatureStore(config *registry.RepoConfig, callback transformation.TransformationCallback) (*FeatureStore, error) {
Expand Down Expand Up @@ -325,14 +276,14 @@ func (fs *FeatureStore) GetFeatureView(featureViewName string, hideDummyEntity b
return fv, nil
}

func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*types.EntityKey,
func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*prototypes.EntityKey,
requestedFeatureViewNames []string,
requestedFeatureNames []string,
) ([][]onlinestore.FeatureData, error) {
numRows := len(entityRows)
entityRowsValue := make([]*types.EntityKey, numRows)
entityRowsValue := make([]*prototypes.EntityKey, numRows)
for index, entityKey := range entityRows {
entityRowsValue[index] = &types.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues}
entityRowsValue[index] = &prototypes.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues}
}
return fs.onlineStore.OnlineRead(ctx, entityRowsValue, requestedFeatureViewNames, requestedFeatureNames)
}
2 changes: 1 addition & 1 deletion go/internal/feast/featurestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestNewFeatureStore(t *testing.T) {
}

func TestGetOnlineFeaturesRedis(t *testing.T) {
//t.Skip("@todo(achals): feature_repo isn't checked in yet")
t.Skip("@todo(achals): feature_repo isn't checked in yet")
config := registry.RepoConfig{
Project: "feature_repo",
Registry: getRegistryPath(),
Expand Down
4 changes: 2 additions & 2 deletions go/internal/feast/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ func (r *Registry) getRegistryProto() (*core.Registry, error) {
if err != nil {
return registryProto, err
}
r.Load(registryProto)
r.load(registryProto)
return registryProto, nil
}

func (r *Registry) Load(registry *core.Registry) {
func (r *Registry) load(registry *core.Registry) {
r.mu.Lock()
defer r.mu.Unlock()
r.cachedRegistry = registry
Expand Down