Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve probe parsing #3250

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
move over processor config
  • Loading branch information
jaronoff97 committed Aug 7, 2024
commit b8c8fb16123e72195fdec204259e994c88a739f2
2 changes: 1 addition & 1 deletion internal/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type PortRetriever interface {

// RBACRuleGenerator is a function that generates a list of RBAC Rules given a configuration of type T
// It's expected that type T is the configuration used by a parser.
type RBACRuleGenerator[T any] func(logger logr.Logger, config interface{}) ([]rbacv1.PolicyRule, error)
type RBACRuleGenerator[T any] func(logger logr.Logger, config T) ([]rbacv1.PolicyRule, error)
type PortBuilderOption func(*corev1.ServicePort)

func WithTargetPort(targetPort int32) PortBuilderOption {
Expand Down
63 changes: 63 additions & 0 deletions internal/components/generic_config_parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package components

import (
"fmt"

"github.com/go-logr/logr"
"github.com/mitchellh/mapstructure"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
)

var (
_ Parser = &GenericParser[SingleEndpointConfig]{}
)

// GenericParser serves as scaffolding for custom parsing logic by isolating
// functionality to idempotent functions like RBACRuleGenerator
type GenericParser[T any] struct {
name string
config T
rbacGen RBACRuleGenerator[T]
}

func (g *GenericParser[T]) Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) {
return nil, nil
}

func (g *GenericParser[T]) GetRBACRules(logger logr.Logger, config interface{}) ([]rbacv1.PolicyRule, error) {
if g.rbacGen == nil {
return nil, nil
}
var parsed T
if err := mapstructure.Decode(config, &parsed); err != nil {
return nil, err
}
return g.rbacGen(logger, parsed)
}

func (g *GenericParser[T]) ParserType() string {
return ComponentType(g.name)
}

func (g *GenericParser[T]) ParserName() string {
return fmt.Sprintf("__%s", g.name)
}

func NewGenericParser[T any](name string, rbacGen RBACRuleGenerator[T]) *GenericParser[T] {
return &GenericParser[T]{name: name, rbacGen: rbacGen}
}
112 changes: 112 additions & 0 deletions internal/components/generic_config_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package components_test

import (
"fmt"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
rbacv1 "k8s.io/api/rbac/v1"

"github.com/open-telemetry/opentelemetry-operator/internal/components"
)

func TestGenericParser_GetRBACRules(t *testing.T) {
type args struct {
logger logr.Logger
config interface{}
}
type testCase[T any] struct {
name string
g *components.GenericParser[T]
args args
want []rbacv1.PolicyRule
wantErr assert.ErrorAssertionFunc
}

rbacGenFunc := func(logger logr.Logger, config components.SingleEndpointConfig) ([]rbacv1.PolicyRule, error) {
if config.Endpoint == "" && config.ListenAddress == "" {
return nil, fmt.Errorf("either endpoint or listen_address must be specified")
}
return []rbacv1.PolicyRule{
{
APIGroups: []string{""},
Resources: []string{"pods"},
Verbs: []string{"get", "list"},
},
}, nil
}

tests := []testCase[components.SingleEndpointConfig]{
{
name: "valid config with endpoint",
g: components.NewGenericParser[components.SingleEndpointConfig]("test", rbacGenFunc),
args: args{
logger: logr.Discard(),
config: map[string]interface{}{
"endpoint": "http://localhost:8080",
},
},
want: []rbacv1.PolicyRule{
{
APIGroups: []string{""},
Resources: []string{"pods"},
Verbs: []string{"get", "list"},
},
},
wantErr: assert.NoError,
},
{
name: "valid config with listen_address",
g: components.NewGenericParser[components.SingleEndpointConfig]("test", rbacGenFunc),
args: args{
logger: logr.Discard(),
config: map[string]interface{}{
"listen_address": "0.0.0.0:9090",
},
},
want: []rbacv1.PolicyRule{
{
APIGroups: []string{""},
Resources: []string{"pods"},
Verbs: []string{"get", "list"},
},
},
wantErr: assert.NoError,
},
{
name: "invalid config with no endpoint or listen_address",
g: components.NewGenericParser[components.SingleEndpointConfig]("test", rbacGenFunc),
args: args{
logger: logr.Discard(),
config: map[string]interface{}{},
},
want: nil,
wantErr: assert.Error,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.g.GetRBACRules(tt.args.logger, tt.args.config)
if !tt.wantErr(t, err, fmt.Sprintf("GetRBACRules(%v, %v)", tt.args.logger, tt.args.config)) {
return
}
assert.Equalf(t, tt.want, got, "GetRBACRules(%v, %v)", tt.args.logger, tt.args.config)
})
}
}
3 changes: 1 addition & 2 deletions internal/components/nop_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ var (
_ Parser = &NopParser{}
)

// SingleEndpointParser is a special parser for a generic receiver that has an endpoint or listen_address in its
// configuration. It doesn't self-register and should be created/used directly.
// NopParser is a minimal processor mostly used for testing or coverage.
type NopParser struct {
name string
}
Expand Down
50 changes: 50 additions & 0 deletions internal/components/processors/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processors

import "github.com/open-telemetry/opentelemetry-operator/internal/components"

// registry holds a record of all known receiver parsers.
var registry = make(map[string]components.Parser)

// Register adds a new parser builder to the list of known builders.
func Register(name string, p components.Parser) {
registry[name] = p
}

// IsRegistered checks whether a parser is registered with the given name.
func IsRegistered(name string) bool {
_, ok := registry[components.ComponentType(name)]
return ok
}

// ProcessorFor returns a parser builder for the given exporter name.
func ProcessorFor(name string) components.Parser {
if parser, ok := registry[components.ComponentType(name)]; ok {
return parser
}
return components.NewSilentSinglePortParser(components.ComponentType(name), components.UnsetPort, nil)
}

var componentParsers = []components.Parser{
components.NewGenericParser[K8sAttributeConfig]("k8sattributes", GenerateK8SAttrRbacRules),
components.NewGenericParser[ResourceDetectionConfig]("resourcedetection", GenerateResourceDetectionRbacRules),
}

func init() {
for _, parser := range componentParsers {
Register(parser.ParserType(), parser)
}
}
58 changes: 58 additions & 0 deletions internal/components/processors/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processors_test

import (
"testing"

"github.com/stretchr/testify/assert"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/internal/components/processors"
)

var logger = logf.Log.WithName("unit-tests")

func TestDownstreamParsers(t *testing.T) {
for _, tt := range []struct {
desc string
processorName string
parserName string
}{
{"k8sattributes", "k8sattributes", "__k8sattributes"},
{"opencensus", "opencensus", "__opencensus"},
} {
t.Run(tt.processorName, func(t *testing.T) {
t.Run("builds successfully", func(t *testing.T) {
// test
parser := processors.ProcessorFor(tt.processorName)

// verify
assert.Equal(t, tt.parserName, parser.ParserName())
})
t.Run("bad config errors", func(t *testing.T) {
// prepare
parser := processors.ProcessorFor(tt.processorName)

// test throwing in pure junk
_, err := parser.Ports(logger, tt.processorName, func() {})

// verify
assert.ErrorContains(t, err, "expected a map, got 'func'")
})

})
}
}
81 changes: 81 additions & 0 deletions internal/components/processors/k8sattribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processors

import (
"fmt"
"strings"

"github.com/go-logr/logr"
rbacv1 "k8s.io/api/rbac/v1"
)

type FieldExtractConfig struct {
TagName string `mapstructure:"tag_name"`
Key string `mapstructure:"key"`
KeyRegex string `mapstructure:"key_regex"`
Regex string `mapstructure:"regex"`
From string `mapstructure:"from"`
}

type Extract struct {
Metadata []string `mapstructure:"metadata"`
Labels []FieldExtractConfig `mapstructure:"labels"`
Annotations []FieldExtractConfig `mapstructure:"annotations"`
}

// K8sAttributeConfig is a minimal struct needed for parsing a valid k8sattribute processor configuration
// This only contains the fields necessary for parsing, other fields can be added in the future.
type K8sAttributeConfig struct {
Extract Extract `mapstructure:"extract"`
}

func GenerateK8SAttrRbacRules(_ logr.Logger, config K8sAttributeConfig) ([]rbacv1.PolicyRule, error) {
// These policies need to be added always
var prs = []rbacv1.PolicyRule{
{
APIGroups: []string{""},
Resources: []string{"pods", "namespaces"},
Verbs: []string{"get", "watch", "list"},
},
}

replicasetPolicy := rbacv1.PolicyRule{
APIGroups: []string{"apps"},
Resources: []string{"replicasets"},
Verbs: []string{"get", "watch", "list"},
}

if len(config.Extract.Metadata) == 0 {
prs = append(prs, replicasetPolicy)
}
addedReplicasetPolicy := false
for _, m := range config.Extract.Metadata {
metadataField := fmt.Sprint(m)
if (metadataField == "k8s.deployment.uid" || metadataField == "k8s.deployment.name") && !addedReplicasetPolicy {
prs = append(prs, replicasetPolicy)
addedReplicasetPolicy = true
} else if strings.Contains(metadataField, "k8s.node") {
prs = append(prs,
rbacv1.PolicyRule{
APIGroups: []string{""},
Resources: []string{"nodes"},
Verbs: []string{"get", "watch", "list"},
},
)
}
}
return prs, nil
}
Loading