-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
handler.go
340 lines (304 loc) · 9.91 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
package gnmi
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"sort"
"strconv"
"strings"
"time"
"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/gnmi/proto/gnmi_ext"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/yangmodel"
"github.com/influxdata/telegraf/plugins/inputs/gnmi/extensions/jnpr_gnmi_extention"
"github.com/influxdata/telegraf/selfstat"
)
// eidJuniperTelemetryHeader is the ID of the registered gNMI extension that
// carries the Juniper telemetry header. It is compared against the ID of
// registered extensions received with an update notification.
const eidJuniperTelemetryHeader = 1
// handler holds the connection settings and the processing options used to
// subscribe to a gNMI device and to convert received updates into metrics.
type handler struct {
// host and port are joined to form the address of the device to connect to.
// The host is additionally reported as "source" tag of the metrics and of
// the connection-status statistic.
host string
port string
// aliases maps paths to the alias used as measurement name for all fields
// located below that path.
aliases map[*pathInfo]string
// tagsubs lists the tag-subscriptions. Updates matching one of them are
// stored in the tag-store instead of being emitted as fields.
tagsubs []tagSubscription
// maxMsgSize limits the size of received gRPC messages. The limit is only
// applied if the value is greater than zero.
maxMsgSize int
// emptyNameWarnShown remembers that the warning about a field without a
// measurement alias was logged, so it is shown only once.
emptyNameWarnShown bool
// vendorExt lists the vendor extensions to decode. The code visible here
// checks for "juniper_header".
vendorExt []string
// tagStore keeps the values received via tag-subscriptions and is queried
// for additional tags for every emitted field.
tagStore *tagStore
// trace enables debug-logging of every received response as JSON.
trace bool
// canonicalFieldNames uses the full field path (without origin) as field
// key instead of the path relative to the alias or the last path element.
canonicalFieldNames bool
// trimSlash strips leading "/" and "." characters from the field key.
trimSlash bool
// tagPathPrefix is passed to pathInfo.tags when deriving tags from a path.
// NOTE(review): exact effect is defined by pathInfo.tags, not visible here.
tagPathPrefix bool
// guessPathStrategy selects how a missing "path" tag is filled:
// "common path" uses the common part of all field paths of a notification,
// "subscription" uses the path of the matching alias.
guessPathStrategy string
// decoder is not referenced in this part of the file.
// NOTE(review): presumably used for YANG-model based value decoding -- confirm.
decoder *yangmodel.Decoder
// log is the logger used for debug, warning and error messages.
log telegraf.Logger
// ClientParameters are the gRPC keepalive settings. They are only applied
// to the connection if Time is greater than zero.
keepalive.ClientParameters
}
// subscribeGNMI connects to the device, sends the given subscription request
// and passes every received update notification on for conversion into
// metrics added to acc.
//
// The function blocks until the stream ends or ctx is cancelled, in which
// case nil is returned. Setup failures and receive errors other than io.EOF
// (while ctx is still active) are returned to the caller. If tlscfg is nil an
// unencrypted transport is used.
func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, tlscfg *tls.Config, request *gnmi.SubscribeRequest) error {
	// Select the transport security depending on the presence of a TLS config
	var creds credentials.TransportCredentials
	if tlscfg == nil {
		creds = insecure.NewCredentials()
	} else {
		creds = credentials.NewTLS(tlscfg)
	}

	// Collect the dial options, optional ones only if they are configured
	dialOpts := make([]grpc.DialOption, 0, 3)
	dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
	if h.maxMsgSize > 0 {
		recvLimit := grpc.MaxCallRecvMsgSize(h.maxMsgSize)
		dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(recvLimit))
	}
	if h.ClientParameters.Time > 0 {
		dialOpts = append(dialOpts, grpc.WithKeepaliveParams(h.ClientParameters))
	}

	// Used to report the status of the TCP connection to the device. If the
	// GNMI connection goes down, but TCP is still up this will still report
	// connected until the TCP connection times out.
	statTags := map[string]string{"source": h.host}
	connectStat := selfstat.Register("gnmi", "grpc_connection_status", statTags)
	defer connectStat.Set(0)

	address := net.JoinHostPort(h.host, h.port)
	conn, err := grpc.NewClient(address, dialOpts...)
	if err != nil {
		return fmt.Errorf("failed to dial: %w", err)
	}
	defer conn.Close()

	stream, err := gnmi.NewGNMIClient(conn).Subscribe(ctx)
	if err != nil {
		return fmt.Errorf("failed to setup subscription: %w", err)
	}

	// If io.EOF is returned, the stream may have ended and stream status
	// can be determined by calling Recv.
	err = stream.Send(request)
	if err != nil && !errors.Is(err, io.EOF) {
		return fmt.Errorf("failed to send subscription request: %w", err)
	}

	connectStat.Set(1)
	h.log.Debugf("Connection to gNMI device %s established", address)
	defer h.log.Debugf("Connection to gNMI device %s closed", address)

	for ctx.Err() == nil {
		reply, err := stream.Recv()
		if err != nil {
			// A regular end of the stream or a cancelled context is no error
			if errors.Is(err, io.EOF) || ctx.Err() != nil {
				break
			}
			return fmt.Errorf("aborted gNMI subscription: %w", err)
		}

		if h.trace {
			if raw, err := protojson.Marshal(reply); err != nil {
				h.log.Debugf("Marshal failed: %v", err)
			} else {
				ts := reply.GetUpdate().GetTimestamp()
				h.log.Debugf("Got update_%v: %s", ts, string(raw))
			}
		}

		// Only update notifications carry telemetry data
		update, isUpdate := reply.Response.(*gnmi.SubscribeResponse_Update)
		if !isUpdate {
			continue
		}
		h.handleSubscribeResponseUpdate(acc, update, reply.GetExtension())
	}

	return nil
}
// handleSubscribeResponseUpdate converts the notification contained in a gNMI
// SubscribeResponse_Update into metrics and adds them to the accumulator.
//
// Tags valid for the whole notification are collected first: the decoded
// vendor extensions (if requested), the source host and the prefix path. Each
// update is then converted into fields; updates matching a tag-subscription
// are stored in the tag-store instead of being emitted. The remaining fields
// get their measurement name via the alias lookup, are tagged and added to a
// series grouper whose metrics are finally passed to acc.
//
// Errors are logged and the offending extension, update or field is skipped.
func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, response *gnmi.SubscribeResponse_Update, extension []*gnmi_ext.Extension) {
	grouper := metric.NewSeriesGrouper()

	// Access the notification through the generated getters only. They are
	// nil-safe, so a response lacking the notification does not panic.
	notification := response.Update
	timestamp := time.Unix(0, notification.GetTimestamp())

	// Extract tags from potential extension in the update notification
	headerTags := make(map[string]string)
	for _, ext := range extension {
		// GetRegisteredExt returns nil for every other kind of extension, so
		// use the nil-safe getters and skip extensions without a payload
		// instead of ignoring all remaining ones.
		registered := ext.GetRegisteredExt()
		payload := registered.GetMsg()
		if payload == nil {
			continue
		}

		switch registered.GetId() {
		case eidJuniperTelemetryHeader:
			// Juniper Header extension
			// Decode it only if user requested it
			if !choice.Contains("juniper_header", h.vendorExt) {
				continue
			}
			juniperHeader := &jnpr_gnmi_extention.GnmiJuniperTelemetryHeaderExtension{}
			if err := proto.Unmarshal(payload, juniperHeader); err != nil {
				h.log.Errorf("unmarshal gnmi Juniper Header extension failed: %v", err)
				continue
			}
			// Add only relevant Tags from the Juniper Header extension.
			// These are required for aggregation
			headerTags["component_id"] = strconv.FormatUint(uint64(juniperHeader.GetComponentId()), 10)
			headerTags["component"] = juniperHeader.GetComponent()
			headerTags["sub_component_id"] = strconv.FormatUint(uint64(juniperHeader.GetSubComponentId()), 10)
		}
	}

	// Extract the path part valid for the whole set of updates if any
	prefix := newInfoFromPath(notification.GetPrefix())

	// Add info to the tags
	headerTags["source"] = h.host
	if !prefix.empty() {
		headerTags["path"] = prefix.fullPath()
	}

	// Process and remove tag-updates from the response first so we can
	// add all available tags to the metrics later.
	var valueFields []updateField
	for _, update := range notification.GetUpdate() {
		fullPath := prefix.append(update.GetPath())
		if origin := update.GetPath().GetOrigin(); origin != "" {
			fullPath.origin = origin
		}

		fields, err := h.newFieldsFromUpdate(fullPath, update)
		if err != nil {
			// The returned fields are meaningless on error, so skip the update
			h.log.Errorf("Processing update %v failed: %v", update, err)
			continue
		}

		// TODO: Handle each field individually to allow in-JSON tags
		isTagUpdate := false
		for _, tagSub := range h.tagsubs {
			if !fullPath.equalsPathNoKeys(tagSub.fullPath) {
				continue
			}
			h.log.Debugf("Tag-subscription update for %q: %+v", tagSub.Name, update)

			// The tags are only required for storing a tag-update, so build
			// them on demand from the header and the path of the update.
			pathTags := fullPath.tags(h.tagPathPrefix)
			tags := make(map[string]string, len(headerTags)+len(pathTags))
			for key, val := range headerTags {
				tags[key] = val
			}
			for key, val := range pathTags {
				tags[key] = val
			}

			if err := h.tagStore.insert(tagSub, fullPath, fields, tags); err != nil {
				h.log.Errorf("Inserting tag failed: %v", err)
			}
			isTagUpdate = true
			break
		}
		if !isTagUpdate {
			valueFields = append(valueFields, fields...)
		}
	}

	// Some devices do not provide a prefix, so do some guesswork based
	// on the paths of the fields
	if headerTags["path"] == "" && h.guessPathStrategy == "common path" {
		if prefixPath := guessPrefixFromUpdate(valueFields); prefixPath != "" {
			headerTags["path"] = prefixPath
		}
	}

	// Parse individual update message and create measurements
	for _, field := range valueFields {
		if field.path.empty() {
			continue
		}

		// Prepare tags from prefix
		fieldTags := field.path.tags(h.tagPathPrefix)
		tags := make(map[string]string, len(headerTags)+len(fieldTags))
		for key, val := range headerTags {
			tags[key] = val
		}
		for key, val := range fieldTags {
			tags[key] = val
		}

		// Add the tags derived via tag-subscriptions
		for k, v := range h.tagStore.lookup(field.path, tags) {
			tags[k] = v
		}

		// Lookup alias for the metric
		aliasPath, name := h.lookupAlias(field.path)
		if name == "" {
			h.log.Debugf("No measurement alias for gNMI path: %s", field.path)
			// Show the detailed warning only once to avoid flooding the log
			if !h.emptyNameWarnShown {
				if buf, err := json.Marshal(response); err == nil {
					h.log.Warnf(emptyNameWarning, field.path, string(buf))
				} else {
					h.log.Warnf(emptyNameWarning, field.path, response.Update)
				}
				h.emptyNameWarnShown = true
			}
		}

		aliasInfo := newInfoFromString(aliasPath)
		if tags["path"] == "" && h.guessPathStrategy == "subscription" {
			tags["path"] = aliasInfo.String()
		}

		// Group metrics
		var key string
		if h.canonicalFieldNames {
			// Strip the origin if any for the field names
			field.path.origin = ""
			key = field.path.String()
		} else if relative := aliasInfo.relative(field.path, true); relative != "" {
			// If the alias is a subpath of the field path and the alias is
			// shorter than the full path to avoid an empty key, then strip the
			// common part of the field is prefixed with the alias path. Note
			// the origins can match or be empty and be considered equal.
			key = relative
		} else {
			// Otherwise use the last path element as the field key
			key = field.path.base()
		}
		key = strings.ReplaceAll(key, "-", "_")

		if h.trimSlash {
			key = strings.TrimLeft(key, "/.")
		}
		if key == "" {
			h.log.Errorf("Invalid empty path %q with alias %q", field.path.String(), aliasPath)
			continue
		}
		grouper.Add(name, tags, timestamp, key, field.value)
	}

	// Add grouped measurements
	for _, metricToAdd := range grouper.Metrics() {
		acc.AddMetric(metricToAdd)
	}
}
// aliasCandidate is a possible match found while looking up the alias for a
// path. It pairs the string form of an alias path with the alias itself.
type aliasCandidate struct {
	path  string
	alias string
}
// lookupAlias tries to find the alias for the given path. Among all alias
// paths being a sub-path of info the one with the longest string form wins.
// It returns the string form of the matching alias path together with the
// alias, or two empty strings if no alias matches.
func (h *handler) lookupAlias(info *pathInfo) (aliasPath, alias string) {
	var candidates []aliasCandidate
	for i, a := range h.aliases {
		if !i.isSubPathOf(info) {
			continue
		}
		candidates = append(candidates, aliasCandidate{i.String(), a})
	}
	if len(candidates) == 0 {
		return "", ""
	}

	// Reverse sort the candidates by path length so we can use the longest
	// match. The candidates are collected in random map-iteration order, so
	// break ties by path and alias to always return the same result.
	sort.SliceStable(candidates, func(i, j int) bool {
		a, b := candidates[i], candidates[j]
		if len(a.path) != len(b.path) {
			return len(a.path) > len(b.path)
		}
		if a.path != b.path {
			return a.path < b.path
		}
		return a.alias < b.alias
	})

	return candidates[0].path, candidates[0].alias
}
// guessPrefixFromUpdate derives a prefix path from the paths of the given
// fields. Without fields an empty string is returned, for a single field the
// result of dir() on its path is used. Otherwise the path of the first field
// is reduced to the part it has in common with all other fields; an empty
// string is returned if nothing is left.
func guessPrefixFromUpdate(fields []updateField) string {
	switch len(fields) {
	case 0:
		return ""
	case 1:
		return fields[0].path.dir()
	}

	// Work on a copy of the first path to not modify the field itself
	first := fields[0].path
	common := &pathInfo{
		origin:   first.origin,
		segments: make([]segment, len(first.segments)),
	}
	copy(common.segments, first.segments)

	for idx := 1; idx < len(fields); idx++ {
		common.keepCommonPart(fields[idx].path)
	}

	if common.empty() {
		return ""
	}
	return common.String()
}