-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlotus.go
145 lines (126 loc) · 4.57 KB
/
lotus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package lotus
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
const pluginName string = "telegraf-input-lotus"
const lotusMeasurement string = "lotus"
const lotusSealingWorkers string = "lotus_sealing_workers"
const lotusSealingJobs string = "lotus_sealing_jobs"
const lotusStorageStats string = "lotus_storage_stats"
type LotusInput struct {
DaemonAddr string `toml:"daemonAddr"`
DaemonToken string `toml:"daemonToken"`
ApiVersion string `toml:"daemonApiVersion"`
MinerAddr string `toml:"minerAddr"`
MinerToken string `toml:"minerToken"`
Log telegraf.Logger `toml:"-"`
Daemon *Daemon `toml:"-"`
Miner *Miner `toml:"-"`
}
func (s *LotusInput) Description() string {
return "Stream lotus-daemon and lotus-miner metrics"
}
func (s *LotusInput) SampleConfig() string {
return `
## Lotus daemon listen address
daemonAddr = 127.0.0.1:1234
## Lotus daemon API token (example)
daemonToken = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJBbGxvdyI6WyJyZWFkIl19.aneYo3I_Ts45E36uBcLNNK61q2aKj3p462fByqnam1s
## Lotus daemon API Version (default: v0)
daemonApiVersion = "v1"
## Lotus miner listen address
minerAddr = 127.0.0.1:1234
## Lotus miner API token (example)
minerToken = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJBbGxvdyI6WyJyZWFkIl19.aneYo3I_Ts45E36uBcLNNK61q2aKj3p462fByqnam1s
`
}
func (s *LotusInput) Init() error {
if s.DaemonAddr != "" {
daemon, err := NewDaemon(s.DaemonAddr, s.DaemonToken, s.ApiVersion)
if err != nil {
return err
}
s.Daemon = daemon
}
if s.MinerAddr != "" {
miner, err := NewMiner(s.MinerAddr, s.MinerToken)
if err != nil {
return err
}
s.Miner = miner
}
return nil
}
func (s *LotusInput) Gather(acc telegraf.Accumulator) error {
var daemonMetrics DaemonMetrics
if s.Daemon != nil {
daemonMetrics = s.Daemon.FetchMetrics()
}
var minerMetrics MinerMetrics
if s.Miner != nil {
minerMetrics = s.Miner.FetchMetrics()
}
measurements := map[string]interface{}{
"epoch": daemonMetrics.Status.SyncStatus.Epoch,
"behind": daemonMetrics.Status.SyncStatus.Behind,
"messagePeers": daemonMetrics.Status.PeerStatus.PeersToPublishMsgs,
"blockPeers": daemonMetrics.Status.PeerStatus.PeersToPublishBlocks,
"balance": daemonMetrics.Balance}
sectorsTotal := 0
for sectorState, count := range minerMetrics.SectorSummary {
measurements[fmt.Sprintf("sectors%s", sectorState)] = count
sectorsTotal += count
}
measurements["sectorsTotal"] = sectorsTotal
// TODO: Extract argument to struct
// add tags worker_host:key.value.info.Hostname
// add tags worker_id:key.String()
acc.AddFields(lotusMeasurement, measurements, nil)
workerIDtoNameMap := map[string]string{}
for key, value := range minerMetrics.WorkerStats {
workerMeasurements := map[string]interface{}{}
tags := map[string]string{}
tags["worker_host"] = value.Info.Hostname
tags["worker_id"] = key.String()
workerIDtoNameMap[key.String()] = value.Info.Hostname
workerMeasurements["cpu_use"] = value.CpuUse
workerMeasurements["mem_physical"] = value.Info.Resources.MemPhysical
workerMeasurements["mem_used"] = value.Info.Resources.MemUsed
workerMeasurements["mem_swap_used"] = value.Info.Resources.MemSwapUsed
workerMeasurements["gpu_used"] = value.GpuUsed
acc.AddFields(lotusSealingWorkers, workerMeasurements, tags)
}
for key, value := range minerMetrics.WorkerJobs {
for _, job := range value {
jobMeasurements := map[string]interface{}{}
tags := map[string]string{}
tags["worker_host"] = workerIDtoNameMap[key.String()]
tags["worker_id"] = key.String()
tags["job_id"] = job.ID.ID.String()
tags["sector"] = job.Sector.Number.String()
tags["miner_id"] = job.Sector.Miner.String()
jobMeasurements["run_wait"] = job.RunWait
jobMeasurements["start"] = job.Start.String()
jobMeasurements["task"] = job.Task.Short()
acc.AddFields(lotusSealingJobs, jobMeasurements, tags)
}
}
for key, stat := range minerMetrics.StorageStats {
storageMeasurments := map[string]interface{}{}
tags := map[string]string{}
tags["storage_id"] = string(key)
storageMeasurments["available"] = stat.Available
storageMeasurments["capacity"] = stat.Capacity
storageMeasurments["fs_available"] = stat.FSAvailable
storageMeasurments["max"] = stat.Max
storageMeasurments["reserved"] = stat.Reserved
storageMeasurments["used"] = stat.Used
acc.AddFields(lotusStorageStats, storageMeasurments, map[string]string{})
}
return nil
}
func init() {
inputs.Add(pluginName, func() telegraf.Input { return &LotusInput{} })
}