Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add k8s observer #185

Merged
merged 14 commits into from
Apr 29, 2020
14 changes: 14 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver"
Expand All @@ -49,6 +50,19 @@ func components() (config.Factories, error) {
return config.Factories{}, err
}

extensions := []component.ExtensionFactory{
k8sobserver.NewFactory(),
}

for _, ext := range factories.Extensions {
extensions = append(extensions, ext)
}

factories.Extensions, err = component.MakeExtensionFactoryMap(extensions...)
if err != nil {
errs = append(errs, err)
}

receivers := []component.ReceiverFactoryBase{
&collectdreceiver.Factory{},
&sapmreceiver.Factory{},
Expand Down
1 change: 1 addition & 0 deletions extension/observer/k8sobserver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../../Makefile.Common
36 changes: 36 additions & 0 deletions extension/observer/k8sobserver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sobserver

import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)

// Config defines configuration for k8s attributes processor.
type Config struct {
configmodels.ExtensionSettings `mapstructure:",squash"`

// Node should be set to the node name to limit discovered endpoints to. For example, node name can
// be set using the downward API inside the collector pod spec as follows:
//
// env:
// - name: K8S_NODE_NAME
// valueFrom:
// fieldRef:
// fieldPath: spec.nodeName
//
// Then set this value to ${K8S_NODE_NAME} in the configuration.
Node string `mapstructure:"node"`
}
16 changes: 16 additions & 0 deletions extension/observer/k8sobserver/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package k8sobserver implements a k8s observer extension for monitoring pods.
package k8sobserver
56 changes: 56 additions & 0 deletions extension/observer/k8sobserver/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sobserver

import (
"context"

"github.com/open-telemetry/opentelemetry-collector/component"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
)

type k8sObserver struct {
logger *zap.Logger
informer cache.SharedInformer
stop chan struct{}
config *Config
}

func (k *k8sObserver) Start(ctx context.Context, host component.Host) error {
go k.informer.Run(k.stop)
return nil
}

func (k *k8sObserver) Shutdown(ctx context.Context) error {
close(k.stop)
return nil
}

var _ (component.ServiceExtension) = (*k8sObserver)(nil)

// ListAndWatch notifies watcher with the current state and sends subsequent state changes.
func (k *k8sObserver) ListAndWatch(listener observer.Notify) {
k.informer.AddEventHandler(&handler{watcher: listener, idNamespace: k.config.Name()})
}

// newObserver creates a new k8s observer extension.
func newObserver(logger *zap.Logger, config *Config, listWatch cache.ListerWatcher) (component.ServiceExtension, error) {
informer := cache.NewSharedInformer(listWatch, &v1.Pod{}, 0)
return &k8sObserver{logger: logger, informer: informer, stop: make(chan struct{}), config: config}, nil
}
87 changes: 87 additions & 0 deletions extension/observer/k8sobserver/extension_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sobserver

import (
"context"
"testing"

"github.com/open-telemetry/opentelemetry-collector/component/componenttest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
framework "k8s.io/client-go/tools/cache/testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
)

func TestNewExtension(t *testing.T) {
listWatch := framework.NewFakeControllerSource()
factory := &Factory{}
ext, err := newObserver(zap.NewNop(), factory.CreateDefaultConfig().(*Config), listWatch)
require.NoError(t, err)
require.NotNil(t, ext)
}

func TestExtensionObserve(t *testing.T) {
listWatch := framework.NewFakeControllerSource()
factory := &Factory{}
ext, err := newObserver(zap.NewNop(), factory.CreateDefaultConfig().(*Config), listWatch)
require.NoError(t, err)
require.NotNil(t, ext)
obs := ext.(*k8sObserver)

listWatch.Add(pod1V1)

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))

sink := &endpointSink{}
obs.ListAndWatch(sink)

assertSink(t, sink, func() bool {
return len(sink.added) == 1
})

assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/pod1-UID",
Target: "1.2.3.4",
Details: observer.Pod{
Name: "pod1",
Labels: map[string]string{
"env": "prod",
},
},
}, sink.added[0])

listWatch.Delete(pod1V2)

assertSink(t, sink, func() bool {
return len(sink.removed) == 1
})

assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/pod1-UID",
Target: "1.2.3.4",
Details: observer.Pod{
Name: "pod1",
Labels: map[string]string{
"env": "prod",
"pod-version": "2",
},
},
}, sink.removed[0])

require.NoError(t, ext.Shutdown(context.Background()))
}
98 changes: 98 additions & 0 deletions extension/observer/k8sobserver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sobserver

import (
"context"
"fmt"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

const (
// The value of extension "type" in configuration.
typeStr configmodels.Type = "k8s_observer"
)

// Factory is the factory for the extension.
type Factory struct {
createK8sConfig func() (*rest.Config, error)
}

var _ (component.Factory) = (*Factory)(nil)

// Type gets the type of the config created by this factory.
func (f *Factory) Type() configmodels.Type {
return typeStr
}

// CreateDefaultConfig creates the default configuration for the extension.
func (f *Factory) CreateDefaultConfig() configmodels.Extension {
return &Config{
ExtensionSettings: configmodels.ExtensionSettings{
TypeVal: typeStr,
NameVal: string(typeStr),
},
}
}

// CreateExtension creates the extension based on this config.
func (f *Factory) CreateExtension(
ctx context.Context,
params component.ExtensionCreateParams,
cfg configmodels.Extension,
) (component.ServiceExtension, error) {
config := cfg.(*Config)

restConfig, err := f.createK8sConfig()
if err != nil {
return nil, fmt.Errorf("failed creating Kubernetes in-cluster REST config: %v", err)
}

clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}

listWatch := cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll,
fields.OneTermEqualSelector("spec.nodeName", config.Node))

return newObserver(params.Logger, config, listWatch)
}

// NewFactory should be called to create a factory with default values.
func NewFactory() component.ExtensionFactory {
return &Factory{createK8sConfig: func() (*rest.Config, error) {
restConfig, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
return restConfig, nil
}}
}

// NewFactoryWithConfig creates a k8s observer factory with the given k8s API config.
func NewFactoryWithConfig(config *rest.Config) *Factory {
return &Factory{createK8sConfig: func() (*rest.Config, error) {
return config, nil
}}
}
63 changes: 63 additions & 0 deletions extension/observer/k8sobserver/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sobserver

import (
"context"
"testing"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"k8s.io/client-go/rest"
)

func TestFactory_Type(t *testing.T) {
factory := Factory{}
require.Equal(t, typeStr, factory.Type())
}

var nilConfig = func() (*rest.Config, error) {
return &rest.Config{}, nil
}

func TestFactory_CreateDefaultConfig(t *testing.T) {
factory := Factory{createK8sConfig: nilConfig}
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &Config{
ExtensionSettings: configmodels.ExtensionSettings{
TypeVal: typeStr,
NameVal: string(typeStr),
},
},
cfg)

assert.NoError(t, configcheck.ValidateConfig(cfg))
ext, err := factory.CreateExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
require.NotNil(t, ext)
}

func TestFactory_CreateExtension(t *testing.T) {
factory := Factory{createK8sConfig: nilConfig}
cfg := factory.CreateDefaultConfig().(*Config)

ext, err := factory.CreateExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
require.NotNil(t, ext)
}
Loading