From 2f20d0c404f41dc4efa0482a2a8d3a4d0588e403 Mon Sep 17 00:00:00 2001 From: Jay Camp Date: Fri, 17 Apr 2020 13:53:48 -0400 Subject: [PATCH] Add observer support to receiver_creator (#173) * Add observer notification interface (k8s observer will be in separate PR) * Refactor receiver_creator to be more easily testable and organized * receiver.go mostly implements OT interface and delegates to the new files * observerhandler.go responds to observer events and manages the starting/stopping of receivers * rules.go implements rules evaluation (not currently implemented) * runner.go contains a runner interface that handles the details of how to start and stop a receiver instance that the observer handler wants to start/stop * Implement basic add/remove/change response in receiver_creator to observer events --- extension/observer/Makefile | 1 + extension/observer/go.mod | 3 + extension/observer/observer.go | 122 +++++++++++++++++ go.sum | 1 + receiver/receivercreator/config.go | 45 ++++--- receiver/receivercreator/config_test.go | 10 +- receiver/receivercreator/factory.go | 8 +- receiver/receivercreator/go.mod | 4 +- receiver/receivercreator/go.sum | 2 + receiver/receivercreator/observerhandler.go | 107 +++++++++++++++ .../receivercreator/observerhandler_test.go | 114 ++++++++++++++++ receiver/receivercreator/receiver.go | 125 ++++++------------ receiver/receivercreator/receiver_test.go | 76 +++++------ receiver/receivercreator/receivermap.go | 54 ++++++++ receiver/receivercreator/receivermap_test.go | 58 ++++++++ receiver/receivercreator/rules.go | 24 ++++ receiver/receivercreator/runner.go | 110 +++++++++++++++ receiver/receivercreator/runner_test.go | 50 +++++++ receiver/receivercreator/testdata/config.yaml | 2 +- 19 files changed, 759 insertions(+), 157 deletions(-) create mode 100644 extension/observer/Makefile create mode 100644 extension/observer/go.mod create mode 100644 extension/observer/observer.go create mode 100644 receiver/receivercreator/observerhandler.go create mode 100644 receiver/receivercreator/observerhandler_test.go create mode 100644 receiver/receivercreator/receivermap.go create mode 100644 receiver/receivercreator/receivermap_test.go create mode 100644 receiver/receivercreator/rules.go create mode 100644 receiver/receivercreator/runner.go create mode 100644 receiver/receivercreator/runner_test.go diff --git a/extension/observer/Makefile b/extension/observer/Makefile new file mode 100644 index 000000000000..ded7a36092dc --- /dev/null +++ b/extension/observer/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/extension/observer/go.mod b/extension/observer/go.mod new file mode 100644 index 000000000000..db52df1b6430 --- /dev/null +++ b/extension/observer/go.mod @@ -0,0 +1,3 @@ +module github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer + +go 1.14 diff --git a/extension/observer/observer.go b/extension/observer/observer.go new file mode 100644 index 000000000000..c31382e1f89a --- /dev/null +++ b/extension/observer/observer.go @@ -0,0 +1,122 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package observer + +import ( + "fmt" +) + +// Protocol defines network protocol for container ports. +type Protocol string + +const ( + // ProtocolTCP is the TCP protocol. + ProtocolTCP Protocol = "TCP" + // ProtocolUDP is the UDP protocol. + ProtocolUDP Protocol = "UDP" +) + +// Endpoint is a service that can be contacted remotely. +type Endpoint interface { + // ID uniquely identifies this endpoint. + ID() string + // Target is an IP address or hostname of the endpoint. + Target() string + // String pretty formats the endpoint. + String() string + // Labels is a map of arbitrary metadata. + Labels() map[string]string +} + +// endpointBase is common endpoint data used across all endpoint types. +type endpointBase struct { + id string + target string + labels map[string]string +} + +func (e *endpointBase) ID() string { + return e.id +} + +func (e *endpointBase) Target() string { + return e.target +} + +func (e *endpointBase) Labels() map[string]string { + return e.labels +} + +// HostEndpoint is an endpoint that just has a target but no identifying port information. +type HostEndpoint struct { + endpointBase +} + +func (h *HostEndpoint) String() string { + return fmt.Sprintf("HostEndpoint{id: %v, Target: %v, Labels: %v}", h.ID(), h.target, h.labels) +} + +// NewHostEndpoint creates a HostEndpoint +func NewHostEndpoint(id string, target string, labels map[string]string) *HostEndpoint { + return &HostEndpoint{endpointBase{ + id: id, + target: target, + labels: labels, + }} +} + +var _ Endpoint = (*HostEndpoint)(nil) + +// PortEndpoint is an endpoint that has a target as well as a port. +type PortEndpoint struct { + endpointBase + Port uint16 +} + +func (p *PortEndpoint) String() string { + return fmt.Sprintf("PortEndpoint{ID: %v, Target: %v, Port: %d, Labels: %v}", p.ID(), p.target, p.Port, p.labels) +} + +// NewPortEndpoint creates a PortEndpoint. +func NewPortEndpoint(id string, target string, port uint16, labels map[string]string) *PortEndpoint { + return &PortEndpoint{endpointBase: endpointBase{ + id: id, + target: target, + labels: labels, + }, Port: port} +} + +var _ Endpoint = (*PortEndpoint)(nil) + +// Observable is an interface that provides notification of endpoint changes. +type Observable interface { + // TODO: Stopping. + // ListAndWatch provides initial state sync as well as change notification. + // notify.OnAdd will be called one or more times if there are endpoints discovered. + // (It would not be called if there are no endpoints present.) The endpoint synchronization + // happens asynchronously to this call. + ListAndWatch(notify Notify) +} + +// Notify is the callback for Observer events. +type Notify interface { + // OnAdd is called once or more initially for state sync as well as when further endpoints are added. + OnAdd(added []Endpoint) + // OnRemove is called when one or more endpoints are removed. + OnRemove(removed []Endpoint) + // OnChange is called when one ore more endpoints are modified but the identity is not changed + // (e.g. labels). + OnChange(changed []Endpoint) +} diff --git a/go.sum b/go.sum index a8ae40520cd4..ed82931236be 100644 --- a/go.sum +++ b/go.sum @@ -683,6 +683,7 @@ github.com/honeycombio/opentelemetry-exporter-go v0.3.1 h1:oQZwILb+oEyQ45Sa+YGTr github.com/honeycombio/opentelemetry-exporter-go v0.3.1/go.mod h1:enUrMJYC617rm8rZjKSEGapNSSiUctgIVBFKFSWGGiM= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= diff --git a/receiver/receivercreator/config.go b/receiver/receivercreator/config.go index c40418382b71..6625c0efa326 100644 --- a/receiver/receivercreator/config.go +++ b/receiver/receivercreator/config.go @@ -19,41 +19,48 @@ import ( "github.com/open-telemetry/opentelemetry-collector/config/configmodels" ) +// receiverConfig describes a receiver instance with a default config. +type receiverConfig struct { + // fullName is the full subreceiver name (ie /). + fullName string + // typeStr is set based on the configured receiver name. + typeStr string + // config is the map configured by the user in the config file. It is the contents of the map from + // the "config" section. The keys and values are arbitrarily configured by the user. + config userConfigMap +} + // userConfigMap is an arbitrary map of string keys to arbitrary values as specified by the user type userConfigMap map[string]interface{} -// subreceiverConfig is the configuration of a single subreceiver configured inside -// receiver_creator. -type subreceiverConfig struct { +// receiverTemplate is the configuration of a single subreceiver. +type receiverTemplate struct { + receiverConfig + // Rule is the discovery rule that when matched will create a receiver instance - // based on subreceiverConfig. + // based on receiverTemplate. Rule string `mapstructure:"rule"` - // receiverType is set based on the configured receiver name. - receiverType string - // config is the map configured by the user in the config file. It is the contents of the map from - // the "config" section. The keys and values are arbitrarily configured by the user. - config userConfigMap - // fullName is the full subreceiver name (ie /). - fullName string } -// newSubreceiverConfig creates a subreceiverConfig instance from the full name of a subreceiver +// newReceiverTemplate creates a receiverTemplate instance from the full name of a subreceiver // and its arbitrary config map values. -func newSubreceiverConfig(name string, config userConfigMap) (*subreceiverConfig, error) { +func newReceiverTemplate(name string, config userConfigMap) (receiverTemplate, error) { typeStr, fullName, err := otelconfig.DecodeTypeAndName(name) if err != nil { - return nil, err + return receiverTemplate{}, err } - return &subreceiverConfig{ - receiverType: typeStr, - fullName: fullName, - config: config, + return receiverTemplate{ + receiverConfig: receiverConfig{ + typeStr: typeStr, + fullName: fullName, + config: config, + }, }, nil } // Config defines configuration for receiver_creator. type Config struct { configmodels.ReceiverSettings `mapstructure:",squash"` - subreceiverConfigs map[string]*subreceiverConfig + receiverTemplates map[string]receiverTemplate } diff --git a/receiver/receivercreator/config_test.go b/receiver/receivercreator/config_test.go index 6f870057e77a..99f5efb577bf 100644 --- a/receiver/receivercreator/config_test.go +++ b/receiver/receivercreator/config_test.go @@ -74,10 +74,10 @@ func TestLoadConfig(t *testing.T) { r1 := cfg.Receivers["receiver_creator/1"].(*Config) assert.NotNil(t, r1) - assert.Len(t, r1.subreceiverConfigs, 1) - assert.Contains(t, r1.subreceiverConfigs, "examplereceiver/1") - assert.Equal(t, "test rule", r1.subreceiverConfigs["examplereceiver/1"].Rule) + assert.Len(t, r1.receiverTemplates, 1) + assert.Contains(t, r1.receiverTemplates, "examplereceiver/1") + assert.Equal(t, "enabled", r1.receiverTemplates["examplereceiver/1"].Rule) assert.Equal(t, userConfigMap{ - "endpoint": "localhost:12345", - }, r1.subreceiverConfigs["examplereceiver/1"].config) + endpointConfigKey: "localhost:12345", + }, r1.receiverTemplates["examplereceiver/1"].config) } diff --git a/receiver/receivercreator/factory.go b/receiver/receivercreator/factory.go index cf1a4696162e..a31840d77da0 100644 --- a/receiver/receivercreator/factory.go +++ b/receiver/receivercreator/factory.go @@ -55,7 +55,7 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { // TODO: Change the sub-receivers to be under "receivers" to allow other config for the main receiver-creator receiver. for subreceiverKey := range sourceViperSection.AllSettings() { cfgSection := sourceViperSection.GetStringMap(subreceiverKey + "::config") - subreceiver, err := newSubreceiverConfig(subreceiverKey, cfgSection) + subreceiver, err := newReceiverTemplate(subreceiverKey, cfgSection) if err != nil { return err } @@ -66,7 +66,7 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { return fmt.Errorf("failed to deserialize sub-receiver %q: %s", subreceiverKey, err) } - c.subreceiverConfigs[subreceiverKey] = subreceiver + c.receiverTemplates[subreceiverKey] = subreceiver } return nil @@ -80,7 +80,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { TypeVal: typeStr, NameVal: typeStr, }, - subreceiverConfigs: map[string]*subreceiverConfig{}, + receiverTemplates: map[string]receiverTemplate{}, } } @@ -96,5 +96,5 @@ func (f *Factory) CreateMetricsReceiver( cfg configmodels.Receiver, consumer consumer.MetricsConsumerOld, ) (component.MetricsReceiver, error) { - return new(logger, consumer, cfg.(*Config)) + return newReceiverCreator(logger, consumer, cfg.(*Config)) } diff --git a/receiver/receivercreator/go.mod b/receiver/receivercreator/go.mod index b3f88bbcac29..d56b8806b416 100644 --- a/receiver/receivercreator/go.mod +++ b/receiver/receivercreator/go.mod @@ -5,10 +5,10 @@ go 1.14 require ( github.com/census-instrumentation/opencensus-proto v0.2.1 github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200414190247-75ae9198a89e + github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.0.0 github.com/spf13/viper v1.6.2 github.com/stretchr/testify v1.4.0 go.uber.org/zap v1.13.0 ) -// Same version as from go.mod. Required to make go list -m work. -replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer => ../../extension/observer diff --git a/receiver/receivercreator/go.sum b/receiver/receivercreator/go.sum index ca55f147b9dd..cc51b2868d85 100644 --- a/receiver/receivercreator/go.sum +++ b/receiver/receivercreator/go.sum @@ -15,6 +15,7 @@ contrib.go.opencensus.io/exporter/jaeger v0.1.1-0.20190430175949-e8b55949d948 h1 contrib.go.opencensus.io/exporter/jaeger v0.1.1-0.20190430175949-e8b55949d948/go.mod h1:ukdzwIYYHgZ7QYtwVFQUjiT28BJHiMhTERo32s6qVgM= contrib.go.opencensus.io/exporter/ocagent v0.6.0 h1:Z1n6UAyr0QwM284yUuh5Zd8JlvxUGAhFZcgMJkMPrGM= contrib.go.opencensus.io/exporter/ocagent v0.6.0/go.mod h1:zmKjrJcdo0aYcVS7bmEeSEBLPA9YJp5bjrofdU3pIXs= +contrib.go.opencensus.io/exporter/prometheus v0.1.0 h1:SByaIoWwNgMdPSgl5sMqM2KDE5H/ukPWBRo314xiDvg= contrib.go.opencensus.io/exporter/prometheus v0.1.0/go.mod h1:cGFniUXGZlKRjzOyuZJ6mgB+PgBcCIa79kEKR8YCW+A= contrib.go.opencensus.io/resource v0.1.2 h1:b4WFJV8u7/NzPWHeTqj3Ec2AW8OGhtJxC/hbphIOvbU= contrib.go.opencensus.io/resource v0.1.2/go.mod h1:F361eGI91LCmW1I/Saf+rX0+OFcigGlFvXwEGEnkRLA= @@ -610,6 +611,7 @@ github.com/hashicorp/serf v0.8.3 h1:MWYcmct5EtKz0efYooPcL0yNkem+7kWxqXDi/UIh+8k= github.com/hashicorp/serf v0.8.3/go.mod h1:UpNcs7fFbpKIyZaUuSW6EPiH+eZC7OuyFD+wc1oal+k= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= diff --git a/receiver/receivercreator/observerhandler.go b/receiver/receivercreator/observerhandler.go new file mode 100644 index 000000000000..ca44c55529cf --- /dev/null +++ b/receiver/receivercreator/observerhandler.go @@ -0,0 +1,107 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivercreator + +import ( + "fmt" + "sync" + + "github.com/open-telemetry/opentelemetry-collector/component/componenterror" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +var _ observer.Notify = (*observerHandler)(nil) + +// observerHandler manages endpoint change notifications. +type observerHandler struct { + sync.Mutex + logger *zap.Logger + // receiverTemplates maps receiver template full name to a receiverTemplate value. + receiverTemplates map[string]receiverTemplate + // receiversByEndpointID is a map of endpoint IDs to a receiver instance. + receiversByEndpointID receiverMap + // runner starts and stops receiver instances. + runner runner +} + +// Shutdown all receivers started at runtime. +func (obs *observerHandler) Shutdown() error { + obs.Lock() + defer obs.Unlock() + + var errs []error + + for _, rcvr := range obs.receiversByEndpointID.Values() { + if err := obs.runner.shutdown(rcvr); err != nil { + // TODO: Should keep track of which receiver the error is associated with + // but require some restructuring. + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return fmt.Errorf("shutdown on %d receivers failed: %v", len(errs), componenterror.CombineErrors(errs)) + } + + return nil +} + +// OnAdd responds to endpoint add notifications. +func (obs *observerHandler) OnAdd(added []observer.Endpoint) { + obs.Lock() + defer obs.Unlock() + + for _, e := range added { + for _, template := range obs.receiverTemplates { + if !ruleMatches(template.Rule, e) { + continue + } + rcvr, err := obs.runner.start(template.receiverConfig, userConfigMap{ + endpointConfigKey: e.Target(), + }) + if err != nil { + obs.logger.Error("failed to start receiver", zap.String("receiver", template.fullName)) + continue + } + + obs.receiversByEndpointID.Put(e.ID(), rcvr) + } + } +} + +// OnRemove responds to endpoint removal notifications. +func (obs *observerHandler) OnRemove(removed []observer.Endpoint) { + obs.Lock() + defer obs.Unlock() + + for _, e := range removed { + for _, rcvr := range obs.receiversByEndpointID.Get(e.ID()) { + if err := obs.runner.shutdown(rcvr); err != nil { + obs.logger.Error("failed to stop receiver", zap.Reflect("receiver", rcvr)) + continue + } + } + obs.receiversByEndpointID.RemoveAll(e.ID()) + } +} + +// OnChange responds to endpoint change notifications. +func (obs *observerHandler) OnChange(changed []observer.Endpoint) { + // TODO: optimize to only restart if effective config has changed. + obs.OnRemove(changed) + obs.OnAdd(changed) +} diff --git a/receiver/receivercreator/observerhandler_test.go b/receiver/receivercreator/observerhandler_test.go new file mode 100644 index 000000000000..b0cbd21e8cc3 --- /dev/null +++ b/receiver/receivercreator/observerhandler_test.go @@ -0,0 +1,114 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivercreator + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector/component" + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +type mockRunner struct { + mock.Mock +} + +func (run *mockRunner) start(receiver receiverConfig, discoveredConfig userConfigMap) (component.Receiver, error) { + args := run.Called(receiver, discoveredConfig) + return args.Get(0).(component.Receiver), args.Error(1) +} + +func (run *mockRunner) shutdown(rcvr component.Receiver) error { + args := run.Called(rcvr) + return args.Error(0) +} + +var _ runner = (*mockRunner)(nil) + +func TestOnAdd(t *testing.T) { + runner := &mockRunner{} + rcvrCfg := receiverConfig{typeStr: "name", config: userConfigMap{"foo": "bar"}, fullName: "name/1"} + handler := &observerHandler{ + logger: zap.NewNop(), + receiverTemplates: map[string]receiverTemplate{ + "name/1": {rcvrCfg, "enabled"}, + }, + receiversByEndpointID: receiverMap{}, + runner: runner, + } + + runner.On("start", rcvrCfg, userConfigMap{endpointConfigKey: "localhost"}).Return(&config.ExampleReceiverProducer{}, nil) + + handler.OnAdd([]observer.Endpoint{ + observer.NewHostEndpoint("id-1", "localhost", nil), + }) + + runner.AssertExpectations(t) + assert.Equal(t, 1, handler.receiversByEndpointID.Size()) +} + +func TestOnRemove(t *testing.T) { + runner := &mockRunner{} + rcvr := &config.ExampleReceiverProducer{} + handler := &observerHandler{ + logger: zap.NewNop(), + receiversByEndpointID: receiverMap{}, + runner: runner, + } + + handler.receiversByEndpointID.Put("id-1", rcvr) + + runner.On("shutdown", rcvr).Return(nil) + + handler.OnRemove([]observer.Endpoint{ + observer.NewHostEndpoint("id-1", "localhost", nil), + }) + + runner.AssertExpectations(t) + assert.Equal(t, 0, handler.receiversByEndpointID.Size()) +} + +func TestOnChange(t *testing.T) { + runner := &mockRunner{} + rcvrCfg := receiverConfig{typeStr: "name", config: userConfigMap{"foo": "bar"}, fullName: "name/1"} + oldRcvr := &config.ExampleReceiverProducer{} + newRcvr := &config.ExampleReceiverProducer{} + handler := &observerHandler{ + logger: zap.NewNop(), + receiverTemplates: map[string]receiverTemplate{ + "name/1": {rcvrCfg, "enabled"}, + }, + receiversByEndpointID: receiverMap{}, + runner: runner, + } + + handler.receiversByEndpointID.Put("id-1", oldRcvr) + + runner.On("shutdown", oldRcvr).Return(nil) + runner.On("start", rcvrCfg, userConfigMap{endpointConfigKey: "localhost"}).Return(newRcvr, nil) + + handler.OnChange([]observer.Endpoint{ + observer.NewHostEndpoint("id-1", "localhost", nil), + }) + + runner.AssertExpectations(t) + assert.Equal(t, 1, handler.receiversByEndpointID.Size()) + assert.Same(t, newRcvr, handler.receiversByEndpointID.Get("id-1")[0]) +} diff --git a/receiver/receivercreator/receiver.go b/receiver/receivercreator/receiver.go index 1bf1dc4ded2c..81ac467da338 100644 --- a/receiver/receivercreator/receiver.go +++ b/receiver/receivercreator/receiver.go @@ -17,16 +17,17 @@ package receivercreator import ( "context" "errors" - "fmt" "github.com/open-telemetry/opentelemetry-collector/component" - "github.com/open-telemetry/opentelemetry-collector/component/componenterror" - "github.com/open-telemetry/opentelemetry-collector/config" - "github.com/open-telemetry/opentelemetry-collector/config/configmodels" "github.com/open-telemetry/opentelemetry-collector/consumer" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" ) +// endpointConfigKey is the key name mapping to ReceiverSettings.Endpoint. +const endpointConfigKey = "endpoint" + var ( errNilNextConsumer = errors.New("nil nextConsumer") ) @@ -35,14 +36,15 @@ var _ component.MetricsReceiver = (*receiverCreator)(nil) // receiverCreator implements consumer.MetricsConsumer. type receiverCreator struct { - nextConsumer consumer.MetricsConsumerOld - logger *zap.Logger - cfg *Config - receivers []component.Receiver + nextConsumer consumer.MetricsConsumerOld + logger *zap.Logger + cfg *Config + observerHandler observerHandler + observer observer.Observable } -// new creates the receiver_creator with the given parameters. -func new(logger *zap.Logger, nextConsumer consumer.MetricsConsumerOld, cfg *Config) (component.MetricsReceiver, error) { +// newReceiverCreator creates the receiver_creator with the given parameters. +func newReceiverCreator(logger *zap.Logger, nextConsumer consumer.MetricsConsumerOld, cfg *Config) (component.MetricsReceiver, error) { if nextConsumer == nil { return nil, errNilNextConsumer } @@ -55,94 +57,41 @@ func new(logger *zap.Logger, nextConsumer consumer.MetricsConsumerOld, cfg *Conf return r, nil } -// loadRuntimeReceiverConfig loads the given subreceiverConfig merged with config values -// that may have been discovered at runtime. -func (rc *receiverCreator) loadRuntimeReceiverConfig(factory component.ReceiverFactoryOld, - staticSubConfig *subreceiverConfig, discoveredConfig userConfigMap) (configmodels.Receiver, error) { - mergedConfig := config.NewViper() - - // Merge in the config values specified in the config file. - if err := mergedConfig.MergeConfigMap(staticSubConfig.config); err != nil { - return nil, fmt.Errorf("failed to merge subreceiver config from config file: %v", err) - } - - // Merge in discoveredConfig containing values discovered at runtime. - if err := mergedConfig.MergeConfigMap(discoveredConfig); err != nil { - return nil, fmt.Errorf("failed to merge subreceiver config from discovered runtime values: %v", err) - } - - viperConfig := config.NewViper() - - // Load config under / since loadReceiver and CustomUnmarshaler expects this structure. - viperConfig.Set(staticSubConfig.fullName, mergedConfig.AllSettings()) - - receiverConfig, err := config.LoadReceiver(mergedConfig, staticSubConfig.receiverType, staticSubConfig.fullName, factory) - if err != nil { - return nil, fmt.Errorf("failed to load subreceiver config: %v", err) - } - // Sets dynamically created receiver to something like receiver_creator/1/redis{endpoint="localhost:6380"}. - // TODO: Need to make sure this is unique (just endpoint is probably not totally sufficient). - receiverConfig.SetName(fmt.Sprintf("%s/%s{endpoint=%q}", rc.cfg.Name(), staticSubConfig.fullName, mergedConfig.GetString("endpoint"))) - return receiverConfig, nil +// loggingHost provides a safer version of host that logs errors instead of exiting the process. +type loggingHost struct { + component.Host + logger *zap.Logger } -// createRuntimeReceiver creates a receiver that is discovered at runtime. -func (rc *receiverCreator) createRuntimeReceiver(factory component.ReceiverFactoryOld, cfg configmodels.Receiver) (component.MetricsReceiver, error) { - return factory.CreateMetricsReceiver(rc.logger, cfg, rc.nextConsumer) +// ReportFatalError causes a log to be made instead of terminating the process as Host does by default. +func (h *loggingHost) ReportFatalError(err error) { + h.logger.Error("receiver reported a fatal error", zap.Error(err)) } +var _ component.Host = (*loggingHost)(nil) + // Start receiver_creator. func (rc *receiverCreator) Start(ctx context.Context, host component.Host) error { - // TODO: Temporarily load a single instance of all subreceivers for testing. - // Will be done in reaction to observer events later. - for _, subconfig := range rc.cfg.subreceiverConfigs { - factory := host.GetFactory(component.KindReceiver, subconfig.receiverType) - - if factory == nil { - rc.logger.Error("unable to lookup factory for receiver", zap.String("receiver", subconfig.receiverType)) - continue - } - - receiverFactory := factory.(component.ReceiverFactoryOld) - - cfg, err := rc.loadRuntimeReceiverConfig(receiverFactory, subconfig, userConfigMap{}) - if err != nil { - return err - } - recvr, err := rc.createRuntimeReceiver(receiverFactory, cfg) - if err != nil { - return err - } - - if err := recvr.Start(ctx, host); err != nil { - return fmt.Errorf("failed starting subreceiver %s: %v", cfg.Name(), err) - } - - rc.receivers = append(rc.receivers, recvr) - } - - // TODO: Can result in some receivers left running if an error is encountered - // but starting receivers here is only temporary and will be removed when - // observer interface added. + rc.observerHandler = observerHandler{ + logger: rc.logger, + receiverTemplates: rc.cfg.receiverTemplates, + receiversByEndpointID: receiverMap{}, + runner: &receiverRunner{ + logger: rc.logger, + nextConsumer: rc.nextConsumer, + idNamespace: rc.cfg.Name(), + // TODO: not really sure what context should be used here for starting subreceivers + // as don't think it makes sense to use Start context as the lifetimes are different. + ctx: context.Background(), + host: &loggingHost{host, rc.logger}, + }} + + rc.observer.ListAndWatch(&rc.observerHandler) return nil } // Shutdown stops the receiver_creator and all its receivers started at runtime. func (rc *receiverCreator) Shutdown(ctx context.Context) error { - var errs []error - - for _, recvr := range rc.receivers { - if err := recvr.Shutdown(ctx); err != nil { - // TODO: Should keep track of which receiver the error is associated with - // but require some restructuring. - errs = append(errs, err) - } - } - - if len(errs) > 0 { - return fmt.Errorf("shutdown on %d receivers failed: %v", len(errs), componenterror.CombineErrors(errs)) - } - - return nil + return rc.observerHandler.Shutdown() } diff --git a/receiver/receivercreator/receiver_test.go b/receiver/receivercreator/receiver_test.go index 4b4e82646d3e..aee2592688db 100644 --- a/receiver/receivercreator/receiver_test.go +++ b/receiver/receivercreator/receiver_test.go @@ -16,13 +16,15 @@ package receivercreator import ( "context" + "errors" "sync" "testing" + "time" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - "github.com/open-telemetry/opentelemetry-collector/component" + "github.com/open-telemetry/opentelemetry-collector/component/componenttest" "github.com/open-telemetry/opentelemetry-collector/config" "github.com/open-telemetry/opentelemetry-collector/config/configcheck" "github.com/open-telemetry/opentelemetry-collector/consumer" @@ -30,6 +32,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + zapObserver "go.uber.org/zap/zaptest/observer" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" ) func TestCreateDefaultConfig(t *testing.T) { @@ -52,29 +57,41 @@ func (p *mockMetricsConsumer) ConsumeMetricsData(ctx context.Context, md consume return nil } -func TestEndToEnd(t *testing.T) { +type mockObserver struct { +} + +func (m *mockObserver) ListAndWatch(notify observer.Notify) { + notify.OnAdd([]observer.Endpoint{observer.NewHostEndpoint("foobar", "169.168.1.100", nil)}) +} + +var _ observer.Observable = (*mockObserver)(nil) + +func TestMockedEndToEnd(t *testing.T) { host, cfg := exampleCreatorFactory(t) dynCfg := cfg.Receivers["receiver_creator/1"] factory := &Factory{} mockConsumer := &mockMetricsConsumer{} - dynReceiver, err := factory.CreateMetricsReceiver(zap.NewNop(), dynCfg, mockConsumer) + rcvr, err := factory.CreateMetricsReceiver(zap.NewNop(), dynCfg, mockConsumer) require.NoError(t, err) - require.NoError(t, dynReceiver.Start(context.Background(), host)) + dyn := rcvr.(*receiverCreator) + dyn.observer = &mockObserver{} + require.NoError(t, rcvr.Start(context.Background(), host)) var shutdownOnce sync.Once shutdown := func() { shutdownOnce.Do(func() { - assert.NoError(t, dynReceiver.Shutdown(context.Background())) + assert.NoError(t, rcvr.Shutdown(context.Background())) }) } defer shutdown() - dyn := dynReceiver.(*receiverCreator) - assert.Len(t, dyn.receivers, 1) + require.Eventuallyf(t, func() bool { + return dyn.observerHandler.receiversByEndpointID.Size() == 1 + }, 1*time.Second, 100*time.Millisecond, "expected 1 receiver but got %v", dyn.observerHandler.receiversByEndpointID) // Test that we can send metrics. - for _, receiver := range dyn.receivers { + for _, receiver := range dyn.observerHandler.receiversByEndpointID.Values() { example := receiver.(*config.ExampleReceiverProducer) assert.NoError(t, example.MetricsConsumer.ConsumeMetricsData(context.Background(), consumerdata.MetricsData{ Node: &commonpb.Node{ @@ -106,38 +123,21 @@ func TestEndToEnd(t *testing.T) { // TODO: Will have to rework once receivers are started asynchronously to Start(). assert.Len(t, mockConsumer.Metrics, 1) - assert.Equal(t, "my-metric", mockConsumer.Metrics[0].Metrics[0].MetricDescriptor.Name) shutdown() + + assert.True(t, dyn.observerHandler.receiversByEndpointID.Values()[0].(*config.ExampleReceiverProducer).Stopped) } -func Test_loadAndCreateRuntimeReceiver(t *testing.T) { - host, cfg := exampleCreatorFactory(t) - dynCfg := cfg.Receivers["receiver_creator/1"] - factory := &Factory{} - dynReceiver, err := factory.CreateMetricsReceiver(zap.NewNop(), dynCfg, &mockMetricsConsumer{}) - require.NoError(t, err) - dr := dynReceiver.(*receiverCreator) - exampleFactory := host.GetFactory(component.KindReceiver, "examplereceiver").(component.ReceiverFactoryOld) - assert.NotNil(t, exampleFactory) - subConfig := dr.cfg.subreceiverConfigs["examplereceiver/1"] - require.NotNil(t, subConfig) - loadedConfig, err := dr.loadRuntimeReceiverConfig(exampleFactory, subConfig, userConfigMap{ - "endpoint": "localhost:12345", - }) - require.NoError(t, err) - assert.NotNil(t, loadedConfig) - exampleConfig := loadedConfig.(*config.ExampleReceiver) - // Verify that the overridden endpoint is used instead of the one in the config file. - assert.Equal(t, "localhost:12345", exampleConfig.Endpoint) - assert.Equal(t, "receiver_creator/1/examplereceiver/1{endpoint=\"localhost:12345\"}", exampleConfig.Name()) - - // Test that metric receiver can be created from loaded config. - t.Run("test create receiver from loaded config", func(t *testing.T) { - recvr, err := dr.createRuntimeReceiver(exampleFactory, loadedConfig) - require.NoError(t, err) - assert.NotNil(t, recvr) - exampleReceiver := recvr.(*config.ExampleReceiverProducer) - assert.Equal(t, dr.nextConsumer, exampleReceiver.MetricsConsumer) - }) +func TestSafeHost(t *testing.T) { + core, obs := zapObserver.New(zap.ErrorLevel) + host := &loggingHost{ + Host: componenttest.NewNopHost(), + logger: zap.New(core), + } + host.ReportFatalError(errors.New("runtime error")) + require.Equal(t, 1, obs.Len()) + log := obs.All()[0] + assert.Equal(t, "receiver reported a fatal error", log.Message) + assert.Equal(t, "runtime error", log.ContextMap()["error"]) } diff --git a/receiver/receivercreator/receivermap.go b/receiver/receivercreator/receivermap.go new file mode 100644 index 000000000000..5711dee21f88 --- /dev/null +++ b/receiver/receivercreator/receivermap.go @@ -0,0 +1,54 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivercreator + +import ( + "github.com/open-telemetry/opentelemetry-collector/component" +) + +// receiverMap is a multimap for mapping one id to many receivers. It does +// not deduplicate the same value being associated with the same key. +type receiverMap map[string][]component.Receiver + +// Put rcvr into key id. If rcvr is a duplicate it will still be added. +func (rm receiverMap) Put(id string, rcvr component.Receiver) { + rm[id] = append(rm[id], rcvr) +} + +// Get receivers by id. +func (rm receiverMap) Get(id string) []component.Receiver { + return rm[id] +} + +// Remove all receivers by id. +func (rm receiverMap) RemoveAll(id string) { + delete(rm, id) +} + +// Get all receivers in the map. +func (rm receiverMap) Values() (out []component.Receiver) { + for _, m := range rm { + out = append(out, m...) + } + return +} + +// Size is the number of total receivers in the map. +func (rm receiverMap) Size() (out int) { + for _, m := range rm { + out += len(m) + } + return +} diff --git a/receiver/receivercreator/receivermap_test.go b/receiver/receivercreator/receivermap_test.go new file mode 100644 index 000000000000..858779682f36 --- /dev/null +++ b/receiver/receivercreator/receivermap_test.go @@ -0,0 +1,58 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivercreator + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector/component" + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/stretchr/testify/assert" +) + +func TestReceiverMap(t *testing.T) { + rm := receiverMap{} + assert.Equal(t, 0, rm.Size()) + + r1 := &config.ExampleReceiverProducer{} + r2 := &config.ExampleReceiverProducer{} + r3 := &config.ExampleReceiverProducer{} + + rm.Put("a", r1) + assert.Equal(t, 1, rm.Size()) + + rm.Put("a", r2) + assert.Equal(t, 2, rm.Size()) + + rm.Put("b", r3) + assert.Equal(t, 3, rm.Size()) + + assert.Equal(t, []component.Receiver{r1, r2}, rm.Get("a")) + assert.Nil(t, rm.Get("missing")) + + rm.RemoveAll("missing") + assert.Equal(t, 3, rm.Size()) + + rm.RemoveAll("b") + assert.Equal(t, 2, rm.Size()) + + rm.RemoveAll("a") + assert.Equal(t, 0, rm.Size()) + + rm.Put("a", r1) + rm.Put("b", r2) + assert.Equal(t, 2, rm.Size()) + assert.Equal(t, []component.Receiver{r1, r2}, rm.Values()) +} diff --git a/receiver/receivercreator/rules.go b/receiver/receivercreator/rules.go new file mode 100644 index 000000000000..ffbcb1d9c69c --- /dev/null +++ b/receiver/receivercreator/rules.go @@ -0,0 +1,24 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivercreator + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +func ruleMatches(rule string, e observer.Endpoint) bool { + // TODO + return rule == "enabled" +} diff --git a/receiver/receivercreator/runner.go b/receiver/receivercreator/runner.go new file mode 100644 index 000000000000..615854f81424 --- /dev/null +++ b/receiver/receivercreator/runner.go @@ -0,0 +1,110 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivercreator + +import ( + "context" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector/component" + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/consumer" + "go.uber.org/zap" +) + +// runner starts and stops receiver instances. +type runner interface { + // start a receiver instance from its static config and discovered config. + start(receiver receiverConfig, discoveredConfig userConfigMap) (component.Receiver, error) + // shutdown a receiver. + shutdown(rcvr component.Receiver) error +} + +// receiverRunner handles starting/stopping of a concrete subreceiver instance. +type receiverRunner struct { + logger *zap.Logger + nextConsumer consumer.MetricsConsumerOld + idNamespace string + ctx context.Context + host component.Host +} + +var _ runner = (*receiverRunner)(nil) + +// start a receiver instance from its static config and discovered config. +func (run *receiverRunner) start(receiver receiverConfig, discoveredConfig userConfigMap) (component.Receiver, error) { + factory := run.host.GetFactory(component.KindReceiver, receiver.typeStr) + + if factory == nil { + return nil, fmt.Errorf("unable to lookup factory for receiver %q", receiver.typeStr) + } + + receiverFactory := factory.(component.ReceiverFactoryOld) + + cfg, err := run.loadRuntimeReceiverConfig(receiverFactory, receiver, discoveredConfig) + if err != nil { + return nil, err + } + recvr, err := run.createRuntimeReceiver(receiverFactory, cfg) + if err != nil { + return nil, err + } + + if err := recvr.Start(run.ctx, run.host); err != nil { + return nil, fmt.Errorf("failed starting receiver %s: %v", cfg.Name(), err) + } + + return recvr, nil +} + +// shutdown the given receiver. +func (run *receiverRunner) shutdown(rcvr component.Receiver) error { + return rcvr.Shutdown(run.ctx) +} + +// loadRuntimeReceiverConfig loads the given receiverTemplate merged with config values +// that may have been discovered at runtime. +func (run *receiverRunner) loadRuntimeReceiverConfig( + factory component.ReceiverFactoryOld, + receiver receiverConfig, + discoveredConfig userConfigMap, +) (configmodels.Receiver, error) { + mergedConfig := config.NewViper() + + // Merge in the config values specified in the config file. + if err := mergedConfig.MergeConfigMap(receiver.config); err != nil { + return nil, fmt.Errorf("failed to merge template config from config file: %v", err) + } + + // Merge in discoveredConfig containing values discovered at runtime. + if err := mergedConfig.MergeConfigMap(discoveredConfig); err != nil { + return nil, fmt.Errorf("failed to merge template config from discovered runtime values: %v", err) + } + + receiverConfig, err := config.LoadReceiver(mergedConfig, receiver.typeStr, receiver.fullName, factory) + if err != nil { + return nil, fmt.Errorf("failed to load template config: %v", err) + } + // Sets dynamically created receiver to something like receiver_creator/1/redis{endpoint="localhost:6380"}. + // TODO: Need to make sure this is unique (just endpoint is probably not totally sufficient). + receiverConfig.SetName(fmt.Sprintf("%s/%s{endpoint=%q}", run.idNamespace, receiver.fullName, mergedConfig.GetString(endpointConfigKey))) + return receiverConfig, nil +} + +// createRuntimeReceiver creates a receiver that is discovered at runtime. +func (run *receiverRunner) createRuntimeReceiver(factory component.ReceiverFactoryOld, cfg configmodels.Receiver) (component.MetricsReceiver, error) { + return factory.CreateMetricsReceiver(run.logger, cfg, run.nextConsumer) +} diff --git a/receiver/receivercreator/runner_test.go b/receiver/receivercreator/runner_test.go new file mode 100644 index 000000000000..f0046c453fbd --- /dev/null +++ b/receiver/receivercreator/runner_test.go @@ -0,0 +1,50 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receivercreator + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func Test_loadAndCreateRuntimeReceiver(t *testing.T) { + run := &receiverRunner{logger: zap.NewNop(), nextConsumer: &mockMetricsConsumer{}, idNamespace: "receiver_creator/1"} + exampleFactory := &config.ExampleReceiverFactory{} + template, err := newReceiverTemplate("examplereceiver/1", nil) + require.NoError(t, err) + + loadedConfig, err := run.loadRuntimeReceiverConfig(exampleFactory, template.receiverConfig, userConfigMap{ + endpointConfigKey: "localhost:12345", + }) + require.NoError(t, err) + assert.NotNil(t, loadedConfig) + exampleConfig := loadedConfig.(*config.ExampleReceiver) + // Verify that the overridden endpoint is used instead of the one in the config file. + assert.Equal(t, "localhost:12345", exampleConfig.Endpoint) + assert.Equal(t, "receiver_creator/1/examplereceiver/1{endpoint=\"localhost:12345\"}", exampleConfig.Name()) + + // Test that metric receiver can be created from loaded config. + t.Run("test create receiver from loaded config", func(t *testing.T) { + recvr, err := run.createRuntimeReceiver(exampleFactory, loadedConfig) + require.NoError(t, err) + assert.NotNil(t, recvr) + exampleReceiver := recvr.(*config.ExampleReceiverProducer) + assert.Equal(t, run.nextConsumer, exampleReceiver.MetricsConsumer) + }) +} diff --git a/receiver/receivercreator/testdata/config.yaml b/receiver/receivercreator/testdata/config.yaml index edb61c54030d..38c7a5a77554 100644 --- a/receiver/receivercreator/testdata/config.yaml +++ b/receiver/receivercreator/testdata/config.yaml @@ -2,7 +2,7 @@ receivers: receiver_creator: receiver_creator/1: examplereceiver/1: - rule: test rule + rule: enabled config: endpoint: localhost:12345