46 changes: 24 additions & 22 deletions pkg/epp/datalayer/attributemap.go
@@ -32,26 +32,27 @@ type AttributeMap interface {
Put(string, Cloneable)
Get(string) (Cloneable, bool)
Keys() []string
Clone() *Attributes
Contributor:
Not related to this PR - do we want to also support a Delete operation in the future?

Contributor Author:
We could, mostly for completeness, but I don't currently see a use case to remove an attribute from an endpoint.
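
If it were ever added, Delete would most likely be a thin wrapper over sync.Map; a hypothetical sketch, not part of this PR:

```go
// Delete removes an attribute from the map, if present.
// Hypothetical: not part of this change.
func (a *Attributes) Delete(key string) {
	a.data.Delete(key)
}
```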

}

// Attributes provides a goroutine safe implementation of AttributeMap.
// Attributes provides a goroutine-safe implementation of AttributeMap.
type Attributes struct {
data sync.Map
}

// NewAttributes return a new attribute map instance.
// NewAttributes returns a new instance of Attributes.
func NewAttributes() *Attributes {
return &Attributes{
data: sync.Map{},
}
return &Attributes{}
}

// Put adds (or updates) an attribute in the map.
// Put adds or updates an attribute in the map.
func (a *Attributes) Put(key string, value Cloneable) {
a.data.Store(key, value) // TODO: Clone into map?
if value != nil {
a.data.Store(key, value) // TODO: Clone into map to ensure isolation
}
}

// Get returns an attribute from the map.
// Get retrieves an attribute by key, returning a cloned copy.
func (a *Attributes) Get(key string) (Cloneable, bool) {
val, ok := a.data.Load(key)
if !ok {
@@ -60,30 +61,31 @@ func (a *Attributes) Get(key string) (Cloneable, bool) {
if cloneable, ok := val.(Cloneable); ok {
return cloneable.Clone(), true
}
return nil, false // shouldn't happen since Put accepts Cloneables only
return nil, false
}

// Keys returns an array of all the names of attributes stored in the map.
// Keys returns all keys in the attribute map.
func (a *Attributes) Keys() []string {
keys := []string{}
var keys []string
a.data.Range(func(key, _ any) bool {
if k, ok := key.(string); ok {
keys = append(keys, k)
if sk, ok := key.(string); ok {
keys = append(keys, sk)
}
return true // continue iteration
return true
})
return keys
}

// Clone the attributes object itself.
// Clone creates a deep copy of the entire Attributes map.
func (a *Attributes) Clone() *Attributes {
cloned := &Attributes{
data: sync.Map{},
}

a.data.Range(func(k, v interface{}) bool {
cloned.data.Store(k, v)
clone := NewAttributes()
a.data.Range(func(key, value any) bool {
if sk, ok := key.(string); ok {
if v, ok := value.(Cloneable); ok {
clone.Put(sk, v)
}
}
return true
})
return cloned
return clone
}
71 changes: 71 additions & 0 deletions pkg/epp/datalayer/attributemap_test.go
@@ -0,0 +1,71 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datalayer

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
)

type dummy struct {
Text string
}

func (d *dummy) Clone() Cloneable {
return &dummy{Text: d.Text}
}

func TestExpectPutThenGetToMatch(t *testing.T) {
attrs := NewAttributes()
original := &dummy{"foo"}
attrs.Put("a", original)

got, ok := attrs.Get("a")
assert.True(t, ok, "expected key to exist")
assert.NotSame(t, original, got, "expected Get to return a clone, not original")

dv, ok := got.(*dummy)
assert.True(t, ok, "expected value to be of type *dummy")
assert.Equal(t, "foo", dv.Text)
Comment on lines +43 to +45
Contributor:
use cmp.Diff?

Contributor Author (@elevran, Jul 27, 2025):
Prefer to use assert.Equal for basic types and leave cmp.Diff for slices, maps and complex structs.
Changed to cmp.Diff throughout
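
For illustration, the two styles side by side; TestAssertStyles is hypothetical and assumes it sits in this same test file, reusing the dummy type and imports above:

```go
// TestAssertStyles is illustrative only; it contrasts the two assertion styles.
func TestAssertStyles(t *testing.T) {
	got := &dummy{Text: "foo"}

	// assert.Equal reads well for basic types such as strings.
	assert.Equal(t, "foo", got.Text)

	// cmp.Diff produces a readable diff for structs, slices, and maps.
	if diff := cmp.Diff(&dummy{Text: "foo"}, got); diff != "" {
		t.Errorf("unexpected value (-want +got):\n%s", diff)
	}
}
```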

}

func TestExpectKeysToMatchAdded(t *testing.T) {
attrs := NewAttributes()
attrs.Put("x", &dummy{"1"})
attrs.Put("y", &dummy{"2"})

keys := attrs.Keys()
assert.Len(t, keys, 2)
assert.ElementsMatch(t, keys, []string{"x", "y"})
}

func TestCloneReturnsCopy(t *testing.T) {
original := NewAttributes()
original.Put("k", &dummy{"value"})

cloned := original.Clone()

kOrig, _ := original.Get("k")
kClone, _ := cloned.Get("k")

assert.NotSame(t, kOrig, kClone, "expected cloned value to be a different instance")
if diff := cmp.Diff(kOrig, kClone); diff != "" {
t.Errorf("Unexpected output (-want +got): %v", diff)
}
}
96 changes: 34 additions & 62 deletions pkg/epp/datalayer/datasource.go
@@ -23,126 +23,98 @@ import (
"sync"
)

// DataSource is an interface required from all data layer data collection
// sources.
// DataSource provides raw data to registered Extractors.
type DataSource interface {
// Name returns the name of this datasource.
Name() string

// AddExtractor adds an extractor to the data source.
// The extractor will be called whenever the Collector might
// AddExtractor adds an extractor to the data source. Multiple
// Extractors can be registered.
// The extractor will be called whenever the DataSource might
// have some new raw information regarding an endpoint.
// The Extractor's expected input type should be validated against
// the data source's output type upon registration.
AddExtractor(extractor Extractor) error

// Collect is triggered by the data layer framework to fetch potentially new
// data for an endpoint. It passes retrieved data to registered Extractors.
// data for an endpoint. Collect calls registered Extractors to convert the
// raw data into structured attributes.
Collect(ep Endpoint)
}

// Extractor is used to convert raw data into relevant data layer information
// for an endpoint. They are called by data sources whenever new data might be
// available. Multiple Extractors can be registered with a source. Extractors
// are expected to save their output with an endpoint so it becomes accessible
// to consumers in other subsystem of the inference gateway (e.g., when making
// scheduling decisions).
// Extractor transforms raw data into structured attributes.
type Extractor interface {
// Name returns the name of the extractor.
Name() string

// ExpectedType defines the type expected by the extractor. It must match
// the output type of the data source where the extractor is registered.
// ExpectedType defines the type expected by the extractor.
ExpectedInputType() reflect.Type

// Extract transforms the data source output into a concrete attribute that
// is stored on the given endpoint.
// Extract transforms the raw data source output into a concrete structured
// attribute, stored on the given endpoint.
Extract(data any, ep Endpoint)
}

var (
// defaultDataSources is the system default data source registry.
defaultDataSources = DataSourceRegistry{}
)
var defaultDataSources = DataSourceRegistry{}

// DataSourceRegistry stores named data sources and makes them
// accessible to other subsystems in the inference gateway.
// DataSourceRegistry stores named data sources.
type DataSourceRegistry struct {
sources sync.Map
}
Comment on lines +52 to 57
Contributor:
Why do we actually need the DataSourceRegistry struct?

What would be different if we defined defaultDataSources as a sync.Map and moved the logic from the struct methods to the public functions?

Contributor Author (@elevran, Jul 27, 2025):
It makes testing (and especially parallel tests) much easier, since there's no global state that needs to be synchronized between tests; each test can use its own registry struct if needed.
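
A rough sketch of that pattern, assuming a hypothetical fakeSource type in the same package; the names are illustrative only:

```go
// fakeSource is a hypothetical, minimal DataSource used only for illustration.
type fakeSource struct{ name string }

func (s *fakeSource) Name() string                   { return s.name }
func (s *fakeSource) AddExtractor(_ Extractor) error { return nil }
func (s *fakeSource) Collect(_ Endpoint)             {}

func TestRegistryIsolation(t *testing.T) {
	t.Parallel()

	// Each test owns its registry; no global state to reset between tests.
	registry := &DataSourceRegistry{}
	if err := registry.Register(&fakeSource{name: "metrics"}); err != nil {
		t.Fatalf("register failed: %v", err)
	}
	if _, found := registry.GetNamedSource("metrics"); !found {
		t.Error("expected registered source to be found")
	}
}
```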

Contributor Author:
@nirrozenbaum since tests moved to the datalayer package, I could make DataSourceRegistry private if you think that would make it clearer.


// Register adds a source to the registry.
// Register adds a new DataSource to the registry.
func (dsr *DataSourceRegistry) Register(src DataSource) error {
if src == nil {
return errors.New("unable to register a nil data source")
}

if _, found := dsr.sources.Load(src.Name()); found {
if _, loaded := dsr.sources.LoadOrStore(src.Name(), src); loaded {
return fmt.Errorf("unable to register duplicate data source: %s", src.Name())
}
dsr.sources.Store(src.Name(), src)
return nil
}

// GetNamedSource returns the named data source, if found.
// GetNamedSource fetches a source by name.
func (dsr *DataSourceRegistry) GetNamedSource(name string) (DataSource, bool) {
if name == "" {
return nil, false
}

if val, found := dsr.sources.Load(name); found {
if val, ok := dsr.sources.Load(name); ok {
if ds, ok := val.(DataSource); ok {
return ds, true
} // ignore type assertion failures and fall through
}
}
return nil, false
}

// GetSources returns all sources registered.
// GetSources returns all registered sources.
func (dsr *DataSourceRegistry) GetSources() []DataSource {
sources := []DataSource{}
var result []DataSource
dsr.sources.Range(func(_, val any) bool {
if ds, ok := val.(DataSource); ok {
sources = append(sources, ds)
result = append(result, ds)
}
return true // continue iteration
return true
})
return sources
return result
}

// RegisterSource adds the data source to the default registry.
// --- default registry accessors ---

func RegisterSource(src DataSource) error {
return defaultDataSources.Register(src)
}

// GetNamedSource returns the named source from the default registry,
// if found.
func GetNamedSource(name string) (DataSource, bool) {
return defaultDataSources.GetNamedSource(name)
}

// GetSources returns all sources in the default registry.
func GetSources() []DataSource {
return defaultDataSources.GetSources()
}

// ValidateExtractorType checks if an extractor can handle
// the collector's output.
func ValidateExtractorType(collectorOutputType, extractorInputType reflect.Type) error {
if collectorOutputType == extractorInputType {
return nil
}

// extractor accepts anything (i.e., interface{})
if extractorInputType.Kind() == reflect.Interface && extractorInputType.NumMethod() == 0 {
return nil
// the DataSource's output. It should be called by a DataSource
// when an extractor is added.
func ValidateExtractorType(collectorOutput, extractorInput reflect.Type) error {
if collectorOutput == nil || extractorInput == nil {
return errors.New("extractor input type or data source output type can't be nil")
}

// check if collector output implements extractor input interface
if collectorOutputType.Implements(extractorInputType) {
if collectorOutput == extractorInput ||
(extractorInput.Kind() == reflect.Interface && extractorInput.NumMethod() == 0) ||
(extractorInput.Kind() == reflect.Interface && collectorOutput.Implements(extractorInput)) {
return nil
}

return fmt.Errorf("extractor input type %v cannot handle collector output type %v",
extractorInputType, collectorOutputType)
return fmt.Errorf("extractor input type %v cannot handle data source output type %v",
extractorInput, collectorOutput)
}
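
For illustration, a concrete data source would typically call ValidateExtractorType when an extractor is registered; exampleSource and its outputType helper are hypothetical and not part of this diff:

```go
// exampleSource sketches how a DataSource might guard extractor registration.
type exampleSource struct {
	extractors []Extractor
}

// outputType describes the raw data type this source hands to its extractors
// (hypothetical helper).
func (s *exampleSource) outputType() reflect.Type {
	return reflect.TypeOf(map[string]string{})
}

func (s *exampleSource) AddExtractor(extractor Extractor) error {
	// Reject extractors whose expected input this source cannot provide.
	if err := ValidateExtractorType(s.outputType(), extractor.ExpectedInputType()); err != nil {
		return err
	}
	s.extractors = append(s.extractors, extractor)
	return nil
}
```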