Skip to content

Commit

Permalink
feat(inputs.gnmi): Add yang-model decoding of JSON IETF payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Apr 22, 2024
1 parent 2de8026 commit e0224be
Show file tree
Hide file tree
Showing 254 changed files with 98,628 additions and 25 deletions.
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ following works:
- github.com/olivere/elastic [MIT License](https://github.com/olivere/elastic/blob/release-branch.v7/LICENSE)
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil [Apache License 2.0](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/LICENSE)
- github.com/openconfig/gnmi [Apache License 2.0](https://github.com/openconfig/gnmi/blob/master/LICENSE)
- github.com/openconfig/goyang [Apache License 2.0](https://github.com/openconfig/goyang/blob/master/LICENSE)
- github.com/opencontainers/go-digest [Apache License 2.0](https://github.com/opencontainers/go-digest/blob/master/LICENSE)
- github.com/opencontainers/image-spec [Apache License 2.0](https://github.com/opencontainers/image-spec/blob/master/LICENSE)
- github.com/opensearch-project/opensearch-go [Apache License 2.0](https://github.com/opensearch-project/opensearch-go/blob/main/LICENSE.txt)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ require (
github.com/nwaples/tacplus v0.0.3
github.com/olivere/elastic v6.2.37+incompatible
github.com/openconfig/gnmi v0.10.0
github.com/openconfig/goyang v1.0.0
github.com/opensearch-project/opensearch-go/v2 v2.3.0
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b
github.com/openzipkin-contrib/zipkin-go-opentracing v0.5.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1944,6 +1944,7 @@ github.com/openconfig/gnmi v0.10.0 h1:kQEZ/9ek3Vp2Y5IVuV2L/ba8/77TgjdXg505QXvYmg
github.com/openconfig/gnmi v0.10.0/go.mod h1:Y9os75GmSkhHw2wX8sMsxfI7qRGAEcDh8NTa5a8vj6E=
github.com/openconfig/goyang v0.0.0-20200115183954-d0a48929f0ea/go.mod h1:dhXaV0JgHJzdrHi2l+w0fZrwArtXL7jEFoiqLEdmkvU=
github.com/openconfig/goyang v0.2.2/go.mod h1:vX61x01Q46AzbZUzG617vWqh/cB+aisc+RrNkXRd3W8=
github.com/openconfig/goyang v1.0.0 h1:nYaFu7BOAk/eQn4CgAUjgYPfp3J6CdXrBryp32E5CjI=
github.com/openconfig/goyang v1.0.0/go.mod h1:vX61x01Q46AzbZUzG617vWqh/cB+aisc+RrNkXRd3W8=
github.com/openconfig/gribi v0.1.1-0.20210423184541-ce37eb4ba92f/go.mod h1:OoH46A2kV42cIXGyviYmAlGmn6cHjGduyC2+I9d/iVs=
github.com/openconfig/grpctunnel v0.0.0-20210610163803-fde4a9dc048d/go.mod h1:x9tAZ4EwqCQ0jI8D6S8Yhw9Z0ee7/BxWQX0k0Uib5Q8=
Expand Down
149 changes: 149 additions & 0 deletions plugins/common/yangmodel/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package yangmodel

import (
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"math"
"os"
"path/filepath"

"github.com/openconfig/goyang/pkg/yang"
)

var (
ErrInsufficientData = errors.New("insufficient data")
ErrNotFound = errors.New("no such node")
)

type Decoder struct {
modules *yang.Modules
}

func NewDecoder(paths ...string) (*Decoder, error) {
modules := yang.NewModules()
modules.ParseOptions.IgnoreSubmoduleCircularDependencies = true

var moduleFiles []string
modulePaths := paths
unresolved := paths
for {
var newlyfound []string
for _, path := range unresolved {
entries, err := os.ReadDir(path)
if err != nil {
return nil, fmt.Errorf("reading directory %q failed: %w", path, err)
}
for _, entry := range entries {
info, err := entry.Info()
if err != nil {
fmt.Printf("Couldn't get info for %q: %v", entry.Name(), err)
continue
}

if info.Mode()&os.ModeSymlink != 0 {
target, err := filepath.EvalSymlinks(entry.Name())
if err != nil {
fmt.Printf("Couldn't evaluate symbolic links for %q: %v", entry.Name(), err)
continue
}
info, err = os.Lstat(target)
if err != nil {
fmt.Printf("Couldn't stat target %v: %v", target, err)
continue
}
}

newPath := filepath.Join(path, info.Name())
if info.IsDir() {
newlyfound = append(newlyfound, newPath)
continue
}
if info.Mode().IsRegular() && filepath.Ext(info.Name()) == ".yang" {
moduleFiles = append(moduleFiles, info.Name())
}
}
}
if len(newlyfound) == 0 {
break
}

modulePaths = append(modulePaths, newlyfound...)
unresolved = newlyfound
}

// Add the module paths
modules.AddPath(modulePaths...)
for _, fn := range moduleFiles {
if err := modules.Read(fn); err != nil {
fmt.Printf("reading file %q failed: %v\n", fn, err)
}
}
if errs := modules.Process(); len(errs) > 0 {
return nil, errors.Join(errs...)
}

return &Decoder{modules: modules}, nil
}

func (d *Decoder) FindLeaf(namespace, identifier string) (*yang.Leaf, error) {
// Get module name from the element
entry, errs := d.modules.GetModule(namespace)
if len(errs) > 0 {
return nil, fmt.Errorf("getting module %q failed: %w", namespace, errors.Join(errs...))
}

module, err := d.modules.FindModuleByNamespace(entry.Namespace().NName())
if err != nil {
return nil, fmt.Errorf("finding module %q failed: %w", namespace, err)
}

for _, grp := range module.Grouping {
for _, leaf := range grp.Leaf {
if leaf.Name == identifier {
return leaf, nil
}
}
}
return nil, ErrNotFound
}

func DecodeLeaf(leaf *yang.Leaf, value interface{}) (interface{}, error) {
schema := leaf.Type.YangType

if schema.Kind != yang.Ybinary {
return value, nil
}

// Binary values are encodes as base64 strings
s, ok := value.(string)
if !ok {
return value, nil
}

// Decode the encoded binary values
raw, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return value, err
}

switch schema.Name {
case "ieeefloat32":
if len(raw) != 4 {
return raw, fmt.Errorf("%w, expected 4 but got %d bytes", ErrInsufficientData, len(raw))
}
return math.Float32frombits(binary.BigEndian.Uint32(raw)), nil
default:
return raw, nil
}
}

func (d *Decoder) DecodeElement(namespace, identifier string, value interface{}) (interface{}, error) {
leaf, err := d.FindLeaf(namespace, identifier)
if err != nil {
return nil, fmt.Errorf("finding %s failed: %w", identifier, err)
}

return DecodeLeaf(leaf, value)
}
5 changes: 5 additions & 0 deletions plugins/inputs/gnmi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ details on how to use them.
## adds component, component_id & sub_component_id as additional tags
# vendor_specific = []

## YANG model paths for decoding IETF JSON payloads
## Model files are loaded recursively from the given directories. Disabled if
## no models are specified.
# yang_model_paths = []

## Define additional aliases to map encoding paths to measurement names
# [inputs.gnmi.aliases]
# ifcounters = "openconfig:/interfaces/interface/state/counters"
Expand Down
12 changes: 12 additions & 0 deletions plugins/inputs/gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
internaltls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/common/yangmodel"
"github.com/influxdata/telegraf/plugins/inputs"
)

Expand Down Expand Up @@ -60,11 +61,13 @@ type GNMI struct {
EnableTLS bool `toml:"enable_tls" deprecated:"1.27.0;use 'tls_enable' instead"`
KeepaliveTime config.Duration `toml:"keepalive_time"`
KeepaliveTimeout config.Duration `toml:"keepalive_timeout"`
YangModelPaths []string `toml:"yang_model_paths"`
Log telegraf.Logger `toml:"-"`
internaltls.ClientConfig

// Internal state
internalAliases map[*pathInfo]string
decoder *yangmodel.Decoder
cancel context.CancelFunc
wg sync.WaitGroup
}
Expand Down Expand Up @@ -195,6 +198,14 @@ func (c *GNMI) Init() error {
}
c.Log.Debugf("Internal alias mapping: %+v", c.internalAliases)

if len(c.YangModelPaths) > 0 {
decoder, err := yangmodel.NewDecoder(c.YangModelPaths...)
if err != nil {
return fmt.Errorf("creating YANG model decoder failed: %w", err)
}
c.decoder = decoder
}

return nil
}

Expand Down Expand Up @@ -250,6 +261,7 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
canonicalFieldNames: c.CanonicalFieldNames,
trimSlash: c.TrimFieldNames,
guessPathStrategy: c.GuessPathStrategy,
decoder: c.decoder,
log: c.Log,
ClientParameters: keepalive.ClientParameters{
Time: time.Duration(c.KeepaliveTime),
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/gnmi/gnmi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ func TestCases(t *testing.T) {
require.Eventually(t,
func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, 1*time.Second, 100*time.Millisecond)
}, 5*time.Second, 100*time.Millisecond)
plugin.Stop()
grpcServer.Stop()
wg.Wait()
Expand Down
4 changes: 3 additions & 1 deletion plugins/inputs/gnmi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/yangmodel"
jnprHeader "github.com/influxdata/telegraf/plugins/inputs/gnmi/extensions/jnpr_gnmi_extention"
"github.com/influxdata/telegraf/selfstat"
)
Expand All @@ -42,6 +43,7 @@ type handler struct {
canonicalFieldNames bool
trimSlash bool
guessPathStrategy string
decoder *yangmodel.Decoder
log telegraf.Logger
keepalive.ClientParameters
}
Expand Down Expand Up @@ -170,7 +172,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
var valueFields []updateField
for _, update := range response.Update.Update {
fullPath := prefix.append(update.Path)
fields, err := newFieldsFromUpdate(fullPath, update)
fields, err := h.newFieldsFromUpdate(fullPath, update)
if err != nil {
h.log.Errorf("Processing update %v failed: %v", update, err)
}
Expand Down
5 changes: 5 additions & 0 deletions plugins/inputs/gnmi/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
## adds component, component_id & sub_component_id as additional tags
# vendor_specific = []

## YANG model paths for decoding IETF JSON payloads
## Model files are loaded recursively from the given directories. Disabled if
## no models are specified.
# yang_model_paths = []

## Define additional aliases to map encoding paths to measurement names
# [inputs.gnmi.aliases]
# ifcounters = "openconfig:/interfaces/interface/state/counters"
Expand Down
2 changes: 2 additions & 0 deletions plugins/inputs/gnmi/testcases/issue_15046/expected.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
psu,name=PowerSupply1/A,path=openconfig:/components/component/power-supply/state,source=127.0.0.1 openconfig_platform_psu:capacity=715,openconfig_platform_psu:enabled=true,openconfig_platform_psu:input_current=0.47099998593330383,openconfig_platform_psu:input_voltage=208.5,openconfig_platform_psu:output_current=1.2029999494552612,openconfig_platform_psu:output_power=68.625,openconfig_platform_psu:output_voltage=56.367000579833984 1711178737105194000
psu,name=PowerSupply1/B,path=openconfig:/components/component/power-supply/state,source=127.0.0.1 openconfig_platform_psu:capacity=715,openconfig_platform_psu:enabled=true,openconfig_platform_psu:input_current=0.3930000066757202,openconfig_platform_psu:input_voltage=209.75,openconfig_platform_psu:output_current=0.9380000233650208,openconfig_platform_psu:output_power=51.875,openconfig_platform_psu:output_voltage=56.367000579833984 1711178737105194000
Loading

0 comments on commit e0224be

Please sign in to comment.