-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cisco MDT input: minor improvements and support for NX-OS telemetry extensions #6177
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,9 @@ import ( | |
"github.com/influxdata/telegraf/plugins/inputs" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/credentials" | ||
|
||
// Register GRPC gzip decoder to support compressed telemetry | ||
_ "google.golang.org/grpc/encoding/gzip" | ||
"google.golang.org/grpc/peer" | ||
) | ||
|
||
|
@@ -32,6 +35,7 @@ type CiscoTelemetryMDT struct { | |
// Common configuration | ||
Transport string | ||
ServiceAddress string `toml:"service_address"` | ||
DecodeNXOS bool `toml:"decode_nxos"` | ||
MaxMsgSize int `toml:"max_msg_size"` | ||
Aliases map[string]string `toml:"aliases"` | ||
|
||
|
@@ -76,6 +80,7 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error { | |
var opts []grpc.ServerOption | ||
tlsConfig, err := c.ServerConfig.TLSConfig() | ||
if err != nil { | ||
c.listener.Close() | ||
return err | ||
} else if tlsConfig != nil { | ||
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig))) | ||
|
@@ -198,6 +203,8 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS | |
log.Printf("D! [inputs.cisco_telemetry_mdt]: Accepted Cisco MDT GRPC dialout connection from %s", peer.Addr) | ||
} | ||
|
||
var chunkBuffer bytes.Buffer | ||
|
||
for { | ||
packet, err := stream.Recv() | ||
if err != nil { | ||
|
@@ -212,7 +219,18 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS | |
break | ||
} | ||
|
||
c.handleTelemetry(packet.Data) | ||
// Reassemble chunked telemetry data received from NX-OS | ||
if packet.TotalSize == 0 { | ||
c.handleTelemetry(packet.Data) | ||
} else if int(packet.TotalSize) <= c.MaxMsgSize { | ||
chunkBuffer.Write(packet.Data) | ||
if chunkBuffer.Len() >= int(packet.TotalSize) { | ||
c.handleTelemetry(chunkBuffer.Bytes()) | ||
chunkBuffer.Reset() | ||
} | ||
} else { | ||
c.acc.AddError(fmt.Errorf("dropped too large packet: %dB > %dB", packet.TotalSize, c.MaxMsgSize)) | ||
} | ||
} | ||
|
||
if peerOK { | ||
|
@@ -224,7 +242,6 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS | |
|
||
// Handle telemetry packet from any transport, decode and add as measurement | ||
func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { | ||
var namebuf bytes.Buffer | ||
telemetry := &telemetry.Telemetry{} | ||
err := proto.Unmarshal(data, telemetry) | ||
if err != nil { | ||
|
@@ -254,12 +271,12 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { | |
tags["source"] = telemetry.GetNodeIdStr() | ||
tags["subscription"] = telemetry.GetSubscriptionIdStr() | ||
for _, subfield := range field.Fields { | ||
c.parseGPBKVField(subfield, &namebuf, telemetry.EncodingPath, timestamp, tags, nil) | ||
c.parseGPBKVField(subfield, "", telemetry.EncodingPath, timestamp, tags, nil) | ||
} | ||
case "content": | ||
fields = make(map[string]interface{}, len(field.Fields)) | ||
for _, subfield := range field.Fields { | ||
c.parseGPBKVField(subfield, &namebuf, telemetry.EncodingPath, timestamp, tags, fields) | ||
c.parseGPBKVField(subfield, "", telemetry.EncodingPath, timestamp, tags, fields) | ||
} | ||
default: | ||
log.Printf("I! [inputs.cisco_telemetry_mdt]: Unexpected top-level MDT field: %s", field.Name) | ||
|
@@ -268,29 +285,36 @@ func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { | |
|
||
// Find best alias for encoding path and emit measurement | ||
if len(fields) > 0 && len(tags) > 0 && len(telemetry.EncodingPath) > 0 { | ||
name := telemetry.EncodingPath | ||
if alias, ok := c.aliases[name]; ok { | ||
tags["path"] = name | ||
name = alias | ||
} else { | ||
log.Printf("D! [inputs.cisco_telemetry_mdt]: No measurement alias for encoding path: %s", name) | ||
} | ||
c.acc.AddFields(name, fields, tags, timestamp) | ||
} else { | ||
c.addFieldsWithAlias(telemetry.EncodingPath, fields, tags, timestamp) | ||
} else if !c.DecodeNXOS { | ||
c.acc.AddError(fmt.Errorf("empty encoding path or measurement")) | ||
} | ||
} | ||
} | ||
|
||
// Add fields doing alias replacement | ||
func (c *CiscoTelemetryMDT) addFieldsWithAlias(path string, fields map[string]interface{}, | ||
tags map[string]string, timestamp time.Time) { | ||
name := path | ||
if alias, ok := c.aliases[name]; ok { | ||
tags["path"] = name | ||
name = alias | ||
} else { | ||
log.Printf("D! [inputs.cisco_telemetry_mdt]: No measurement alias for encoding path: %s", name) | ||
} | ||
c.acc.AddFields(name, fields, tags, timestamp) | ||
} | ||
|
||
// Recursively parse GPBKV field structure into fields or tags | ||
func (c *CiscoTelemetryMDT) parseGPBKVField(field *telemetry.TelemetryField, namebuf *bytes.Buffer, | ||
func (c *CiscoTelemetryMDT) parseGPBKVField(field *telemetry.TelemetryField, prefix string, | ||
path string, timestamp time.Time, tags map[string]string, fields map[string]interface{}) { | ||
|
||
namelen := namebuf.Len() | ||
if namelen > 0 { | ||
namebuf.WriteRune('/') | ||
localname := strings.Replace(field.Name, "-", "_", -1) | ||
name := localname | ||
if len(name) == 0 { | ||
name = prefix | ||
} else if len(prefix) > 0 { | ||
name = prefix + "/" + localname | ||
} | ||
namebuf.WriteString(strings.Replace(field.Name, "-", "_", -1)) | ||
|
||
// Decode Telemetry field value if set | ||
var value interface{} | ||
|
@@ -318,21 +342,91 @@ func (c *CiscoTelemetryMDT) parseGPBKVField(field *telemetry.TelemetryField, nam | |
if value != nil { | ||
// Distinguish between tags (keys) and fields (data) to write to | ||
if fields != nil { | ||
fields[namebuf.String()] = value | ||
fields[name] = value | ||
} else { | ||
if _, exists := tags[field.Name]; !exists { // Use short keys whenever possible | ||
tags[field.Name] = fmt.Sprint(value) | ||
if _, exists := tags[localname]; !exists { // Use short keys whenever possible | ||
tags[localname] = fmt.Sprint(value) | ||
} else { | ||
tags[namebuf.String()] = fmt.Sprint(value) | ||
tags[name] = fmt.Sprint(value) | ||
} | ||
} | ||
} | ||
if fields == nil || !c.DecodeNXOS { | ||
for _, subfield := range field.Fields { | ||
c.parseGPBKVField(subfield, name, path, timestamp, tags, fields) | ||
} | ||
} else if c.DecodeNXOS && len(field.Fields) > 0 { // NX-OS extended decoding logic | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we inspect the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in latest update. |
||
c.parseNXOSTelemetryStructure(field, prefix, name, path, timestamp, tags, fields) | ||
} | ||
} | ||
|
||
// Parse extended structure of NX-OS platform telemetry | ||
func (c *CiscoTelemetryMDT) parseNXOSTelemetryStructure(field *telemetry.TelemetryField, prefix string, | ||
name string, path string, timestamp time.Time, tags map[string]string, fields map[string]interface{}) { | ||
var attributes, children, rows *telemetry.TelemetryField | ||
|
||
// NX-OS uses certain fieldnames to indicate the structure following | ||
for _, subfield := range field.Fields { | ||
c.parseGPBKVField(subfield, namebuf, path, timestamp, tags, fields) | ||
if subfield.Name == "attributes" && len(subfield.Fields) > 0 { | ||
attributes = subfield | ||
} else if subfield.Name == "children" && len(subfield.Fields) > 0 { | ||
children = subfield | ||
} else if strings.HasPrefix(subfield.Name, "ROW_") { | ||
rows = subfield | ||
} else { // Fallback to regular telemetry decoding | ||
c.parseGPBKVField(subfield, name, path, timestamp, tags, fields) | ||
} | ||
} | ||
|
||
namebuf.Truncate(namelen) | ||
if attributes != nil { | ||
// DME structure: https://developer.cisco.com/site/nxapi-dme-model-reference-api/ | ||
values := make(map[string]interface{}) | ||
for _, subfield := range attributes.Fields { | ||
c.parseGPBKVField(subfield, "", path, timestamp, tags, values) | ||
} | ||
if rn, hasRN := values["rn"]; hasRN { | ||
// Promote the relative name of the entry from a value to a key | ||
tags[prefix] = fmt.Sprint(rn) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in latest update. |
||
delete(values, "rn") | ||
for key, value := range values { | ||
// Work around an issue where a field is returned of type string when empty | ||
// and as a number otherwise causing type confusion, thus remove empty strings | ||
if str, isStr := value.(string); isStr && len(str) == 0 { | ||
delete(values, key) | ||
} | ||
} | ||
c.addFieldsWithAlias(path+"/"+prefix, values, tags, timestamp) | ||
} else if _, hasDN := values["dn"]; !hasDN { // Check for distinguished name being present | ||
c.acc.AddError(fmt.Errorf("NX-OS decoding failed: missing dn field")) | ||
} | ||
if children != nil { | ||
// This is a nested structure, children will inherit relative name keys of parent | ||
for _, subfield := range children.Fields { | ||
c.parseGPBKVField(subfield, prefix, path, timestamp, tags, fields) | ||
} | ||
} | ||
delete(tags, prefix) | ||
} else if rows != nil { | ||
// NXAPI structure: https://developer.cisco.com/docs/cisco-nexus-9000-series-nx-api-cli-reference-release-9-2x/ | ||
for _, row := range rows.Fields { | ||
values := make(map[string]interface{}) | ||
for i, subfield := range row.Fields { | ||
c.parseGPBKVField(subfield, "", path, timestamp, tags, values) | ||
if i == 0 { // First subfield contains the index, promote it from value to tag | ||
tags[prefix] = fmt.Sprint(values[subfield.Name]) | ||
delete(values, subfield.Name) | ||
} | ||
} | ||
for key, value := range values { | ||
// Work around an issue where a field is returned of type string when empty | ||
// and as a number otherwise causing type confusion, thus remove empty strings | ||
if str, isStr := value.(string); isStr && len(str) == 0 { | ||
delete(values, key) | ||
} | ||
} | ||
c.addFieldsWithAlias(path+"/"+prefix, values, tags, timestamp) | ||
} | ||
} | ||
} | ||
|
||
// Stop listener and cleanup | ||
|
@@ -355,6 +449,9 @@ const sampleConfig = ` | |
## Address and port to host telemetry listener | ||
service_address = ":57000" | ||
|
||
## Enable support for decoding NX-OS platform-specific telemetry extensions (disable for IOS XR and IOS XE) | ||
# decode_nxos = true | ||
|
||
## Enable TLS; grpc transport only. | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a bytes.Buffer, why not just send the packet.Data into handleTelemetry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NX-OS can send data which is too large for a single GRPC message and if so uses "application layer" chunking where one telemetry message is reassembled from multiple GRPC messages.