-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathsink_metric_timer.go
76 lines (63 loc) · 2.31 KB
/
sink_metric_timer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package audit
import (
"context"
"fmt"
"reflect"
"strings"
"github.com/armon/go-metrics"
"github.com/hashicorp/eventlogger"
)
var _ eventlogger.Node = (*sinkMetricTimer)(nil)
// sinkMetricTimer is a wrapper for any kind of eventlogger.NodeTypeSink node that
// processes events containing an AuditEvent payload.
// It decorates the implemented eventlogger.Node Process method in order to emit
// timing metrics for the duration between the creation time of the event and the
// time the node completes processing.
type sinkMetricTimer struct {
name string
sink eventlogger.Node
}
// newSinkMetricTimer should be used to create the sinkMetricTimer.
// It expects that an eventlogger.NodeTypeSink should be supplied as the sink.
func newSinkMetricTimer(name string, sink eventlogger.Node) (*sinkMetricTimer, error) {
name = strings.TrimSpace(name)
if name == "" {
return nil, fmt.Errorf("name is required: %w", ErrInvalidParameter)
}
if sink == nil || reflect.ValueOf(sink).IsNil() {
return nil, fmt.Errorf("sink node is required: %w", ErrInvalidParameter)
}
if sink.Type() != eventlogger.NodeTypeSink {
return nil, fmt.Errorf("sink node must be of type 'sink': %w", ErrInvalidParameter)
}
return &sinkMetricTimer{
name: name,
sink: sink,
}, nil
}
// Process wraps the Process method of underlying sink (eventlogger.Node).
// Additionally, when the supplied eventlogger.Event has an AuditEvent as its payload,
// it measures the elapsed time between the creation of the eventlogger.Event and
// the completion of processing, emitting this as a metric.
// Examples:
// 'vault.audit.{DEVICE}.log_request'
// 'vault.audit.{DEVICE}.log_response'
func (s *sinkMetricTimer) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
defer func() {
auditEvent, ok := e.Payload.(*Event)
if ok {
metrics.MeasureSince([]string{"audit", s.name, auditEvent.Subtype.MetricTag()}, e.CreatedAt)
}
}()
return s.sink.Process(ctx, e)
}
// Reopen wraps the Reopen method of this underlying sink (eventlogger.Node).
func (s *sinkMetricTimer) Reopen() error {
return s.sink.Reopen()
}
// Type wraps the Type method of this underlying sink (eventlogger.Node).
func (s *sinkMetricTimer) Type() eventlogger.NodeType {
return s.sink.Type()
}