From eec6fd5702b59423c4eeb85ea10517cacea0f745 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20=C5=BBak?= Date: Wed, 27 Oct 2021 17:48:57 +0200 Subject: [PATCH] fix: Linter fixes for plugins/inputs/[k-l]* (#9999) --- .../inputs/kafka_consumer/kafka_consumer.go | 11 +++--- .../kafka_consumer/kafka_consumer_test.go | 12 ++++--- .../kafka_consumer_legacy_integration_test.go | 14 ++++---- .../kinesis_consumer/kinesis_consumer_test.go | 14 ++++---- .../inputs/knx_listener/knx_listener_test.go | 36 +++++++++---------- plugins/inputs/kube_inventory/kube_state.go | 13 +++---- plugins/inputs/kube_inventory/node.go | 13 ++++--- plugins/inputs/kube_inventory/pod.go | 12 +++---- plugins/inputs/lanz/lanz.go | 2 ++ plugins/inputs/lanz/lanz_test.go | 3 +- plugins/inputs/logparser/logparser_test.go | 21 ++++++----- plugins/inputs/logstash/logstash.go | 22 ++++++------ plugins/inputs/lustre2/lustre2_test.go | 6 ++-- 13 files changed, 93 insertions(+), 86 deletions(-) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 70affdc2372b4..b20004a87a417 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -3,12 +3,12 @@ package kafka_consumer import ( "context" "fmt" - "log" "strings" "sync" "time" "github.com/Shopify/sarama" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/kafka" @@ -232,7 +232,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { go func() { defer k.wg.Done() for ctx.Err() == nil { - handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser) + handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log) handler.MaxMessageLen = k.MaxMessageLen handler.TopicTag = k.TopicTag err := k.consumer.Consume(ctx, k.Topics, handler) @@ -276,12 +276,13 @@ type Message struct { session sarama.ConsumerGroupSession } -func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser parsers.Parser) *ConsumerGroupHandler { +func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser parsers.Parser, log telegraf.Logger) *ConsumerGroupHandler { handler := &ConsumerGroupHandler{ acc: acc.WithTracking(maxUndelivered), sem: make(chan empty, maxUndelivered), undelivered: make(map[telegraf.TrackingID]Message, maxUndelivered), parser: parser, + log: log, } return handler } @@ -299,6 +300,8 @@ type ConsumerGroupHandler struct { mu sync.Mutex undelivered map[telegraf.TrackingID]Message + + log telegraf.Logger } // Setup is called once when a new session is opened. It setups up the handler @@ -335,7 +338,7 @@ func (h *ConsumerGroupHandler) onDelivery(track telegraf.DeliveryInfo) { msg, ok := h.undelivered[track.ID()] if !ok { - log.Printf("E! [inputs.kafka_consumer] Could not mark message delivered: %d", track.ID()) + h.log.Errorf("Could not mark message delivered: %d", track.ID()) return } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index c73104278338e..68fd9e0627bed 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -6,12 +6,13 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) type FakeConsumerGroup struct { @@ -259,7 +260,7 @@ func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { func TestConsumerGroupHandler_Lifecycle(t *testing.T) { acc := &testutil.Accumulator{} parser := value.NewValueParser("cpu", "int", "", nil) - cg := NewConsumerGroupHandler(acc, 1, parser) + cg := NewConsumerGroupHandler(acc, 1, parser, testutil.Logger{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -274,11 +275,12 @@ func TestConsumerGroupHandler_Lifecycle(t *testing.T) { require.NoError(t, err) cancel() - // This produces a flappy testcase probably due to a race between context cancelation and consumption. + // This produces a flappy testcase probably due to a race between context cancellation and consumption. // Furthermore, it is not clear what the outcome of this test should be... // err = cg.ConsumeClaim(session, &claim) //require.NoError(t, err) // So stick with the line below for now. + //nolint:errcheck cg.ConsumeClaim(session, &claim) err = cg.Cleanup(session) @@ -288,7 +290,7 @@ func TestConsumerGroupHandler_Lifecycle(t *testing.T) { func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) { acc := &testutil.Accumulator{} parser := value.NewValueParser("cpu", "int", "", nil) - cg := NewConsumerGroupHandler(acc, 1, parser) + cg := NewConsumerGroupHandler(acc, 1, parser, testutil.Logger{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -402,7 +404,7 @@ func TestConsumerGroupHandler_Handle(t *testing.T) { t.Run(tt.name, func(t *testing.T) { acc := &testutil.Accumulator{} parser := value.NewValueParser("cpu", "int", "", nil) - cg := NewConsumerGroupHandler(acc, 1, parser) + cg := NewConsumerGroupHandler(acc, 1, parser, testutil.Logger{}) cg.MaxMessageLen = tt.maxMessageLen cg.TopicTag = tt.topicTag diff --git a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go index 976412a7196b5..473c5b9740847 100644 --- a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go +++ b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_integration_test.go @@ -6,11 +6,10 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" ) func TestReadsMetricsFromKafka(t *testing.T) { @@ -51,7 +50,7 @@ func TestReadsMetricsFromKafka(t *testing.T) { var acc testutil.Accumulator // Sanity check - assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") + require.Equal(t, 0, len(acc.Metrics), "There should not be any points") if err := k.Start(&acc); err != nil { t.Fatal(err.Error()) } else { @@ -65,14 +64,14 @@ func TestReadsMetricsFromKafka(t *testing.T) { require.NoError(t, err) if len(acc.Metrics) == 1 { point := acc.Metrics[0] - assert.Equal(t, "cpu_load_short", point.Measurement) - assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields) - assert.Equal(t, map[string]string{ + require.Equal(t, "cpu_load_short", point.Measurement) + require.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields) + require.Equal(t, map[string]string{ "host": "server01", "direction": "in", "region": "us-west", }, point.Tags) - assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix()) + require.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix()) } else { t.Errorf("No points found in accumulator, expected 1") } @@ -84,6 +83,7 @@ func waitForPoint(acc *testutil.Accumulator, t *testing.T) { // Give the kafka container up to 2 seconds to get the point to the consumer ticker := time.NewTicker(5 * time.Millisecond) counter := 0 + //nolint:gosimple // for-select used on purpose for { select { case <-ticker.C: diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go b/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go index 6d52f07835e6b..1e0d935e03cc6 100644 --- a/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go +++ b/plugins/inputs/kinesis_consumer/kinesis_consumer_test.go @@ -2,15 +2,17 @@ package kinesis_consumer import ( "encoding/base64" + "testing" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" consumer "github.com/harlow/kinesis-consumer" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" - "testing" ) func TestKinesisConsumer_onMessage(t *testing.T) { @@ -177,7 +179,7 @@ func TestKinesisConsumer_onMessage(t *testing.T) { ContentEncoding: "notsupported", } err := k.Init() - assert.NotNil(t, err) + require.NotNil(t, err) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -187,18 +189,18 @@ func TestKinesisConsumer_onMessage(t *testing.T) { records: tt.fields.records, } err := k.Init() - assert.Nil(t, err) + require.Nil(t, err) acc := testutil.Accumulator{} if err := k.onMessage(acc.WithTracking(tt.expected.numberOfMetrics), tt.args.r); (err != nil) != tt.wantErr { t.Errorf("onMessage() error = %v, wantErr %v", err, tt.wantErr) } - assert.Equal(t, tt.expected.numberOfMetrics, len(acc.Metrics)) + require.Equal(t, tt.expected.numberOfMetrics, len(acc.Metrics)) for _, metric := range acc.Metrics { if logEventMessage, ok := metric.Fields["message"]; ok { - assert.Contains(t, logEventMessage.(string), tt.expected.messageContains) + require.Contains(t, logEventMessage.(string), tt.expected.messageContains) } else { t.Errorf("Expect logEvents to be present") } diff --git a/plugins/inputs/knx_listener/knx_listener_test.go b/plugins/inputs/knx_listener/knx_listener_test.go index b0502fbbc8e95..adb07eb6d0113 100644 --- a/plugins/inputs/knx_listener/knx_listener_test.go +++ b/plugins/inputs/knx_listener/knx_listener_test.go @@ -6,14 +6,12 @@ import ( "testing" "time" - "github.com/influxdata/telegraf/testutil" - - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/vapourismo/knx-go/knx" "github.com/vapourismo/knx-go/knx/cemi" "github.com/vapourismo/knx-go/knx/dpt" + + "github.com/influxdata/telegraf/testutil" ) const epsilon = 1e-3 @@ -127,17 +125,17 @@ func TestRegularReceives_DPT(t *testing.T) { // Check if we got what we expected require.Len(t, acc.Metrics, len(testcases)) for i, m := range acc.Metrics { - assert.Equal(t, "test", m.Measurement) - assert.Equal(t, testcases[i].address, m.Tags["groupaddress"]) - assert.Len(t, m.Fields, 1) + require.Equal(t, "test", m.Measurement) + require.Equal(t, testcases[i].address, m.Tags["groupaddress"]) + require.Len(t, m.Fields, 1) switch v := testcases[i].value.(type) { case bool, int64, uint64: - assert.Equal(t, v, m.Fields["value"]) + require.Equal(t, v, m.Fields["value"]) case float64: - assert.InDelta(t, v, m.Fields["value"], epsilon) + require.InDelta(t, v, m.Fields["value"], epsilon) } - assert.True(t, !tstop.Before(m.Time)) - assert.True(t, !tstart.After(m.Time)) + require.True(t, !tstop.Before(m.Time)) + require.True(t, !tstart.After(m.Time)) } } @@ -178,13 +176,13 @@ func TestRegularReceives_MultipleMessages(t *testing.T) { // Check if we got what we expected require.Len(t, acc.Metrics, 2) - assert.Equal(t, "temperature", acc.Metrics[0].Measurement) - assert.Equal(t, "1/1/1", acc.Metrics[0].Tags["groupaddress"]) - assert.Len(t, acc.Metrics[0].Fields, 1) - assert.Equal(t, true, acc.Metrics[0].Fields["value"]) + require.Equal(t, "temperature", acc.Metrics[0].Measurement) + require.Equal(t, "1/1/1", acc.Metrics[0].Tags["groupaddress"]) + require.Len(t, acc.Metrics[0].Fields, 1) + require.Equal(t, true, acc.Metrics[0].Fields["value"]) - assert.Equal(t, "temperature", acc.Metrics[1].Measurement) - assert.Equal(t, "1/1/1", acc.Metrics[1].Tags["groupaddress"]) - assert.Len(t, acc.Metrics[1].Fields, 1) - assert.Equal(t, false, acc.Metrics[1].Fields["value"]) + require.Equal(t, "temperature", acc.Metrics[1].Measurement) + require.Equal(t, "1/1/1", acc.Metrics[1].Tags["groupaddress"]) + require.Len(t, acc.Metrics[1].Fields, 1) + require.Equal(t, false, acc.Metrics[1].Fields["value"]) } diff --git a/plugins/inputs/kube_inventory/kube_state.go b/plugins/inputs/kube_inventory/kube_state.go index 24db993dd39bb..94cb5faf9048b 100644 --- a/plugins/inputs/kube_inventory/kube_state.go +++ b/plugins/inputs/kube_inventory/kube_state.go @@ -3,7 +3,6 @@ package kube_inventory import ( "context" "fmt" - "log" "os" "strconv" "strings" @@ -37,6 +36,8 @@ type KubernetesInventory struct { SelectorInclude []string `toml:"selector_include"` SelectorExclude []string `toml:"selector_exclude"` + Log telegraf.Logger `toml:"-"` + tls.ClientConfig client *client @@ -169,15 +170,15 @@ func atoi(s string) int64 { return i } -func convertQuantity(s string, m float64) int64 { +func (ki *KubernetesInventory) convertQuantity(s string, m float64) int64 { q, err := resource.ParseQuantity(s) if err != nil { - log.Printf("D! [inputs.kube_inventory] failed to parse quantity: %s", err.Error()) + ki.Log.Debugf("failed to parse quantity: %s", err.Error()) return 0 } f, err := strconv.ParseFloat(fmt.Sprint(q.AsDec()), 64) if err != nil { - log.Printf("D! [inputs.kube_inventory] failed to parse float: %s", err.Error()) + ki.Log.Debugf("failed to parse float: %s", err.Error()) return 0 } if m < 1 { @@ -187,11 +188,11 @@ func convertQuantity(s string, m float64) int64 { } func (ki *KubernetesInventory) createSelectorFilters() error { - filter, err := filter.NewIncludeExcludeFilter(ki.SelectorInclude, ki.SelectorExclude) + selectorFilter, err := filter.NewIncludeExcludeFilter(ki.SelectorInclude, ki.SelectorExclude) if err != nil { return err } - ki.selectorFilter = filter + ki.selectorFilter = selectorFilter return nil } diff --git a/plugins/inputs/kube_inventory/node.go b/plugins/inputs/kube_inventory/node.go index 3c7c9cb38e160..b46b4e6209ffc 100644 --- a/plugins/inputs/kube_inventory/node.go +++ b/plugins/inputs/kube_inventory/node.go @@ -26,13 +26,12 @@ func (ki *KubernetesInventory) gatherNode(n corev1.Node, acc telegraf.Accumulato } for resourceName, val := range n.Status.Capacity { - switch resourceName { case "cpu": - fields["capacity_cpu_cores"] = convertQuantity(val.String(), 1) - fields["capacity_millicpu_cores"] = convertQuantity(val.String(), 1000) + fields["capacity_cpu_cores"] = ki.convertQuantity(val.String(), 1) + fields["capacity_millicpu_cores"] = ki.convertQuantity(val.String(), 1000) case "memory": - fields["capacity_memory_bytes"] = convertQuantity(val.String(), 1) + fields["capacity_memory_bytes"] = ki.convertQuantity(val.String(), 1) case "pods": fields["capacity_pods"] = atoi(val.String()) } @@ -41,10 +40,10 @@ func (ki *KubernetesInventory) gatherNode(n corev1.Node, acc telegraf.Accumulato for resourceName, val := range n.Status.Allocatable { switch resourceName { case "cpu": - fields["allocatable_cpu_cores"] = convertQuantity(val.String(), 1) - fields["allocatable_millicpu_cores"] = convertQuantity(val.String(), 1000) + fields["allocatable_cpu_cores"] = ki.convertQuantity(val.String(), 1) + fields["allocatable_millicpu_cores"] = ki.convertQuantity(val.String(), 1000) case "memory": - fields["allocatable_memory_bytes"] = convertQuantity(val.String(), 1) + fields["allocatable_memory_bytes"] = ki.convertQuantity(val.String(), 1) case "pods": fields["allocatable_pods"] = atoi(val.String()) } diff --git a/plugins/inputs/kube_inventory/pod.go b/plugins/inputs/kube_inventory/pod.go index ab4e5dd287cbe..ed95dd63d970d 100644 --- a/plugins/inputs/kube_inventory/pod.go +++ b/plugins/inputs/kube_inventory/pod.go @@ -35,11 +35,11 @@ func (ki *KubernetesInventory) gatherPod(p corev1.Pod, acc telegraf.Accumulator) if !ok { cs = &corev1.ContainerStatus{} } - gatherPodContainer(ki, p, *cs, c, acc) + ki.gatherPodContainer(p, *cs, c, acc) } } -func gatherPodContainer(ki *KubernetesInventory, p corev1.Pod, cs corev1.ContainerStatus, c corev1.Container, acc telegraf.Accumulator) { +func (ki *KubernetesInventory) gatherPodContainer(p corev1.Pod, cs corev1.ContainerStatus, c corev1.Container, acc telegraf.Accumulator) { stateCode := 3 stateReason := "" state := "unknown" @@ -103,17 +103,17 @@ func gatherPodContainer(ki *KubernetesInventory, p corev1.Pod, cs corev1.Contain for resourceName, val := range req { switch resourceName { case "cpu": - fields["resource_requests_millicpu_units"] = convertQuantity(val.String(), 1000) + fields["resource_requests_millicpu_units"] = ki.convertQuantity(val.String(), 1000) case "memory": - fields["resource_requests_memory_bytes"] = convertQuantity(val.String(), 1) + fields["resource_requests_memory_bytes"] = ki.convertQuantity(val.String(), 1) } } for resourceName, val := range lim { switch resourceName { case "cpu": - fields["resource_limits_millicpu_units"] = convertQuantity(val.String(), 1000) + fields["resource_limits_millicpu_units"] = ki.convertQuantity(val.String(), 1000) case "memory": - fields["resource_limits_memory_bytes"] = convertQuantity(val.String(), 1) + fields["resource_limits_memory_bytes"] = ki.convertQuantity(val.String(), 1) } } diff --git a/plugins/inputs/lanz/lanz.go b/plugins/inputs/lanz/lanz.go index 86bb93a8f754b..a77e99df61f6e 100644 --- a/plugins/inputs/lanz/lanz.go +++ b/plugins/inputs/lanz/lanz.go @@ -8,6 +8,7 @@ import ( "github.com/aristanetworks/goarista/lanz" pb "github.com/aristanetworks/goarista/lanz/proto" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -85,6 +86,7 @@ func (l *Lanz) Stop() { } func receive(acc telegraf.Accumulator, in <-chan *pb.LanzRecord, deviceURL *url.URL) { + //nolint:gosimple // for-select used on purpose for { select { case msg, ok := <-in: diff --git a/plugins/inputs/lanz/lanz_test.go b/plugins/inputs/lanz/lanz_test.go index 684bfc8902bb8..f2a8b5815e36d 100644 --- a/plugins/inputs/lanz/lanz_test.go +++ b/plugins/inputs/lanz/lanz_test.go @@ -6,7 +6,8 @@ import ( "testing" pb "github.com/aristanetworks/goarista/lanz/proto" - "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/proto" + "github.com/influxdata/telegraf/testutil" ) diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go index 3100c615cd4e4..a2f780afd21b9 100644 --- a/plugins/inputs/logparser/logparser_test.go +++ b/plugins/inputs/logparser/logparser_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -25,7 +24,7 @@ func TestStartNoParsers(t *testing.T) { } acc := testutil.Accumulator{} - assert.Error(t, logparser.Start(&acc)) + require.Error(t, logparser.Start(&acc)) } func TestGrokParseLogFilesNonExistPattern(t *testing.T) { @@ -41,7 +40,7 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) { acc := testutil.Accumulator{} err := logparser.Start(&acc) - assert.Error(t, err) + require.Error(t, err) } func TestGrokParseLogFiles(t *testing.T) { @@ -112,7 +111,7 @@ func TestGrokParseLogFiles(t *testing.T) { func TestGrokParseLogFilesAppearLater(t *testing.T) { emptydir, err := os.MkdirTemp("", "TestGrokParseLogFilesAppearLater") defer os.RemoveAll(emptydir) - assert.NoError(t, err) + require.NoError(t, err) logparser := &LogParserPlugin{ Log: testutil.Logger{}, @@ -126,17 +125,17 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, logparser.Start(&acc)) + require.NoError(t, logparser.Start(&acc)) - assert.Equal(t, acc.NFields(), 0) + require.Equal(t, acc.NFields(), 0) input, err := os.ReadFile(filepath.Join(testdataDir, "test_a.log")) - assert.NoError(t, err) + require.NoError(t, err) err = os.WriteFile(filepath.Join(emptydir, "test_a.log"), input, 0644) - assert.NoError(t, err) + require.NoError(t, err) - assert.NoError(t, acc.GatherError(logparser.Gather)) + require.NoError(t, acc.GatherError(logparser.Gather)) acc.Wait(1) logparser.Stop() @@ -170,7 +169,7 @@ func TestGrokParseLogFilesOneBad(t *testing.T) { acc := testutil.Accumulator{} acc.SetDebug(true) - assert.NoError(t, logparser.Start(&acc)) + require.NoError(t, logparser.Start(&acc)) acc.Wait(1) logparser.Stop() @@ -202,7 +201,7 @@ func TestGrokParseLogFiles_TimestampInEpochMilli(t *testing.T) { acc := testutil.Accumulator{} acc.SetDebug(true) - assert.NoError(t, logparser.Start(&acc)) + require.NoError(t, logparser.Start(&acc)) acc.Wait(1) logparser.Stop() diff --git a/plugins/inputs/logstash/logstash.go b/plugins/inputs/logstash/logstash.go index 6fcaadabcd244..9f5a198587e4d 100644 --- a/plugins/inputs/logstash/logstash.go +++ b/plugins/inputs/logstash/logstash.go @@ -179,8 +179,8 @@ func (logstash *Logstash) createHTTPClient() (*http.Client, error) { } // gatherJSONData query the data source and parse the response JSON -func (logstash *Logstash) gatherJSONData(url string, value interface{}) error { - request, err := http.NewRequest("GET", url, nil) +func (logstash *Logstash) gatherJSONData(address string, value interface{}) error { + request, err := http.NewRequest("GET", address, nil) if err != nil { return err } @@ -206,7 +206,7 @@ func (logstash *Logstash) gatherJSONData(url string, value interface{}) error { if response.StatusCode != http.StatusOK { // ignore the err here; LimitReader returns io.EOF and we're not interested in read errors. body, _ := io.ReadAll(io.LimitReader(response.Body, 200)) - return fmt.Errorf("%s returned HTTP status %s: %q", url, response.Status, body) + return fmt.Errorf("%s returned HTTP status %s: %q", address, response.Status, body) } err = json.NewDecoder(response.Body).Decode(value) @@ -218,10 +218,10 @@ func (logstash *Logstash) gatherJSONData(url string, value interface{}) error { } // gatherJVMStats gather the JVM metrics and add results to the accumulator -func (logstash *Logstash) gatherJVMStats(url string, accumulator telegraf.Accumulator) error { +func (logstash *Logstash) gatherJVMStats(address string, accumulator telegraf.Accumulator) error { jvmStats := &JVMStats{} - err := logstash.gatherJSONData(url, jvmStats) + err := logstash.gatherJSONData(address, jvmStats) if err != nil { return err } @@ -244,10 +244,10 @@ func (logstash *Logstash) gatherJVMStats(url string, accumulator telegraf.Accumu } // gatherJVMStats gather the Process metrics and add results to the accumulator -func (logstash *Logstash) gatherProcessStats(url string, accumulator telegraf.Accumulator) error { +func (logstash *Logstash) gatherProcessStats(address string, accumulator telegraf.Accumulator) error { processStats := &ProcessStats{} - err := logstash.gatherJSONData(url, processStats) + err := logstash.gatherJSONData(address, processStats) if err != nil { return err } @@ -403,10 +403,10 @@ func (logstash *Logstash) gatherQueueStats( } // gatherJVMStats gather the Pipeline metrics and add results to the accumulator (for Logstash < 6) -func (logstash *Logstash) gatherPipelineStats(url string, accumulator telegraf.Accumulator) error { +func (logstash *Logstash) gatherPipelineStats(address string, accumulator telegraf.Accumulator) error { pipelineStats := &PipelineStats{} - err := logstash.gatherJSONData(url, pipelineStats) + err := logstash.gatherJSONData(address, pipelineStats) if err != nil { return err } @@ -447,10 +447,10 @@ func (logstash *Logstash) gatherPipelineStats(url string, accumulator telegraf.A } // gatherJVMStats gather the Pipelines metrics and add results to the accumulator (for Logstash >= 6) -func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf.Accumulator) error { +func (logstash *Logstash) gatherPipelinesStats(address string, accumulator telegraf.Accumulator) error { pipelinesStats := &PipelinesStats{} - err := logstash.gatherJSONData(url, pipelinesStats) + err := logstash.gatherJSONData(address, pipelinesStats) if err != nil { return err } diff --git a/plugins/inputs/lustre2/lustre2_test.go b/plugins/inputs/lustre2/lustre2_test.go index 7fd3fd91f469e..3c5659e18f14f 100644 --- a/plugins/inputs/lustre2/lustre2_test.go +++ b/plugins/inputs/lustre2/lustre2_test.go @@ -7,11 +7,11 @@ import ( "os" "testing" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/testutil" "github.com/influxdata/toml" "github.com/influxdata/toml/ast" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // Set config file variables to point to fake directory structure instead of /proc? @@ -358,7 +358,7 @@ func TestLustre2CanParseConfiguration(t *testing.T) { require.NoError(t, toml.UnmarshalTable(lustre2.([]*ast.Table)[0], &plugin)) - assert.Equal(t, Lustre2{ + require.Equal(t, Lustre2{ OstProcfiles: []string{ "/proc/fs/lustre/obdfilter/*/stats", "/proc/fs/lustre/osd-ldiskfs/*/stats",