-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor: Introduce a Global Registry (#7392)
* Introduce a Global Registry *Motivation:* In the current libbeat implementation and also inside the actual beat we are defining registries for each types of feature that we want to expose. This add duplication to the project, the global registry is a way to keep the flexibility of multiple features and reduce the duplication code only to take care of the type satefy. Also all features now use an init function to make the plugin register with their specific registry. This PR is a step forward to remove that pattern and use a global variable in the package to identify the feature. This change will allow a beat author to build a beat with only a specific set of feature. Example: Build with only ES out and not Logstash and kafka, this could reduce the size of some beats. This PR is written in a backward compatible way, to make the init and the new feature works both at the same time. Instead of using an init function you will the following to expose the feature. ```golang // Feature exposes a spooling to disk queue. var Feature = queue.Feature("spool", create, feature.Beta) ``` Each new type of feature require to implement two things for type satefy: - A factory method to assert the correct type at runtime. - A sugar method like the `queue.Feature`, for type satefy at compile time. *Example:* ```golang // Feature creates a new type of queue. func Feature(name string, factory Factory, stability feature.Stability) *feature.Feature { return feature.New(Namespace, name, factory, stability) } // FindFactory retrieves a queue types constructor. Returns nil if queue type is unknown func FindFactory(name string) Factory { f, err := feature.Registry.Find(Namespace, name) if err != nil { return nil } factory, ok := f.Factory().(Factory) if !ok { return nil } return factory } ``` How it will look like for building beats with a minimal set of plugins: ``` b := MustBundle( MustBundle(docker.Feature), MustBundle(dissect.Feature), MustBundle(elasticsearch.Feature, logstash.Feature), ) feature.RegisterBundle(b) ``` *Caveats:* we still expose the methods and the registry as global, but this is a step to to isolate a registry per beat. * Example of backward compatibility for the mem/spool queues * allow to filter on multiple stabilities * use a reference * Stability extracts into his own file. * Review first round. * uses lookup and lookupAll instead * adding developer changelog * use a variadic function
- Loading branch information
Showing
11 changed files
with
556 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package feature | ||
|
||
// Bundleable merges featurable and bundle interface together. | ||
type bundleable interface { | ||
Features() []Featurable | ||
} | ||
|
||
// Bundle defines a list of features available in the current beat. | ||
type Bundle struct { | ||
features []Featurable | ||
} | ||
|
||
// NewBundle creates a new Bundle of feature to be registered. | ||
func NewBundle(features ...Featurable) *Bundle { | ||
return &Bundle{features: features} | ||
} | ||
|
||
// Filter creates a new bundle with only the feature matching the requested stability. | ||
func (b *Bundle) Filter(stabilities ...Stability) *Bundle { | ||
var filtered []Featurable | ||
|
||
for _, feature := range b.features { | ||
for _, stability := range stabilities { | ||
if feature.Stability() == stability { | ||
filtered = append(filtered, feature) | ||
break | ||
} | ||
} | ||
} | ||
return NewBundle(filtered...) | ||
} | ||
|
||
// Features returns the interface features slice so | ||
func (b *Bundle) Features() []Featurable { | ||
return b.features | ||
} | ||
|
||
// MustBundle takes existing bundle or features and create a new Bundle with all the merged Features. | ||
func MustBundle(bundle ...bundleable) *Bundle { | ||
var merged []Featurable | ||
for _, feature := range bundle { | ||
merged = append(merged, feature.Features()...) | ||
} | ||
return NewBundle(merged...) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package feature | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestBundle(t *testing.T) { | ||
factory := func() {} | ||
features := []Featurable{ | ||
New("libbeat.outputs", "elasticsearch", factory, Stable), | ||
New("libbeat.outputs", "edge", factory, Experimental), | ||
New("libbeat.input", "tcp", factory, Beta), | ||
} | ||
|
||
t.Run("Creates a new Bundle", func(t *testing.T) { | ||
b := NewBundle(features...) | ||
assert.Equal(t, 3, len(b.Features())) | ||
}) | ||
|
||
t.Run("Filters feature based on stability", func(t *testing.T) { | ||
b := NewBundle(features...) | ||
new := b.Filter(Experimental) | ||
assert.Equal(t, 1, len(new.Features())) | ||
}) | ||
|
||
t.Run("Filters feature based on multiple different stability", func(t *testing.T) { | ||
b := NewBundle(features...) | ||
new := b.Filter(Experimental, Stable) | ||
assert.Equal(t, 2, len(new.Features())) | ||
}) | ||
|
||
t.Run("Creates a new Bundle from specified feature", func(t *testing.T) { | ||
f1 := New("libbeat.outputs", "elasticsearch", factory, Stable) | ||
b := MustBundle(f1) | ||
assert.Equal(t, 1, len(b.Features())) | ||
}) | ||
|
||
t.Run("Creates a new Bundle with grouped features", func(t *testing.T) { | ||
f1 := New("libbeat.outputs", "elasticsearch", factory, Stable) | ||
f2 := New("libbeat.outputs", "edge", factory, Experimental) | ||
f3 := New("libbeat.input", "tcp", factory, Beta) | ||
f4 := New("libbeat.input", "udp", factory, Beta) | ||
|
||
b := MustBundle( | ||
MustBundle(f1), | ||
MustBundle(f2), | ||
MustBundle(f3, f4), | ||
) | ||
|
||
assert.Equal(t, 4, len(b.Features())) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package feature | ||
|
||
import ( | ||
"fmt" | ||
) | ||
|
||
// Registry is the global plugin registry, this variable is meant to be temporary to move all the | ||
// internal factory to receive a context that include the current beat registry. | ||
var Registry = newRegistry() | ||
|
||
// Featurable implements the description of a feature. | ||
type Featurable interface { | ||
bundleable | ||
|
||
// Namespace is the kind of plugin or functionality we want to expose as a feature. | ||
// Examples: Autodiscover's provider, processors, outputs. | ||
Namespace() string | ||
|
||
// Name is the name of the feature, the name must unique by namespace and be a description of the | ||
// actual functionality, it is usually the name of the package. | ||
// Examples: dissect, elasticsearch, redis | ||
Name() string | ||
|
||
// Factory returns the function used to create an instance of the Feature, the signature | ||
// of the method is type checked by the 'FindFactory' of each namespace. | ||
Factory() interface{} | ||
|
||
// Stability is the stability of the Feature, this allow the user to filter embedded functionality | ||
// by their maturity at runtime. | ||
// Example: Beta, Experimental, Stable or Undefined. | ||
Stability() Stability | ||
|
||
String() string | ||
} | ||
|
||
// Feature contains the information for a specific feature | ||
type Feature struct { | ||
namespace string | ||
name string | ||
factory interface{} | ||
stability Stability | ||
} | ||
|
||
// Namespace return the namespace of the feature. | ||
func (f *Feature) Namespace() string { | ||
return f.namespace | ||
} | ||
|
||
// Name returns the name of the feature. | ||
func (f *Feature) Name() string { | ||
return f.name | ||
} | ||
|
||
// Factory returns the factory for the feature. | ||
func (f *Feature) Factory() interface{} { | ||
return f.factory | ||
} | ||
|
||
// Stability returns the stability level of the feature, current: stable, beta, experimental. | ||
func (f *Feature) Stability() Stability { | ||
return f.stability | ||
} | ||
|
||
// Features return the current feature as a slice to be compatible with Bundle merging and filtering. | ||
func (f *Feature) Features() []Featurable { | ||
return []Featurable{f} | ||
} | ||
|
||
// String return the debug information | ||
func (f *Feature) String() string { | ||
return fmt.Sprintf("%s/%s (stability: %s)", f.namespace, f.name, f.stability) | ||
} | ||
|
||
// New returns a new Feature. | ||
func New(namespace, name string, factory interface{}, stability Stability) *Feature { | ||
return &Feature{ | ||
namespace: namespace, | ||
name: name, | ||
factory: factory, | ||
stability: stability, | ||
} | ||
} | ||
|
||
// RegisterBundle registers a bundle of features. | ||
func RegisterBundle(bundle *Bundle) error { | ||
for _, f := range bundle.Features() { | ||
Registry.Register(f) | ||
} | ||
return nil | ||
} | ||
|
||
// Register register a new feature on the global registry. | ||
func Register(feature Featurable) error { | ||
return Registry.Register(feature) | ||
} | ||
|
||
// MustRegister register a new Feature on the global registry and panic on error. | ||
func MustRegister(feature Featurable) { | ||
err := Register(feature) | ||
if err != nil { | ||
panic(err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
package feature | ||
|
||
import ( | ||
"fmt" | ||
"reflect" | ||
"sync" | ||
|
||
"github.com/elastic/beats/libbeat/logp" | ||
) | ||
|
||
type mapper map[string]map[string]Featurable | ||
|
||
// Registry implements a global registry for any kind of feature in beats. | ||
// feature are grouped by namespace, a namespace is a kind of plugin like outputs, inputs, or queue. | ||
// The feature name must be unique. | ||
type registry struct { | ||
sync.RWMutex | ||
namespaces mapper | ||
log *logp.Logger | ||
} | ||
|
||
// NewRegistry returns a new registry. | ||
func newRegistry() *registry { | ||
return ®istry{ | ||
namespaces: make(mapper), | ||
log: logp.NewLogger("registry"), | ||
} | ||
} | ||
|
||
// Register registers a new feature into a specific namespace, namespace are lazy created. | ||
// Feature name must be unique. | ||
func (r *registry) Register(feature Featurable) error { | ||
r.Lock() | ||
defer r.Unlock() | ||
|
||
ns := feature.Namespace() | ||
name := feature.Name() | ||
|
||
// Lazy create namespaces | ||
_, found := r.namespaces[ns] | ||
if !found { | ||
r.namespaces[ns] = make(map[string]Featurable) | ||
} | ||
|
||
f, found := r.namespaces[ns][name] | ||
if found { | ||
if featuresEqual(feature, f) { | ||
// Allow both old style and new style of plugin to work together. | ||
r.log.Debugw( | ||
"ignoring, feature '%s' is already registered in the namespace '%s'", | ||
name, | ||
ns, | ||
) | ||
return nil | ||
} | ||
|
||
return fmt.Errorf( | ||
"could not register new feature '%s' in namespace '%s', feature name must be unique", | ||
name, | ||
ns, | ||
) | ||
} | ||
|
||
r.log.Debugw( | ||
"registering new feature", | ||
"namespace", | ||
ns, | ||
"name", | ||
name, | ||
) | ||
|
||
r.namespaces[ns][name] = feature | ||
|
||
return nil | ||
} | ||
|
||
// Unregister removes a feature from the registry. | ||
func (r *registry) Unregister(namespace, name string) error { | ||
r.Lock() | ||
defer r.Unlock() | ||
|
||
v, found := r.namespaces[namespace] | ||
if !found { | ||
return fmt.Errorf("unknown namespace named '%s'", namespace) | ||
} | ||
|
||
_, found = v[name] | ||
if !found { | ||
return fmt.Errorf("unknown feature '%s' in namespace '%s'", name, namespace) | ||
} | ||
|
||
delete(r.namespaces[namespace], name) | ||
return nil | ||
} | ||
|
||
// Lookup searches for a Feature by the namespace-name pair. | ||
func (r *registry) Lookup(namespace, name string) (Featurable, error) { | ||
r.RLock() | ||
defer r.RUnlock() | ||
|
||
v, found := r.namespaces[namespace] | ||
if !found { | ||
return nil, fmt.Errorf("unknown namespace named '%s'", namespace) | ||
} | ||
|
||
m, found := v[name] | ||
if !found { | ||
return nil, fmt.Errorf("unknown feature '%s' in namespace '%s'", name, namespace) | ||
} | ||
|
||
return m, nil | ||
} | ||
|
||
// LookupAll returns all the features for a specific namespace. | ||
func (r *registry) LookupAll(namespace string) ([]Featurable, error) { | ||
r.RLock() | ||
defer r.RUnlock() | ||
|
||
v, found := r.namespaces[namespace] | ||
if !found { | ||
return nil, fmt.Errorf("unknown namespace named '%s'", namespace) | ||
} | ||
|
||
list := make([]Featurable, len(v)) | ||
c := 0 | ||
for _, feature := range v { | ||
list[c] = feature | ||
c++ | ||
} | ||
|
||
return list, nil | ||
} | ||
|
||
// Size returns the number of registered features in the registry. | ||
func (r *registry) Size() int { | ||
r.RLock() | ||
defer r.RUnlock() | ||
|
||
c := 0 | ||
for _, namespace := range r.namespaces { | ||
c += len(namespace) | ||
} | ||
|
||
return c | ||
} | ||
|
||
func featuresEqual(f1, f2 Featurable) bool { | ||
// There is no safe way to compare function in go, | ||
// but since the function pointers are global it should be stable. | ||
if f1.Name() == f2.Name() && | ||
f1.Namespace() == f2.Namespace() && | ||
reflect.ValueOf(f1.Factory()).Pointer() == reflect.ValueOf(f2.Factory()).Pointer() { | ||
return true | ||
} | ||
|
||
return false | ||
} |
Oops, something went wrong.