Skip to content

Commit

Permalink
receiver_creator: lookup configured observer extensions (#179)
Browse files Browse the repository at this point in the history
This utilizes `host.GetExtensions` to find observers that have been configured
using `watch_observers`. receiver_creator can be configured to watch zero or
more observers. To support this the receiver templates have been moved under
"receivers" key to be able to differentiate subreceiver config keys from config
keys for the receiver_creator itself.
  • Loading branch information
jrcamp authored and wyTrivail committed Jul 13, 2020
1 parent 23fe25c commit 4fac9d2
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 22 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.0.0-20200417175348-2f20d0c404f4 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver v0.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,8 @@ github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg=
github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200414190247-75ae9198a89e h1:w67eqMTZ7ggKmTJDV3+ertwhWpmPltreRMZ6+DRKk98=
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200414190247-75ae9198a89e/go.mod h1:aZAL+YwTtk+1YkTj8dDgTvv06dU8twzOdRowRtBfEKo=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.0.0-20200417175348-2f20d0c404f4 h1:PZFHe3iV5eO1ODxpFi7WYYprnsqe1sdK5GiuRRLOaBc=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.0.0-20200417175348-2f20d0c404f4/go.mod h1:0CKaqU1lHv0dWPhVSXDprfZ8vjBhxXJjXMUYIBU/0zQ=
github.com/open-telemetry/opentelemetry-proto v0.3.0 h1:+ASAtcayvoELyCF40+rdCMlBOhZIn5TPDez85zSYc30=
github.com/open-telemetry/opentelemetry-proto v0.3.0/go.mod h1:PMR5GI0F7BSpio+rBGFxNm6SLzg3FypDTcFuQZnO+F8=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
Expand Down
23 changes: 17 additions & 6 deletions receiver/receivercreator/README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
# receiver_creator

This receiver can instantiate other receivers at runtime.
This receiver can instantiate other receivers at runtime based on whether observed endpoints match a configured rule.

TODO: More after it can react to observer events.
## TODO
* Observer endpoint details
* Rule matching

## Example
```yaml
extensions:
# Configures the Kubernetes observer to watch for pod start and stop events.
k8s_observer:

receivers:
receiver_creator/1:
examplereceiver/1:
rule: <TODO>
config:
endpoint: localhost:12345
# Name of the extensions to watch for endpoints to start and stop.
watch_observers: [k8s_observer]
receivers:
redis/1:
# If this rule matches an instance of this receiver will be started.
rule: port == 6379
config:
# Static receiver-specific config.
password: secret

processors:
exampleprocessor:
Expand Down
31 changes: 31 additions & 0 deletions receiver/receivercreator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,21 @@
package receivercreator

import (
"reflect"

otelconfig "github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/spf13/cast"
"github.com/spf13/viper"
)

const (
// receiversConfigKey is the config key name used to specify the subreceivers.
receiversConfigKey = "receivers"
// endpointConfigKey is the key name mapping to ReceiverSettings.Endpoint.
endpointConfigKey = "endpoint"
// configKey is the key name in a subreceiver.
configKey = "config"
)

// receiverConfig describes a receiver instance with a default config.
Expand Down Expand Up @@ -63,4 +76,22 @@ func newReceiverTemplate(name string, config userConfigMap) (receiverTemplate, e
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
receiverTemplates map[string]receiverTemplate
// WatchObservers are the extensions to listen to endpoints from.
WatchObservers []string `mapstructure:"watch_observers"`
}

// Copied from the Viper but changed to use the same delimiter.
// See https://github.com/spf13/viper/issues/871
func viperSub(v *viper.Viper, key string) *viper.Viper {
subv := otelconfig.NewViper()
data := v.Get(key)
if data == nil {
return subv
}

if reflect.TypeOf(data).Kind() == reflect.Map {
subv.MergeConfigMap(cast.ToStringMap(data))
return subv
}
return subv
}
10 changes: 8 additions & 2 deletions receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (

type mockHostFactories struct {
componenttest.NopHost
factories config.Factories
factories config.Factories
extensions map[configmodels.Extension]component.ServiceExtension
}

// GetFactory of the specified kind. Returns the factory for a component type.
Expand All @@ -46,7 +47,11 @@ func (mh *mockHostFactories) GetFactory(kind component.Kind, componentType strin
return nil
}

func exampleCreatorFactory(t *testing.T) (component.Host, *configmodels.Config) {
func (mh *mockHostFactories) GetExtensions() map[configmodels.Extension]component.ServiceExtension {
return mh.extensions
}

func exampleCreatorFactory(t *testing.T) (*mockHostFactories, *configmodels.Config) {
factories, err := config.ExampleComponents()
require.Nil(t, err)

Expand Down Expand Up @@ -80,4 +85,5 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, userConfigMap{
endpointConfigKey: "localhost:12345",
}, r1.receiverTemplates["examplereceiver/1"].config)
assert.Equal(t, []string{"mock_observer"}, r1.WatchObservers)
}
13 changes: 9 additions & 4 deletions receiver/receivercreator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,22 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
}
c := intoCfg.(*Config)

// TODO: Change the sub-receivers to be under "receivers" to allow other config for the main receiver-creator receiver.
for subreceiverKey := range sourceViperSection.AllSettings() {
cfgSection := sourceViperSection.GetStringMap(subreceiverKey + "::config")
if err := sourceViperSection.Unmarshal(&c); err != nil {
return err
}

receiversCfg := viperSub(sourceViperSection, receiversConfigKey)

for subreceiverKey := range receiversCfg.AllSettings() {
cfgSection := viperSub(receiversCfg, subreceiverKey).GetStringMap(configKey)
subreceiver, err := newReceiverTemplate(subreceiverKey, cfgSection)
if err != nil {
return err
}

// Unmarshals receiver_creator configuration like rule.
// TODO: validate discovery rule
if err := sourceViperSection.UnmarshalKey(subreceiverKey, &subreceiver); err != nil {
if err := receiversCfg.UnmarshalKey(subreceiverKey, &subreceiver); err != nil {
return fmt.Errorf("failed to deserialize sub-receiver %q: %s", subreceiverKey, err)
}

Expand Down
1 change: 1 addition & 0 deletions receiver/receivercreator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/census-instrumentation/opencensus-proto v0.2.1
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200414190247-75ae9198a89e
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.0.0
github.com/spf13/cast v1.3.1
github.com/spf13/viper v1.6.2
github.com/stretchr/testify v1.4.0
go.uber.org/zap v1.13.0
Expand Down
39 changes: 34 additions & 5 deletions receiver/receivercreator/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package receivercreator
import (
"context"
"errors"
"fmt"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
Expand All @@ -25,9 +26,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
)

// endpointConfigKey is the key name mapping to ReceiverSettings.Endpoint.
const endpointConfigKey = "endpoint"

var (
errNilNextConsumer = errors.New("nil nextConsumer")
)
Expand All @@ -40,7 +38,6 @@ type receiverCreator struct {
logger *zap.Logger
cfg *Config
observerHandler observerHandler
observer observer.Observable
}

// newReceiverCreator creates the receiver_creator with the given parameters.
Expand Down Expand Up @@ -86,7 +83,39 @@ func (rc *receiverCreator) Start(ctx context.Context, host component.Host) error
host: &loggingHost{host, rc.logger},
}}

rc.observer.ListAndWatch(&rc.observerHandler)
observers := map[string]observer.Observable{}

// Match all configured observers to the extensions that are running.
for _, watchObserver := range rc.cfg.WatchObservers {
for cfg, ext := range host.GetExtensions() {
if cfg.Type() != watchObserver {
continue
}

obs, ok := ext.(observer.Observable)
if !ok {
return fmt.Errorf("extension %q in watch_observers is not an observer", watchObserver)
}
observers[watchObserver] = obs
}
}

// Make sure all observers are present before starting any.
for _, watchObserver := range rc.cfg.WatchObservers {
if observers[watchObserver] == nil {
return fmt.Errorf("failed to find observer %q in the extensions list", watchObserver)
}
}

if len(observers) == 0 {
rc.logger.Warn("no observers were configured and no subreceivers will be started. receiver_creator will be disabled",
zap.String("receiver", rc.cfg.Name()))
}

// Start all configured watchers.
for _, observable := range observers {
observable.ListAndWatch(&rc.observerHandler)
}

return nil
}
Expand Down
19 changes: 18 additions & 1 deletion receiver/receivercreator/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/component/componenttest"
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -60,6 +62,16 @@ func (p *mockMetricsConsumer) ConsumeMetricsData(ctx context.Context, md consume
type mockObserver struct {
}

func (m *mockObserver) Start(ctx context.Context, host component.Host) error {
return nil
}

func (m *mockObserver) Shutdown(ctx context.Context) error {
return nil
}

var _ component.ServiceExtension = (*mockObserver)(nil)

func (m *mockObserver) ListAndWatch(notify observer.Notify) {
notify.OnAdd([]observer.Endpoint{observer.NewHostEndpoint("foobar", "169.168.1.100", nil)})
}
Expand All @@ -68,13 +80,18 @@ var _ observer.Observable = (*mockObserver)(nil)

func TestMockedEndToEnd(t *testing.T) {
host, cfg := exampleCreatorFactory(t)
host.extensions = map[configmodels.Extension]component.ServiceExtension{
&configmodels.ExtensionSettings{
TypeVal: "mock_observer",
NameVal: "mock_observer",
}: &mockObserver{},
}
dynCfg := cfg.Receivers["receiver_creator/1"]
factory := &Factory{}
mockConsumer := &mockMetricsConsumer{}
rcvr, err := factory.CreateMetricsReceiver(zap.NewNop(), dynCfg, mockConsumer)
require.NoError(t, err)
dyn := rcvr.(*receiverCreator)
dyn.observer = &mockObserver{}
require.NoError(t, rcvr.Start(context.Background(), host))

var shutdownOnce sync.Once
Expand Down
10 changes: 6 additions & 4 deletions receiver/receivercreator/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
receivers:
receiver_creator:
receiver_creator/1:
examplereceiver/1:
rule: enabled
config:
endpoint: localhost:12345
watch_observers: [mock_observer]
receivers:
examplereceiver/1:
rule: enabled
config:
endpoint: localhost:12345

processors:
exampleprocessor:
Expand Down

0 comments on commit 4fac9d2

Please sign in to comment.