Commit
fix: Linter fixes for plugins/inputs/[k-l]* (#9999)
zak-pawel committed Oct 27, 2021
1 parent 488568c commit eec6fd5
Showing 13 changed files with 93 additions and 86 deletions.
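Most of the changes in this commit follow one linter-driven pattern: plugins stop calling the global log package with a hand-written "[inputs.xxx]" prefix and instead log through an injected telegraf.Logger. A minimal sketch of that pattern follows; the Plugin type and markDelivered helper are illustrative only and not part of this commit.

package example

import "github.com/influxdata/telegraf"

// Plugin stands in for any Telegraf input plugin; Telegraf injects Log
// into the struct before the plugin is started.
type Plugin struct {
	Log telegraf.Logger `toml:"-"`
}

// markDelivered shows the before/after of the logging change.
func (p *Plugin) markDelivered(id telegraf.TrackingID, ok bool) {
	if !ok {
		// Before: log.Printf("E! [inputs.example] Could not mark message delivered: %d", id)
		// After: the injected logger already adds the plugin prefix and level.
		p.Log.Errorf("Could not mark message delivered: %d", id)
	}
}

In tests, testutil.Logger{} satisfies the same interface, which is why the updated test calls below pass it as the extra argument to NewConsumerGroupHandler.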
11 changes: 7 additions & 4 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -3,12 +3,12 @@ package kafka_consumer
import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"

"github.com/Shopify/sarama"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/kafka"
@@ -232,7 +232,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
go func() {
defer k.wg.Done()
for ctx.Err() == nil {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser)
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag
err := k.consumer.Consume(ctx, k.Topics, handler)
@@ -276,12 +276,13 @@ type Message struct {
session sarama.ConsumerGroupSession
}

func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser parsers.Parser) *ConsumerGroupHandler {
func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser parsers.Parser, log telegraf.Logger) *ConsumerGroupHandler {
handler := &ConsumerGroupHandler{
acc: acc.WithTracking(maxUndelivered),
sem: make(chan empty, maxUndelivered),
undelivered: make(map[telegraf.TrackingID]Message, maxUndelivered),
parser: parser,
log: log,
}
return handler
}
@@ -299,6 +300,8 @@ type ConsumerGroupHandler struct {

mu sync.Mutex
undelivered map[telegraf.TrackingID]Message

log telegraf.Logger
}

// Setup is called once when a new session is opened. It setups up the handler
@@ -335,7 +338,7 @@ func (h *ConsumerGroupHandler) onDelivery(track telegraf.DeliveryInfo) {

msg, ok := h.undelivered[track.ID()]
if !ok {
log.Printf("E! [inputs.kafka_consumer] Could not mark message delivered: %d", track.ID())
h.log.Errorf("Could not mark message delivered: %d", track.ID())
return
}

12 changes: 7 additions & 5 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -6,12 +6,13 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

type FakeConsumerGroup struct {
@@ -259,7 +260,7 @@ func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
acc := &testutil.Accumulator{}
parser := value.NewValueParser("cpu", "int", "", nil)
cg := NewConsumerGroupHandler(acc, 1, parser)
cg := NewConsumerGroupHandler(acc, 1, parser, testutil.Logger{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -274,11 +275,12 @@ func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
require.NoError(t, err)

cancel()
// This produces a flappy testcase probably due to a race between context cancelation and consumption.
// This produces a flappy testcase probably due to a race between context cancellation and consumption.
// Furthermore, it is not clear what the outcome of this test should be...
// err = cg.ConsumeClaim(session, &claim)
//require.NoError(t, err)
// So stick with the line below for now.
//nolint:errcheck
cg.ConsumeClaim(session, &claim)

err = cg.Cleanup(session)
@@ -288,7 +290,7 @@
func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) {
acc := &testutil.Accumulator{}
parser := value.NewValueParser("cpu", "int", "", nil)
cg := NewConsumerGroupHandler(acc, 1, parser)
cg := NewConsumerGroupHandler(acc, 1, parser, testutil.Logger{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -402,7 +404,7 @@ func TestConsumerGroupHandler_Handle(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
acc := &testutil.Accumulator{}
parser := value.NewValueParser("cpu", "int", "", nil)
cg := NewConsumerGroupHandler(acc, 1, parser)
cg := NewConsumerGroupHandler(acc, 1, parser, testutil.Logger{})
cg.MaxMessageLen = tt.maxMessageLen
cg.TopicTag = tt.topicTag

@@ -6,11 +6,10 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
)

func TestReadsMetricsFromKafka(t *testing.T) {
@@ -51,7 +50,7 @@ func TestReadsMetricsFromKafka(t *testing.T) {
var acc testutil.Accumulator

// Sanity check
assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
require.Equal(t, 0, len(acc.Metrics), "There should not be any points")
if err := k.Start(&acc); err != nil {
t.Fatal(err.Error())
} else {
@@ -65,14 +64,14 @@
require.NoError(t, err)
if len(acc.Metrics) == 1 {
point := acc.Metrics[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
assert.Equal(t, map[string]string{
require.Equal(t, "cpu_load_short", point.Measurement)
require.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
require.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)
assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
require.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
} else {
t.Errorf("No points found in accumulator, expected 1")
}
@@ -84,6 +83,7 @@ func waitForPoint(acc *testutil.Accumulator, t *testing.T) {
// Give the kafka container up to 2 seconds to get the point to the consumer
ticker := time.NewTicker(5 * time.Millisecond)
counter := 0
//nolint:gosimple // for-select used on purpose
for {
select {
case <-ticker.C:
14 changes: 8 additions & 6 deletions plugins/inputs/kinesis_consumer/kinesis_consumer_test.go
@@ -2,15 +2,17 @@ package kinesis_consumer

import (
"encoding/base64"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
consumer "github.com/harlow/kinesis-consumer"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"testing"
)

func TestKinesisConsumer_onMessage(t *testing.T) {
@@ -177,7 +179,7 @@ func TestKinesisConsumer_onMessage(t *testing.T) {
ContentEncoding: "notsupported",
}
err := k.Init()
assert.NotNil(t, err)
require.NotNil(t, err)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -187,18 +189,18 @@
records: tt.fields.records,
}
err := k.Init()
assert.Nil(t, err)
require.Nil(t, err)

acc := testutil.Accumulator{}
if err := k.onMessage(acc.WithTracking(tt.expected.numberOfMetrics), tt.args.r); (err != nil) != tt.wantErr {
t.Errorf("onMessage() error = %v, wantErr %v", err, tt.wantErr)
}

assert.Equal(t, tt.expected.numberOfMetrics, len(acc.Metrics))
require.Equal(t, tt.expected.numberOfMetrics, len(acc.Metrics))

for _, metric := range acc.Metrics {
if logEventMessage, ok := metric.Fields["message"]; ok {
assert.Contains(t, logEventMessage.(string), tt.expected.messageContains)
require.Contains(t, logEventMessage.(string), tt.expected.messageContains)
} else {
t.Errorf("Expect logEvents to be present")
}
36 changes: 17 additions & 19 deletions plugins/inputs/knx_listener/knx_listener_test.go
@@ -6,14 +6,12 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/vapourismo/knx-go/knx"
"github.com/vapourismo/knx-go/knx/cemi"
"github.com/vapourismo/knx-go/knx/dpt"

"github.com/influxdata/telegraf/testutil"
)

const epsilon = 1e-3
@@ -127,17 +125,17 @@ func TestRegularReceives_DPT(t *testing.T) {
// Check if we got what we expected
require.Len(t, acc.Metrics, len(testcases))
for i, m := range acc.Metrics {
assert.Equal(t, "test", m.Measurement)
assert.Equal(t, testcases[i].address, m.Tags["groupaddress"])
assert.Len(t, m.Fields, 1)
require.Equal(t, "test", m.Measurement)
require.Equal(t, testcases[i].address, m.Tags["groupaddress"])
require.Len(t, m.Fields, 1)
switch v := testcases[i].value.(type) {
case bool, int64, uint64:
assert.Equal(t, v, m.Fields["value"])
require.Equal(t, v, m.Fields["value"])
case float64:
assert.InDelta(t, v, m.Fields["value"], epsilon)
require.InDelta(t, v, m.Fields["value"], epsilon)
}
assert.True(t, !tstop.Before(m.Time))
assert.True(t, !tstart.After(m.Time))
require.True(t, !tstop.Before(m.Time))
require.True(t, !tstart.After(m.Time))
}
}

@@ -178,13 +176,13 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
// Check if we got what we expected
require.Len(t, acc.Metrics, 2)

assert.Equal(t, "temperature", acc.Metrics[0].Measurement)
assert.Equal(t, "1/1/1", acc.Metrics[0].Tags["groupaddress"])
assert.Len(t, acc.Metrics[0].Fields, 1)
assert.Equal(t, true, acc.Metrics[0].Fields["value"])
require.Equal(t, "temperature", acc.Metrics[0].Measurement)
require.Equal(t, "1/1/1", acc.Metrics[0].Tags["groupaddress"])
require.Len(t, acc.Metrics[0].Fields, 1)
require.Equal(t, true, acc.Metrics[0].Fields["value"])

assert.Equal(t, "temperature", acc.Metrics[1].Measurement)
assert.Equal(t, "1/1/1", acc.Metrics[1].Tags["groupaddress"])
assert.Len(t, acc.Metrics[1].Fields, 1)
assert.Equal(t, false, acc.Metrics[1].Fields["value"])
require.Equal(t, "temperature", acc.Metrics[1].Measurement)
require.Equal(t, "1/1/1", acc.Metrics[1].Tags["groupaddress"])
require.Len(t, acc.Metrics[1].Fields, 1)
require.Equal(t, false, acc.Metrics[1].Fields["value"])
}
13 changes: 7 additions & 6 deletions plugins/inputs/kube_inventory/kube_state.go
@@ -3,7 +3,6 @@ package kube_inventory
import (
"context"
"fmt"
"log"
"os"
"strconv"
"strings"
@@ -37,6 +36,8 @@ type KubernetesInventory struct {
SelectorInclude []string `toml:"selector_include"`
SelectorExclude []string `toml:"selector_exclude"`

Log telegraf.Logger `toml:"-"`

tls.ClientConfig
client *client

@@ -169,15 +170,15 @@ func atoi(s string) int64 {
return i
}

func convertQuantity(s string, m float64) int64 {
func (ki *KubernetesInventory) convertQuantity(s string, m float64) int64 {
q, err := resource.ParseQuantity(s)
if err != nil {
log.Printf("D! [inputs.kube_inventory] failed to parse quantity: %s", err.Error())
ki.Log.Debugf("failed to parse quantity: %s", err.Error())
return 0
}
f, err := strconv.ParseFloat(fmt.Sprint(q.AsDec()), 64)
if err != nil {
log.Printf("D! [inputs.kube_inventory] failed to parse float: %s", err.Error())
ki.Log.Debugf("failed to parse float: %s", err.Error())
return 0
}
if m < 1 {
@@ -187,11 +188,11 @@ func convertQuantity(s string, m float64) int64 {
}

func (ki *KubernetesInventory) createSelectorFilters() error {
filter, err := filter.NewIncludeExcludeFilter(ki.SelectorInclude, ki.SelectorExclude)
selectorFilter, err := filter.NewIncludeExcludeFilter(ki.SelectorInclude, ki.SelectorExclude)
if err != nil {
return err
}
ki.selectorFilter = filter
ki.selectorFilter = selectorFilter
return nil
}

13 changes: 6 additions & 7 deletions plugins/inputs/kube_inventory/node.go
@@ -26,13 +26,12 @@ func (ki *KubernetesInventory) gatherNode(n corev1.Node, acc telegraf.Accumulato
}

for resourceName, val := range n.Status.Capacity {

switch resourceName {
case "cpu":
fields["capacity_cpu_cores"] = convertQuantity(val.String(), 1)
fields["capacity_millicpu_cores"] = convertQuantity(val.String(), 1000)
fields["capacity_cpu_cores"] = ki.convertQuantity(val.String(), 1)
fields["capacity_millicpu_cores"] = ki.convertQuantity(val.String(), 1000)
case "memory":
fields["capacity_memory_bytes"] = convertQuantity(val.String(), 1)
fields["capacity_memory_bytes"] = ki.convertQuantity(val.String(), 1)
case "pods":
fields["capacity_pods"] = atoi(val.String())
}
@@ -41,10 +40,10 @@
for resourceName, val := range n.Status.Allocatable {
switch resourceName {
case "cpu":
fields["allocatable_cpu_cores"] = convertQuantity(val.String(), 1)
fields["allocatable_millicpu_cores"] = convertQuantity(val.String(), 1000)
fields["allocatable_cpu_cores"] = ki.convertQuantity(val.String(), 1)
fields["allocatable_millicpu_cores"] = ki.convertQuantity(val.String(), 1000)
case "memory":
fields["allocatable_memory_bytes"] = convertQuantity(val.String(), 1)
fields["allocatable_memory_bytes"] = ki.convertQuantity(val.String(), 1)
case "pods":
fields["allocatable_pods"] = atoi(val.String())
}
12 changes: 6 additions & 6 deletions plugins/inputs/kube_inventory/pod.go
@@ -35,11 +35,11 @@ func (ki *KubernetesInventory) gatherPod(p corev1.Pod, acc telegraf.Accumulator)
if !ok {
cs = &corev1.ContainerStatus{}
}
gatherPodContainer(ki, p, *cs, c, acc)
ki.gatherPodContainer(p, *cs, c, acc)
}
}

func gatherPodContainer(ki *KubernetesInventory, p corev1.Pod, cs corev1.ContainerStatus, c corev1.Container, acc telegraf.Accumulator) {
func (ki *KubernetesInventory) gatherPodContainer(p corev1.Pod, cs corev1.ContainerStatus, c corev1.Container, acc telegraf.Accumulator) {
stateCode := 3
stateReason := ""
state := "unknown"
@@ -103,17 +103,17 @@ func gatherPodContainer(ki *KubernetesInventory, p corev1.Pod, cs corev1.Contain
for resourceName, val := range req {
switch resourceName {
case "cpu":
fields["resource_requests_millicpu_units"] = convertQuantity(val.String(), 1000)
fields["resource_requests_millicpu_units"] = ki.convertQuantity(val.String(), 1000)
case "memory":
fields["resource_requests_memory_bytes"] = convertQuantity(val.String(), 1)
fields["resource_requests_memory_bytes"] = ki.convertQuantity(val.String(), 1)
}
}
for resourceName, val := range lim {
switch resourceName {
case "cpu":
fields["resource_limits_millicpu_units"] = convertQuantity(val.String(), 1000)
fields["resource_limits_millicpu_units"] = ki.convertQuantity(val.String(), 1000)
case "memory":
fields["resource_limits_memory_bytes"] = convertQuantity(val.String(), 1)
fields["resource_limits_memory_bytes"] = ki.convertQuantity(val.String(), 1)
}
}
