-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
Copy pathrecorder.go
57 lines (49 loc) · 1.38 KB
/
recorder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package gather
import (
"context"
"encoding/json"
"github.com/influxdata/influxdb/nats"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/tsdb"
"go.uber.org/zap"
)
// PointWriter is a recorder that persists collected metrics through a
// storage.PointsWriter.
type PointWriter struct {
	// Writer is the destination that Record hands the converted points to.
	Writer storage.PointsWriter
}
// Record converts the collected metrics into points, passes them through
// tsdb.ExplodePoints together with the collection's OrgID and BucketID, and
// writes the result via s.Writer.
//
// The first error from any of these steps is returned unchanged. The write is
// issued with context.TODO(), so callers cannot currently cancel it.
func (s PointWriter) Record(collected MetricsCollection) error {
	points, err := collected.MetricsSlice.Points()
	if err != nil {
		return err
	}

	exploded, err := tsdb.ExplodePoints(collected.OrgID, collected.BucketID, points)
	if err != nil {
		return err
	}

	return s.Writer.WritePoints(context.TODO(), exploded)
}
// Recorder records a collection of gathered metrics.
type Recorder interface {
	//Subscriber nats.Subscriber

	// Record stores the given metrics collection and reports any failure.
	Record(collected MetricsCollection) error
}
// RecorderHandler implements the nats.Handler interface by forwarding each
// received message to a Recorder.
type RecorderHandler struct {
	// Recorder receives every metrics collection decoded by Process.
	Recorder Recorder
	// Logger is used by Process to report decoding and recording failures.
	Logger *zap.Logger
}
// Process handles one message from the job queue: it decodes the message data
// as a JSON-encoded MetricsCollection and passes it to h.Recorder.
//
// The message is acknowledged when Process returns, whether or not decoding or
// recording succeeded; failures are only reported through h.Logger. The
// subscription argument s is not used.
func (h *RecorderHandler) Process(s nats.Subscription, m nats.Message) {
	defer m.Ack()

	// Decode into a value, not into a pointer to a *MetricsCollection: with a
	// pointer target, a JSON "null" payload makes json.Unmarshal set the
	// pointer to nil without returning an error, and dereferencing it
	// afterwards panics. With a value target "null" leaves the zero value in
	// place.
	var collected MetricsCollection
	if err := json.Unmarshal(m.Data(), &collected); err != nil {
		h.Logger.Error("recorder handler error", zap.Error(err))
		return
	}

	if err := h.Recorder.Record(collected); err != nil {
		h.Logger.Error("recorder handler error", zap.Error(err))
	}
}