Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.gnmi): Add yang-model decoding of JSON IETF payloads #15201

Merged
merged 11 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ following works:
- github.com/olivere/elastic [MIT License](https://github.com/olivere/elastic/blob/release-branch.v7/LICENSE)
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil [Apache License 2.0](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/LICENSE)
- github.com/openconfig/gnmi [Apache License 2.0](https://github.com/openconfig/gnmi/blob/master/LICENSE)
- github.com/openconfig/goyang [Apache License 2.0](https://github.com/openconfig/goyang/blob/master/LICENSE)
- github.com/opencontainers/go-digest [Apache License 2.0](https://github.com/opencontainers/go-digest/blob/master/LICENSE)
- github.com/opencontainers/image-spec [Apache License 2.0](https://github.com/opencontainers/image-spec/blob/master/LICENSE)
- github.com/opensearch-project/opensearch-go [Apache License 2.0](https://github.com/opensearch-project/opensearch-go/blob/main/LICENSE.txt)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ require (
github.com/nwaples/tacplus v0.0.3
github.com/olivere/elastic v6.2.37+incompatible
github.com/openconfig/gnmi v0.10.0
github.com/openconfig/goyang v1.0.0
github.com/opensearch-project/opensearch-go/v2 v2.3.0
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b
github.com/openzipkin-contrib/zipkin-go-opentracing v0.5.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1950,6 +1950,7 @@ github.com/openconfig/gnmi v0.10.0 h1:kQEZ/9ek3Vp2Y5IVuV2L/ba8/77TgjdXg505QXvYmg
github.com/openconfig/gnmi v0.10.0/go.mod h1:Y9os75GmSkhHw2wX8sMsxfI7qRGAEcDh8NTa5a8vj6E=
github.com/openconfig/goyang v0.0.0-20200115183954-d0a48929f0ea/go.mod h1:dhXaV0JgHJzdrHi2l+w0fZrwArtXL7jEFoiqLEdmkvU=
github.com/openconfig/goyang v0.2.2/go.mod h1:vX61x01Q46AzbZUzG617vWqh/cB+aisc+RrNkXRd3W8=
github.com/openconfig/goyang v1.0.0 h1:nYaFu7BOAk/eQn4CgAUjgYPfp3J6CdXrBryp32E5CjI=
github.com/openconfig/goyang v1.0.0/go.mod h1:vX61x01Q46AzbZUzG617vWqh/cB+aisc+RrNkXRd3W8=
github.com/openconfig/gribi v0.1.1-0.20210423184541-ce37eb4ba92f/go.mod h1:OoH46A2kV42cIXGyviYmAlGmn6cHjGduyC2+I9d/iVs=
github.com/openconfig/grpctunnel v0.0.0-20210610163803-fde4a9dc048d/go.mod h1:x9tAZ4EwqCQ0jI8D6S8Yhw9Z0ee7/BxWQX0k0Uib5Q8=
Expand Down
263 changes: 263 additions & 0 deletions plugins/common/yangmodel/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
package yangmodel

import (
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"strconv"

"github.com/openconfig/goyang/pkg/yang"
)

var (
ErrInsufficientData = errors.New("insufficient data")
ErrNotFound = errors.New("no such node")
)

type Decoder struct {
modules map[string]*yang.Module
rootNodes map[string][]yang.Node
}

func NewDecoder(paths ...string) (*Decoder, error) {
modules := yang.NewModules()
modules.ParseOptions.IgnoreSubmoduleCircularDependencies = true

var moduleFiles []string
modulePaths := paths
unresolved := paths
for {
var newlyfound []string
for _, path := range unresolved {
entries, err := os.ReadDir(path)
if err != nil {
return nil, fmt.Errorf("reading directory %q failed: %w", path, err)
}
for _, entry := range entries {
info, err := entry.Info()
if err != nil {
fmt.Printf("Couldn't get info for %q: %v", entry.Name(), err)
continue
}

if info.Mode()&os.ModeSymlink != 0 {
target, err := filepath.EvalSymlinks(entry.Name())
if err != nil {
fmt.Printf("Couldn't evaluate symbolic links for %q: %v", entry.Name(), err)
continue
}
info, err = os.Lstat(target)
if err != nil {
fmt.Printf("Couldn't stat target %v: %v", target, err)
continue
}
}

newPath := filepath.Join(path, info.Name())
if info.IsDir() {
newlyfound = append(newlyfound, newPath)
continue
}
if info.Mode().IsRegular() && filepath.Ext(info.Name()) == ".yang" {
moduleFiles = append(moduleFiles, info.Name())
}
}
}
if len(newlyfound) == 0 {
break
}

modulePaths = append(modulePaths, newlyfound...)
unresolved = newlyfound
}

// Add the module paths
modules.AddPath(modulePaths...)
for _, fn := range moduleFiles {
if err := modules.Read(fn); err != nil {
fmt.Printf("reading file %q failed: %v\n", fn, err)
}
}
if errs := modules.Process(); len(errs) > 0 {
return nil, errors.Join(errs...)
}

// Get all root nodes defined in models with their origin. We require
// those nodes to later resolve paths to YANG model leaf nodes...
moduleLUT := make(map[string]*yang.Module)
moduleRootNodes := make(map[string][]yang.Node)
for _, m := range modules.Modules {
// Check if we processed the module already
if _, found := moduleLUT[m.Name]; found {
continue
}
// Create a module mapping for easily finding modules by name
moduleLUT[m.Name] = m

// Determine the origin defined in the module
var prefix string
for _, imp := range m.Import {
if imp.Name == "openconfig-extensions" {
prefix = imp.Name
if imp.Prefix != nil {
prefix = imp.Prefix.Name
}
break
}
}

var moduleOrigin string
if prefix != "" {
for _, e := range m.Extensions {
if e.Keyword == prefix+":origin" || e.Keyword == "origin" {
moduleOrigin = e.Argument
break
}
}
}
for _, u := range m.Uses {
root, err := yang.FindNode(m, u.Name)
if err != nil {
return nil, err
}
moduleRootNodes[moduleOrigin] = append(moduleRootNodes[moduleOrigin], root)
}
}

return &Decoder{modules: moduleLUT, rootNodes: moduleRootNodes}, nil
}

func (d *Decoder) FindLeaf(name, identifier string) (*yang.Leaf, error) {
// Get module name from the element
module, found := d.modules[name]
if !found {
return nil, fmt.Errorf("cannot find module %q", name)
}

for _, grp := range module.Grouping {
for _, leaf := range grp.Leaf {
if leaf.Name == identifier {
return leaf, nil
}
}
}
return nil, ErrNotFound
}

func DecodeLeafValue(leaf *yang.Leaf, value interface{}) (interface{}, error) {
schema := leaf.Type.YangType

// Ignore all non-string values as the types seem already converted...
s, ok := value.(string)
if !ok {
return value, nil
}

switch schema.Kind {
case yang.Ybinary:
// Binary values are encodes as base64 string, so decode the string
raw, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return value, err
}

switch schema.Name {
case "ieeefloat32":
if len(raw) != 4 {
return raw, fmt.Errorf("%w, expected 4 but got %d bytes", ErrInsufficientData, len(raw))
}
return math.Float32frombits(binary.BigEndian.Uint32(raw)), nil
default:
return raw, nil
}
case yang.Yint8:
v, err := strconv.ParseInt(s, 10, 8)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return int8(v), nil
case yang.Yint16:
v, err := strconv.ParseInt(s, 10, 16)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return int16(v), nil
case yang.Yint32:
v, err := strconv.ParseInt(s, 10, 32)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return int32(v), nil
case yang.Yint64:
v, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return v, nil
case yang.Yuint8:
v, err := strconv.ParseUint(s, 10, 8)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return uint8(v), nil
case yang.Yuint16:
v, err := strconv.ParseUint(s, 10, 16)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return uint16(v), nil
case yang.Yuint32:
v, err := strconv.ParseUint(s, 10, 32)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return uint32(v), nil
case yang.Yuint64:
v, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return v, nil
case yang.Ydecimal64:
v, err := strconv.ParseFloat(s, 64)
if err != nil {
return value, fmt.Errorf("parsing %s %q failed: %w", yang.TypeKindToName[schema.Kind], s, err)
}
return v, nil
}
return value, nil
}

func (d *Decoder) DecodeLeafElement(namespace, identifier string, value interface{}) (interface{}, error) {
leaf, err := d.FindLeaf(namespace, identifier)
if err != nil {
return nil, fmt.Errorf("finding %s failed: %w", identifier, err)
}

return DecodeLeafValue(leaf, value)
}

func (d *Decoder) DecodePathElement(origin, path string, value interface{}) (interface{}, error) {
rootNodes, found := d.rootNodes[origin]
if !found || len(rootNodes) == 0 {
return value, nil
}

for _, root := range rootNodes {
node, _ := yang.FindNode(root, path)
if node == nil {
// The path does not exist in this root node
continue
}
// We do expect a leaf node...
if leaf, ok := node.(*yang.Leaf); ok {
return DecodeLeafValue(leaf, value)
}
}

return value, nil
}
5 changes: 5 additions & 0 deletions plugins/inputs/gnmi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ details on how to use them.
## adds component, component_id & sub_component_id as additional tags
# vendor_specific = []

## YANG model paths for decoding IETF JSON payloads
## Model files are loaded recursively from the given directories. Disabled if
## no models are specified.
# yang_model_paths = []

## Define additional aliases to map encoding paths to measurement names
# [inputs.gnmi.aliases]
# ifcounters = "openconfig:/interfaces/interface/state/counters"
Expand Down
13 changes: 13 additions & 0 deletions plugins/inputs/gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
internaltls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/common/yangmodel"
"github.com/influxdata/telegraf/plugins/inputs"
)

Expand Down Expand Up @@ -62,11 +63,13 @@ type GNMI struct {
EnableTLS bool `toml:"enable_tls" deprecated:"1.27.0;use 'tls_enable' instead"`
KeepaliveTime config.Duration `toml:"keepalive_time"`
KeepaliveTimeout config.Duration `toml:"keepalive_timeout"`
YangModelPaths []string `toml:"yang_model_paths"`
Log telegraf.Logger `toml:"-"`
internaltls.ClientConfig

// Internal state
internalAliases map[*pathInfo]string
decoder *yangmodel.Decoder
cancel context.CancelFunc
wg sync.WaitGroup
}
Expand Down Expand Up @@ -219,6 +222,15 @@ func (c *GNMI) Init() error {
return err
}

// Load the YANG models if specified by the user
if len(c.YangModelPaths) > 0 {
decoder, err := yangmodel.NewDecoder(c.YangModelPaths...)
if err != nil {
return fmt.Errorf("creating YANG model decoder failed: %w", err)
}
c.decoder = decoder
}

return nil
}

Expand Down Expand Up @@ -275,6 +287,7 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
trimSlash: c.TrimFieldNames,
tagPathPrefix: c.PrefixTagKeyWithPath,
guessPathStrategy: c.GuessPathStrategy,
decoder: c.decoder,
log: c.Log,
ClientParameters: keepalive.ClientParameters{
Time: time.Duration(c.KeepaliveTime),
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/gnmi/gnmi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ func TestCases(t *testing.T) {
require.Eventually(t,
func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, 1*time.Second, 100*time.Millisecond)
}, 15*time.Second, 100*time.Millisecond)
plugin.Stop()
grpcServer.Stop()
wg.Wait()
Expand Down
8 changes: 7 additions & 1 deletion plugins/inputs/gnmi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/yangmodel"
jnprHeader "github.com/influxdata/telegraf/plugins/inputs/gnmi/extensions/jnpr_gnmi_extention"
"github.com/influxdata/telegraf/selfstat"
)
Expand All @@ -44,6 +45,7 @@ type handler struct {
trimSlash bool
tagPathPrefix bool
guessPathStrategy string
decoder *yangmodel.Decoder
log telegraf.Logger
keepalive.ClientParameters
}
Expand Down Expand Up @@ -172,7 +174,11 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
var valueFields []updateField
for _, update := range response.Update.Update {
fullPath := prefix.append(update.Path)
fields, err := newFieldsFromUpdate(fullPath, update)
if update.Path.Origin != "" {
fullPath.origin = update.Path.Origin
}

fields, err := h.newFieldsFromUpdate(fullPath, update)
if err != nil {
h.log.Errorf("Processing update %v failed: %v", update, err)
}
Expand Down
8 changes: 8 additions & 0 deletions plugins/inputs/gnmi/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ func (pi *pathInfo) String() string {
return out
}

func (pi *pathInfo) Path() (origin, path string) {
if len(pi.segments) == 0 {
return pi.origin, "/"
}

return pi.origin, "/" + strings.Join(pi.segments, "/")
}

func (pi *pathInfo) Tags(pathPrefix bool) map[string]string {
tags := make(map[string]string, len(pi.keyValues))
for _, s := range pi.keyValues {
Expand Down
Loading
Loading