Skip to content

Commit

Permalink
feat/groundwork-output: improve parsing, extend payload
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavlo Sumkin committed Jul 20, 2022
1 parent cbc75a2 commit a119a0a
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 139 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/gosnmp/gosnmp v1.34.0
github.com/grid-x/modbus v0.0.0-20211113184042-7f2251c342c9
github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c
github.com/gwos/tcg/sdk v0.0.0-20220621192633-df0eac0a1a4c
github.com/harlow/kinesis-consumer v0.3.6-0.20210911031324-5a873d6e9fec
github.com/hashicorp/consul/api v1.12.0
github.com/hashicorp/go-uuid v1.0.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1256,8 +1256,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.14.5/go.mod h1:UJ0EZAp832vCd54Wev9N1BM
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c h1:befb5xGUwNCoBuN/akLFCKekUzr0ixyws3aAX/7TaOk=
github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c/go.mod h1:OjlJNRXwlEjznVfU3YtLWH8FyM7KWHUevXDI47UeZeM=
github.com/gwos/tcg/sdk v0.0.0-20220621192633-df0eac0a1a4c h1:pVr0TkSFnMP4BWSsEak/4bxD8/K+foJ9V8DGyZ6PIDE=
github.com/gwos/tcg/sdk v0.0.0-20220621192633-df0eac0a1a4c/go.mod h1:4yzxLBACr76Is0AMAkE0F/fqWBk28p2tzeO06yDGR/Y=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/harlow/kinesis-consumer v0.3.6-0.20210911031324-5a873d6e9fec h1:ya+kv1eNnd5QhcHuaj5g5eMq5Ra3VCNaPY2ZI7Aq91o=
Expand Down
29 changes: 18 additions & 11 deletions plugins/outputs/groundwork/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,26 @@ GW8+

## List of tags used by the plugin

* group - to define the name of the group you want to monitor, can be changed
with config.
* host - to define the name of the host you want to monitor, can be changed with
config.
* service - to define the name of the service you want to monitor.
* status - to define the status of the service. Supported statuses:
* __group__ - to define the name of the group you want to monitor,
can be changed with config.
* __host__ - to define the name of the host you want to monitor,
can be changed with config.
* __service__ - to define the name of the service you want to monitor.
* __status__ - to define the status of the service. Supported statuses:
"SERVICE_OK", "SERVICE_WARNING", "SERVICE_UNSCHEDULED_CRITICAL",
"SERVICE_PENDING", "SERVICE_SCHEDULED_CRITICAL", "SERVICE_UNKNOWN".
* message - to provide any message you want.
* unitType - to use in monitoring contexts(subset of The Unified Code for Units
of Measure standard). Supported types: "1", "%cpu", "KB", "GB", "MB".
* warning - to define warning threshold value.
* critical - to define critical threshold value.
* __message__ - to provide any message you want;
it overrides the __message__ field value.
* __unitType__ - to use in monitoring contexts (subset of The Unified Code for
Units of Measure standard). Supported types: "1", "%cpu", "KB", "GB", "MB".
* __critical__ - to define the default critical threshold value;
it overrides the __value_cr__ field value.
* __warning__ - to define the default warning threshold value;
it overrides the __value_wn__ field value.
* __value_cr__ - to define the critical threshold value for a single field
(the tag name is the field's key followed by `_cr`);
it overrides the __critical__ tag value and the __value_cr__ field value.
* __value_wn__ - to define the warning threshold value for a single field
(the tag name is the field's key followed by `_wn`);
it overrides the __warning__ tag value and the __value_wn__ field value.

## NOTE

Expand Down
218 changes: 133 additions & 85 deletions plugins/outputs/groundwork/groundwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/gwos/tcg/sdk/clients"
"github.com/gwos/tcg/sdk/logper"
Expand Down Expand Up @@ -48,28 +49,28 @@ func (*Groundwork) SampleConfig() string {

func (g *Groundwork) Init() error {
if g.Server == "" {
return errors.New("no 'url' provided")
return errors.New(`no "url" provided`)
}
if g.AgentID == "" {
return errors.New("no 'agent_id' provided")
return errors.New(`no "agent_id" provided`)
}
if g.Username == "" {
return errors.New("no 'username' provided")
return errors.New(`no "username" provided`)
}
if g.Password == "" {
return errors.New("no 'password' provided")
return errors.New(`no "password" provided`)
}
if g.DefaultAppType == "" {
return errors.New("no 'default_app_type' provided")
return errors.New(`no "default_app_type" provided`)
}
if g.DefaultHost == "" {
return errors.New("no 'default_host' provided")
return errors.New(`no "default_host" provided`)
}
if g.ResourceTag == "" {
return errors.New("no 'resource_tag' provided")
return errors.New(`no "resource_tag" provided`)
}
if !validStatus(g.DefaultServiceState) {
return errors.New("invalid 'default_service_state' provided")
return errors.New(`invalid "default_service_state" provided`)
}

g.client = clients.GWClient{
Expand Down Expand Up @@ -214,120 +215,167 @@ func (g *Groundwork) parseMetric(metric telegraf.Metric) (metricMeta, *transit.M
group, _ := metric.GetTag(g.GroupTag)

resource := g.DefaultHost
if value, present := metric.GetTag(g.ResourceTag); present {
resource = value
if v, ok := metric.GetTag(g.ResourceTag); ok {
resource = v
}

service := metric.Name()
if value, present := metric.GetTag("service"); present {
service = value
}

status := g.DefaultServiceState
value, statusPresent := metric.GetTag("status")
if validStatus(value) {
status = value
}

message := ""
if m, ok := metric.GetTag("message"); ok {
message = m
}
if m, ok := metric.GetField("message"); ok {
message = m.(string)
if v, ok := metric.GetTag("service"); ok {
service = v
}

unitType := string(transit.UnitCounter)
if value, present := metric.GetTag("unitType"); present {
unitType = value
}

var critical float64
value, criticalPresent := metric.GetTag("critical")
if criticalPresent {
if s, err := strconv.ParseFloat(value, 64); err == nil {
critical = s
}
}

var warning float64
value, warningPresent := metric.GetTag("warning")
if warningPresent {
if s, err := strconv.ParseFloat(value, 64); err == nil {
warning = s
}
if v, ok := metric.GetTag("unitType"); ok {
unitType = v
}

lastCheckTime := transit.NewTimestamp()
lastCheckTime.Time = metric.Time()
serviceObject := transit.MonitoredService{
BaseInfo: transit.BaseInfo{
Name: service,
Type: transit.ResourceTypeService,
Owner: resource,
Name: service,
Type: transit.ResourceTypeService,
Owner: resource,
Properties: make(map[string]transit.TypedValue),
},
MonitoredInfo: transit.MonitoredInfo{
Status: transit.MonitorStatus(status),
LastCheckTime: lastCheckTime,
NextCheckTime: lastCheckTime, // if not added, GW will make this as LastCheckTime + 5 mins
LastPluginOutput: message,
Status: transit.MonitorStatus(g.DefaultServiceState),
LastCheckTime: lastCheckTime,
NextCheckTime: lastCheckTime, // if not added, GW will make this as LastCheckTime + 5 mins
},
Metrics: nil,
}

for _, value := range metric.FieldList() {
var thresholds []transit.ThresholdValue
if warningPresent {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Warning,
Label: value.Key + "_wn",
Value: &transit.TypedValue{
ValueType: transit.DoubleType,
DoubleValue: &warning,
},
})
knownKey := func(t string) bool {
if strings.HasSuffix(t, "_cr") ||
strings.HasSuffix(t, "_wn") ||
t == "critical" ||
t == "warning" ||
t == g.GroupTag ||
t == g.ResourceTag ||
t == "service" ||
t == "status" ||
t == "message" ||
t == "unitType" {
return true
}
if criticalPresent {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Critical,
Label: value.Key + "_cr",
Value: &transit.TypedValue{
ValueType: transit.DoubleType,
DoubleValue: &critical,
},
})
return false
}

for _, tag := range metric.TagList() {
if knownKey(tag.Key) {
continue
}
serviceObject.Properties[tag.Key] = *transit.NewTypedValue(tag.Value)
}

typedValue := transit.NewTypedValue(value.Value)
if typedValue == nil {
g.Log.Warnf("could not convert type %T, skipping field %s: %v", value.Value, value.Key, value.Value)
for _, field := range metric.FieldList() {
if knownKey(field.Key) {
continue
}
if typedValue.ValueType == transit.StringType {
g.Log.Warnf("string values are not supported, skipping field %s: %q", value.Key, value.Value)

switch field.Value.(type) {
case string, []byte:
g.Log.Warnf("string values are not supported, skipping field %s: %q", field.Key, field.Value)
continue
}

typedValue := transit.NewTypedValue(field.Value)
if typedValue == nil {
g.Log.Warnf("could not convert type %T, skipping field %s: %v", field.Value, field.Key, field.Value)
continue
}

var thresholds []transit.ThresholdValue
addCriticalThreshold := func(v interface{}) {
if tv := transit.NewTypedValue(v); tv != nil {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Critical,
Label: field.Key + "_cr",
Value: tv,
})
}
}
addWarningThreshold := func(v interface{}) {
if tv := transit.NewTypedValue(v); tv != nil {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Warning,
Label: field.Key + "_wn",
Value: tv,
})
}
}
if v, ok := metric.GetTag(field.Key + "_cr"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addCriticalThreshold(v)
}
} else if v, ok := metric.GetTag("critical"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addCriticalThreshold(v)
}
} else if v, ok := metric.GetField(field.Key + "_cr"); ok {
addCriticalThreshold(v)
}
if v, ok := metric.GetTag(field.Key + "_wn"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addWarningThreshold(v)
}
} else if v, ok := metric.GetTag("warning"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addWarningThreshold(v)
}
} else if v, ok := metric.GetField(field.Key + "_wn"); ok {
addWarningThreshold(v)
}

serviceObject.Metrics = append(serviceObject.Metrics, transit.TimeSeries{
MetricName: value.Key,
MetricName: field.Key,
SampleType: transit.Value,
Interval: &transit.TimeInterval{
EndTime: lastCheckTime,
},
Interval: &transit.TimeInterval{EndTime: lastCheckTime},
Value: typedValue,
Unit: transit.UnitType(unitType),
Thresholds: thresholds,
})
}

if !statusPresent {
serviceStatus, err := transit.CalculateServiceStatus(&serviceObject.Metrics)
if m, ok := metric.GetTag("message"); ok {
serviceObject.LastPluginOutput = m
} else if m, ok := metric.GetField("message"); ok {
switch m := m.(type) {
case string:
serviceObject.LastPluginOutput = m
case []byte:
serviceObject.LastPluginOutput = string(m)
default:
serviceObject.LastPluginOutput = fmt.Sprintf("%v", m)
}
}

func() {
if s, ok := metric.GetTag("status"); ok && validStatus(s) {
serviceObject.Status = transit.MonitorStatus(s)
return
}
if s, ok := metric.GetField("status"); ok {
status := g.DefaultServiceState
switch s := s.(type) {
case string:
status = s
case []byte:
status = string(s)
}
if validStatus(status) {
serviceObject.Status = transit.MonitorStatus(status)
return
}
}
status, err := transit.CalculateServiceStatus(&serviceObject.Metrics)
if err != nil {
g.Log.Infof("could not calculate service status, reverting to default_service_state: %v", err)
serviceObject.Status = transit.MonitorStatus(g.DefaultServiceState)
status = transit.MonitorStatus(g.DefaultServiceState)
}
serviceObject.Status = serviceStatus
}
serviceObject.Status = status
}()

return metricMeta{resource: resource, group: group}, &serviceObject, nil
}
Expand Down
Loading

0 comments on commit a119a0a

Please sign in to comment.