Skip to content

Commit

Permalink
[pkg/ottl]: add Associate function
Browse files Browse the repository at this point in the history
Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
  • Loading branch information
bacherfl committed Sep 25, 2024
1 parent 5601dfb commit 02e3beb
Show file tree
Hide file tree
Showing 4 changed files with 403 additions and 0 deletions.
33 changes: 33 additions & 0 deletions pkg/ottl/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,28 @@ func Test_e2e_converters(t *testing.T) {
m.PutStr("user_agent.version", "7.81.0")
},
},
{
statement: `set(attributes["test"], Associate(attributes["things"], ["name"]))`,
want: func(tCtx ottllog.TransformContext) {
m := tCtx.GetLogRecord().Attributes().PutEmptyMap("test")
thing1 := m.PutEmptyMap("foo")
thing1.PutStr("name", "foo")
thing1.PutInt("value", 2)

thing2 := m.PutEmptyMap("bar")
thing2.PutStr("name", "bar")
thing2.PutInt("value", 5)
},
},
{
statement: `set(attributes["test"], Associate(attributes["things"], ["name"], ["value"]))`,
want: func(tCtx ottllog.TransformContext) {
m := tCtx.GetLogRecord().Attributes().PutEmptyMap("test")
m.PutInt("foo", 2)
m.PutInt("bar", 5)

},
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -1036,6 +1058,17 @@ func constructLogTransformContext() ottllog.TransformContext {
m2 := m.PutEmptyMap("nested")
m2.PutStr("test", "pass")

s2 := logRecord.Attributes().PutEmptySlice("things")
o1 := s2.AppendEmpty()
thing1 := o1.SetEmptyMap()
thing1.PutStr("name", "foo")
thing1.PutInt("value", 2)

o2 := s2.AppendEmpty()
thing2 := o2.SetEmptyMap()
thing2.PutStr("name", "bar")
thing2.PutInt("value", 5)

return ottllog.NewTransformContext(logRecord, scope, resource, plog.NewScopeLogs(), plog.NewResourceLogs())
}

Expand Down
109 changes: 109 additions & 0 deletions pkg/ottl/ottlfuncs/func_associate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ottlfuncs // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs"
import (
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"go.opentelemetry.io/collector/pdata/pcommon"
"golang.org/x/net/context"
)

type AssociateArguments[K any] struct {
Target ottl.Getter[K]
KeyPath []string
ValuePath ottl.Optional[[]string]
}

func NewAssociateFactory[K any]() ottl.Factory[K] {
return ottl.NewFactory("Associate", &AssociateArguments[K]{}, createAssociateFunction[K])
}

func createAssociateFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[K], error) {
args, ok := oArgs.(*AssociateArguments[K])
if !ok {
return nil, fmt.Errorf("AssociateFactory args must be of type *AssociateArguments[K")
}

return Associate(args.Target, args.KeyPath, args.ValuePath)
}

func Associate[K any](target ottl.Getter[K], keyPath []string, valuePath ottl.Optional[[]string]) (ottl.ExprFunc[K], error) {
return func(ctx context.Context, tCtx K) (any, error) {
if len(keyPath) == 0 {
return nil, fmt.Errorf("key path must contain at least one element")
}
val, err := target.Get(ctx, tCtx)
if err != nil {
return nil, err
}

switch v := val.(type) {
case []any:
return associate(v, keyPath, valuePath)
case pcommon.Slice:
return associate(v.AsRaw(), keyPath, valuePath)
default:
return nil, fmt.Errorf("unsupported type provided to Associate function: %T", v)
}
}, nil
}

func associate(v []any, keyPath []string, valuePath ottl.Optional[[]string]) (any, error) {
result := make(map[string]any, len(v))
for _, elem := range v {
switch e := elem.(type) {
case map[string]any:
obj, err := extractValue(e, keyPath)
if err != nil {
return nil, fmt.Errorf("could not extract key value: %w", err)
}

var key string
switch k := obj.(type) {
case string:
key = k
default:
return nil, fmt.Errorf("provided key path %v does not resolve to a string", keyPath)
}

if valuePath.IsEmpty() {
result[key] = e
continue
}
obj, err = extractValue(e, valuePath.Get())
if err != nil {
return nil, fmt.Errorf("could not extract value: %w", err)
}
result[key] = obj
default:
return nil, fmt.Errorf("unsupported value type: %T", e)
}
}
m := pcommon.NewMap()
if err := m.FromRaw(result); err != nil {
return nil, fmt.Errorf("could not create pcommon.Map from result: %w", err)
}

return m, nil
}

func extractValue(v map[string]any, path []string) (any, error) {
if len(path) == 0 {
return nil, fmt.Errorf("must provide at least one path item")
}
obj, ok := v[path[0]]
if !ok {
return nil, fmt.Errorf("provided object does not contain the path %v", path)
}
if len(path) == 1 {
return obj, nil
}

switch o := obj.(type) {
case map[string]any:
return extractValue(o, path[1:])
default:
return nil, fmt.Errorf("provided object does not contain the path %v", path)
}
}
Loading

0 comments on commit 02e3beb

Please sign in to comment.