diff --git a/plugins/inputs/mailchimp/chimp_api.go b/plugins/inputs/mailchimp/chimp_api.go index 2f6cecdb9e0da..71e7bcea6d535 100644 --- a/plugins/inputs/mailchimp/chimp_api.go +++ b/plugins/inputs/mailchimp/chimp_api.go @@ -5,12 +5,13 @@ import ( "encoding/json" "fmt" "io" - "log" "net/http" "net/url" "regexp" "sync" "time" + + "github.com/influxdata/telegraf" ) const ( @@ -22,11 +23,12 @@ var mailchimpDatacenter = regexp.MustCompile("[a-z]+[0-9]+$") type ChimpAPI struct { Transport http.RoundTripper - Debug bool + debug bool sync.Mutex url *url.URL + log telegraf.Logger } type ReportsParams struct { @@ -53,12 +55,12 @@ func (p *ReportsParams) String() string { return v.Encode() } -func NewChimpAPI(apiKey string) *ChimpAPI { +func NewChimpAPI(apiKey string, log telegraf.Logger) *ChimpAPI { u := &url.URL{} u.Scheme = "https" u.Host = fmt.Sprintf("%s.api.mailchimp.com", mailchimpDatacenter.FindString(apiKey)) u.User = url.UserPassword("", apiKey) - return &ChimpAPI{url: u} + return &ChimpAPI{url: u, log: log} } type APIError struct { @@ -90,7 +92,7 @@ func (a *ChimpAPI) GetReports(params ReportsParams) (ReportsResponse, error) { a.url.Path = reportsEndpoint var response ReportsResponse - rawjson, err := runChimp(a, params) + rawjson, err := a.runChimp(params) if err != nil { return response, err } @@ -109,7 +111,7 @@ func (a *ChimpAPI) GetReport(campaignID string) (Report, error) { a.url.Path = fmt.Sprintf(reportsEndpointCampaign, campaignID) var response Report - rawjson, err := runChimp(a, ReportsParams{}) + rawjson, err := a.runChimp(ReportsParams{}) if err != nil { return response, err } @@ -122,21 +124,21 @@ func (a *ChimpAPI) GetReport(campaignID string) (Report, error) { return response, nil } -func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) { +func (a *ChimpAPI) runChimp(params ReportsParams) ([]byte, error) { client := &http.Client{ - Transport: api.Transport, + Transport: a.Transport, Timeout: 4 * time.Second, } var b bytes.Buffer - req, err := http.NewRequest("GET", api.url.String(), &b) + req, err := http.NewRequest("GET", a.url.String(), &b) if err != nil { return nil, err } req.URL.RawQuery = params.String() req.Header.Set("User-Agent", "Telegraf-MailChimp-Plugin") - if api.Debug { - log.Printf("D! [inputs.mailchimp] request URL: %s", req.URL.String()) + if a.debug { + a.log.Debugf("request URL: %s", req.URL.String()) } resp, err := client.Do(req) @@ -148,15 +150,15 @@ func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) { if resp.StatusCode != http.StatusOK { // ignore the err here; LimitReader returns io.EOF and we're not interested in read errors. body, _ := io.ReadAll(io.LimitReader(resp.Body, 200)) - return nil, fmt.Errorf("%s returned HTTP status %s: %q", api.url.String(), resp.Status, body) + return nil, fmt.Errorf("%s returned HTTP status %s: %q", a.url.String(), resp.Status, body) } body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } - if api.Debug { - log.Printf("D! [inputs.mailchimp] response Body: %q", string(body)) + if a.debug { + a.log.Debugf("response Body: %q", string(body)) } if err = chimpErrorCheck(body); err != nil { diff --git a/plugins/inputs/mailchimp/mailchimp.go b/plugins/inputs/mailchimp/mailchimp.go index fe6892bf48743..b898cb6ba1768 100644 --- a/plugins/inputs/mailchimp/mailchimp.go +++ b/plugins/inputs/mailchimp/mailchimp.go @@ -14,6 +14,8 @@ type MailChimp struct { APIKey string `toml:"api_key"` DaysOld int `toml:"days_old"` CampaignID string `toml:"campaign_id"` + + Log telegraf.Logger `toml:"-"` } var sampleConfig = ` @@ -35,12 +37,13 @@ func (m *MailChimp) Description() string { return "Gathers metrics from the /3.0/reports MailChimp API" } -func (m *MailChimp) Gather(acc telegraf.Accumulator) error { - if m.api == nil { - m.api = NewChimpAPI(m.APIKey) - } - m.api.Debug = false +func (m *MailChimp) Init() error { + m.api = NewChimpAPI(m.APIKey, m.Log) + return nil +} + +func (m *MailChimp) Gather(acc telegraf.Accumulator) error { if m.CampaignID == "" { since := "" if m.DaysOld > 0 { diff --git a/plugins/inputs/mailchimp/mailchimp_test.go b/plugins/inputs/mailchimp/mailchimp_test.go index 1366d8859df5d..1df6c52cf6256 100644 --- a/plugins/inputs/mailchimp/mailchimp_test.go +++ b/plugins/inputs/mailchimp/mailchimp_test.go @@ -7,9 +7,9 @@ import ( "net/url" "testing" - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) func TestMailChimpGatherReports(t *testing.T) { @@ -28,7 +28,8 @@ func TestMailChimpGatherReports(t *testing.T) { api := &ChimpAPI{ url: u, - Debug: true, + debug: true, + log: testutil.Logger{}, } m := MailChimp{ api: api, @@ -43,22 +44,22 @@ func TestMailChimpGatherReports(t *testing.T) { tags["campaign_title"] = "Freddie's Jokes Vol. 1" fields := map[string]interface{}{ - "emails_sent": int(200), - "abuse_reports": int(0), - "unsubscribed": int(2), - "hard_bounces": int(0), - "soft_bounces": int(2), - "syntax_errors": int(0), - "forwards_count": int(0), - "forwards_opens": int(0), - "opens_total": int(186), - "unique_opens": int(100), - "clicks_total": int(42), - "unique_clicks": int(400), - "unique_subscriber_clicks": int(42), - "facebook_recipient_likes": int(5), - "facebook_unique_likes": int(8), - "facebook_likes": int(42), + "emails_sent": 200, + "abuse_reports": 0, + "unsubscribed": 2, + "hard_bounces": 0, + "soft_bounces": 2, + "syntax_errors": 0, + "forwards_count": 0, + "forwards_opens": 0, + "opens_total": 186, + "unique_opens": 100, + "clicks_total": 42, + "unique_clicks": 400, + "unique_subscriber_clicks": 42, + "facebook_recipient_likes": 5, + "facebook_unique_likes": 8, + "facebook_likes": 42, "open_rate": float64(42), "click_rate": float64(42), "industry_open_rate": float64(0.17076777144396), @@ -92,7 +93,8 @@ func TestMailChimpGatherReport(t *testing.T) { api := &ChimpAPI{ url: u, - Debug: true, + debug: true, + log: testutil.Logger{}, } m := MailChimp{ api: api, @@ -157,7 +159,8 @@ func TestMailChimpGatherError(t *testing.T) { api := &ChimpAPI{ url: u, - Debug: true, + debug: true, + log: testutil.Logger{}, } m := MailChimp{ api: api, diff --git a/plugins/inputs/marklogic/marklogic.go b/plugins/inputs/marklogic/marklogic.go index d2ef139bfc7a3..30f9ee6403074 100644 --- a/plugins/inputs/marklogic/marklogic.go +++ b/plugins/inputs/marklogic/marklogic.go @@ -163,9 +163,9 @@ func (c *Marklogic) Gather(accumulator telegraf.Accumulator) error { return nil } -func (c *Marklogic) fetchAndInsertData(acc telegraf.Accumulator, url string) error { +func (c *Marklogic) fetchAndInsertData(acc telegraf.Accumulator, address string) error { ml := &MlHost{} - if err := c.gatherJSONData(url, ml); err != nil { + if err := c.gatherJSONData(address, ml); err != nil { return err } @@ -225,8 +225,8 @@ func (c *Marklogic) createHTTPClient() (*http.Client, error) { return client, nil } -func (c *Marklogic) gatherJSONData(url string, v interface{}) error { - req, err := http.NewRequest("GET", url, nil) +func (c *Marklogic) gatherJSONData(address string, v interface{}) error { + req, err := http.NewRequest("GET", address, nil) if err != nil { return err } diff --git a/plugins/inputs/mcrouter/mcrouter.go b/plugins/inputs/mcrouter/mcrouter.go index af197c3072089..07599ca2cc0b0 100644 --- a/plugins/inputs/mcrouter/mcrouter.go +++ b/plugins/inputs/mcrouter/mcrouter.go @@ -146,32 +146,33 @@ func (m *Mcrouter) Gather(acc telegraf.Accumulator) error { } // ParseAddress parses an address string into 'host:port' and 'protocol' parts -func (m *Mcrouter) ParseAddress(address string) (string, string, error) { - var protocol string +func (m *Mcrouter) ParseAddress(address string) (parsedAddress string, protocol string, err error) { var host string var port string - u, parseError := url.Parse(address) + parsedAddress = address + + u, parseError := url.Parse(parsedAddress) if parseError != nil { - return "", "", fmt.Errorf("Invalid server address") + return "", "", fmt.Errorf("invalid server address") } if u.Scheme != "tcp" && u.Scheme != "unix" { - return "", "", fmt.Errorf("Invalid server protocol") + return "", "", fmt.Errorf("invalid server protocol") } protocol = u.Scheme if protocol == "unix" { if u.Path == "" { - return "", "", fmt.Errorf("Invalid unix socket path") + return "", "", fmt.Errorf("invalid unix socket path") } - address = u.Path + parsedAddress = u.Path } else { if u.Host == "" { - return "", "", fmt.Errorf("Invalid host") + return "", "", fmt.Errorf("invalid host") } host = u.Hostname() @@ -185,10 +186,10 @@ func (m *Mcrouter) ParseAddress(address string) (string, string, error) { port = defaultServerURL.Port() } - address = host + ":" + port + parsedAddress = host + ":" + port } - return address, protocol, nil + return parsedAddress, protocol, nil } func (m *Mcrouter) gatherServer(ctx context.Context, address string, acc telegraf.Accumulator) error { diff --git a/plugins/inputs/mcrouter/mcrouter_test.go b/plugins/inputs/mcrouter/mcrouter_test.go index a9b525d46b79c..f02f2b53d4b85 100644 --- a/plugins/inputs/mcrouter/mcrouter_test.go +++ b/plugins/inputs/mcrouter/mcrouter_test.go @@ -5,9 +5,9 @@ import ( "strings" "testing" - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) func TestAddressParsing(t *testing.T) { @@ -30,17 +30,17 @@ func TestAddressParsing(t *testing.T) { for _, args := range acceptTests { address, protocol, err := m.ParseAddress(args[0]) - assert.Nil(t, err, args[0]) - assert.True(t, address == args[1], args[0]) - assert.True(t, protocol == args[2], args[0]) + require.Nil(t, err, args[0]) + require.Equal(t, args[1], address, args[0]) + require.Equal(t, args[2], protocol, args[0]) } for _, addr := range rejectTests { address, protocol, err := m.ParseAddress(addr) - assert.NotNil(t, err, addr) - assert.Empty(t, address, addr) - assert.Empty(t, protocol, addr) + require.NotNil(t, err, addr) + require.Empty(t, address, addr) + require.Empty(t, protocol, addr) } } @@ -129,11 +129,11 @@ func TestMcrouterGeneratesMetricsIntegration(t *testing.T) { } for _, metric := range intMetrics { - assert.True(t, acc.HasInt64Field("mcrouter", metric), metric) + require.True(t, acc.HasInt64Field("mcrouter", metric), metric) } for _, metric := range floatMetrics { - assert.True(t, acc.HasFloatField("mcrouter", metric), metric) + require.True(t, acc.HasFloatField("mcrouter", metric), metric) } } diff --git a/plugins/inputs/mdstat/mdstat_test.go b/plugins/inputs/mdstat/mdstat_test.go index 070b7ddd234f5..27397f715ad0d 100644 --- a/plugins/inputs/mdstat/mdstat_test.go +++ b/plugins/inputs/mdstat/mdstat_test.go @@ -7,8 +7,9 @@ import ( "os" "testing" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" ) func TestFullMdstatProcFile(t *testing.T) { @@ -19,7 +20,7 @@ func TestFullMdstatProcFile(t *testing.T) { } acc := testutil.Accumulator{} err := k.Gather(&acc) - assert.NoError(t, err) + require.NoError(t, err) fields := map[string]interface{}{ "BlocksSynced": int64(10620027200), @@ -46,7 +47,7 @@ func TestFailedDiskMdStatProcFile1(t *testing.T) { acc := testutil.Accumulator{} err := k.Gather(&acc) - assert.NoError(t, err) + require.NoError(t, err) fields := map[string]interface{}{ "BlocksSynced": int64(5860144128), @@ -73,7 +74,7 @@ func TestEmptyMdStatProcFile1(t *testing.T) { acc := testutil.Accumulator{} err := k.Gather(&acc) - assert.NoError(t, err) + require.NoError(t, err) } func TestInvalidMdStatProcFile1(t *testing.T) { @@ -86,7 +87,7 @@ func TestInvalidMdStatProcFile1(t *testing.T) { acc := testutil.Accumulator{} err := k.Gather(&acc) - assert.Error(t, err) + require.Error(t, err) } const mdStatFileFull = ` diff --git a/plugins/inputs/memcached/memcached_test.go b/plugins/inputs/memcached/memcached_test.go index 1d0807625b31b..1ebfe65bad6fb 100644 --- a/plugins/inputs/memcached/memcached_test.go +++ b/plugins/inputs/memcached/memcached_test.go @@ -5,9 +5,9 @@ import ( "strings" "testing" - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) func TestMemcachedGeneratesMetricsIntegration(t *testing.T) { @@ -32,7 +32,7 @@ func TestMemcachedGeneratesMetricsIntegration(t *testing.T) { "bytes_read", "bytes_written", "threads", "conn_yields"} for _, metric := range intMetrics { - assert.True(t, acc.HasInt64Field("memcached", metric), metric) + require.True(t, acc.HasInt64Field("memcached", metric), metric) } } diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index 68203c9d480cb..991f8a9fd7003 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "io" - "log" "net" "net/http" "net/url" @@ -23,7 +22,7 @@ type Role string const ( MASTER Role = "master" - SLAVE = "slave" + SLAVE Role = "slave" ) type Mesos struct { @@ -100,7 +99,7 @@ func (m *Mesos) Description() string { return "Telegraf plugin for gathering metrics from N Mesos masters" } -func parseURL(s string, role Role) (*url.URL, error) { +func (m *Mesos) parseURL(s string, role Role) (*url.URL, error) { if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") { host, port, err := net.SplitHostPort(s) // no port specified @@ -115,7 +114,7 @@ func parseURL(s string, role Role) (*url.URL, error) { } s = "http://" + host + ":" + port - log.Printf("W! [inputs.mesos] using %q as connection URL; please update your configuration to use an URL", s) + m.Log.Warnf("using %q as connection URL; please update your configuration to use an URL", s) } return url.Parse(s) @@ -139,7 +138,7 @@ func (m *Mesos) initialize() error { m.masterURLs = make([]*url.URL, 0, len(m.Masters)) for _, master := range m.Masters { - u, err := parseURL(master, MASTER) + u, err := m.parseURL(master, MASTER) if err != nil { return err } @@ -150,7 +149,7 @@ func (m *Mesos) initialize() error { m.slaveURLs = make([]*url.URL, 0, len(m.Slaves)) for _, slave := range m.Slaves { - u, err := parseURL(slave, SLAVE) + u, err := m.parseURL(slave, SLAVE) if err != nil { return err } @@ -241,11 +240,11 @@ func metricsDiff(role Role, w []string) []string { } // masterBlocks serves as kind of metrics registry grouping them in sets -func getMetrics(role Role, group string) []string { - m := make(map[string][]string) +func (m *Mesos) getMetrics(role Role, group string) []string { + metrics := make(map[string][]string) if role == MASTER { - m["resources"] = []string{ + metrics["resources"] = []string{ "master/cpus_percent", "master/cpus_used", "master/cpus_total", @@ -272,12 +271,12 @@ func getMetrics(role Role, group string) []string { "master/mem_revocable_used", } - m["master"] = []string{ + metrics["master"] = []string{ "master/elected", "master/uptime_secs", } - m["system"] = []string{ + metrics["system"] = []string{ "system/cpus_total", "system/load_15min", "system/load_5min", @@ -286,7 +285,7 @@ func getMetrics(role Role, group string) []string { "system/mem_total_bytes", } - m["agents"] = []string{ + metrics["agents"] = []string{ "master/slave_registrations", "master/slave_removals", "master/slave_reregistrations", @@ -303,7 +302,7 @@ func getMetrics(role Role, group string) []string { "master/slaves_unreachable", } - m["frameworks"] = []string{ + metrics["frameworks"] = []string{ "master/frameworks_active", "master/frameworks_connected", "master/frameworks_disconnected", @@ -314,10 +313,10 @@ func getMetrics(role Role, group string) []string { // framework_offers and allocator metrics have unpredictable names, so they can't be listed here. // These empty groups are included to prevent the "unknown metrics group" info log below. // filterMetrics() filters these metrics by looking for names with the corresponding prefix. - m["framework_offers"] = []string{} - m["allocator"] = []string{} + metrics["framework_offers"] = []string{} + metrics["allocator"] = []string{} - m["tasks"] = []string{ + metrics["tasks"] = []string{ "master/tasks_error", "master/tasks_failed", "master/tasks_finished", @@ -333,7 +332,7 @@ func getMetrics(role Role, group string) []string { "master/tasks_unreachable", } - m["messages"] = []string{ + metrics["messages"] = []string{ "master/invalid_executor_to_framework_messages", "master/invalid_framework_to_executor_messages", "master/invalid_status_update_acknowledgements", @@ -377,14 +376,14 @@ func getMetrics(role Role, group string) []string { "master/valid_operation_status_update_acknowledgements", } - m["evqueue"] = []string{ + metrics["evqueue"] = []string{ "master/event_queue_dispatches", "master/event_queue_http_requests", "master/event_queue_messages", "master/operator_event_stream_subscribers", } - m["registrar"] = []string{ + metrics["registrar"] = []string{ "registrar/state_fetch_ms", "registrar/state_store_ms", "registrar/state_store_ms/max", @@ -402,7 +401,7 @@ func getMetrics(role Role, group string) []string { "registrar/state_store_ms/count", } } else if role == SLAVE { - m["resources"] = []string{ + metrics["resources"] = []string{ "slave/cpus_percent", "slave/cpus_used", "slave/cpus_total", @@ -429,12 +428,12 @@ func getMetrics(role Role, group string) []string { "slave/mem_revocable_used", } - m["agent"] = []string{ + metrics["agent"] = []string{ "slave/registered", "slave/uptime_secs", } - m["system"] = []string{ + metrics["system"] = []string{ "system/cpus_total", "system/load_15min", "system/load_5min", @@ -443,7 +442,7 @@ func getMetrics(role Role, group string) []string { "system/mem_total_bytes", } - m["executors"] = []string{ + metrics["executors"] = []string{ "containerizer/mesos/container_destroy_errors", "slave/container_launch_errors", "slave/executors_preempted", @@ -456,7 +455,7 @@ func getMetrics(role Role, group string) []string { "slave/recovery_errors", } - m["tasks"] = []string{ + metrics["tasks"] = []string{ "slave/tasks_failed", "slave/tasks_finished", "slave/tasks_killed", @@ -466,7 +465,7 @@ func getMetrics(role Role, group string) []string { "slave/tasks_starting", } - m["messages"] = []string{ + metrics["messages"] = []string{ "slave/invalid_framework_messages", "slave/invalid_status_updates", "slave/valid_framework_messages", @@ -474,10 +473,10 @@ func getMetrics(role Role, group string) []string { } } - ret, ok := m[group] + ret, ok := metrics[group] if !ok { - log.Printf("I! [inputs.mesos] unknown role %q metrics group: %s", role, group) + m.Log.Infof("unknown role %q metrics group: %s", role, group) return []string{} } @@ -512,7 +511,7 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) { // All other metrics have predictable names. We can use getMetrics() to retrieve them. default: - for _, v := range getMetrics(role, k) { + for _, v := range m.getMetrics(role, k) { if _, ok = (*metrics)[v]; ok { delete(*metrics, v) } diff --git a/plugins/inputs/mesos/mesos_test.go b/plugins/inputs/mesos/mesos_test.go index 4b6d5ab74d371..2605ddd4678c2 100644 --- a/plugins/inputs/mesos/mesos_test.go +++ b/plugins/inputs/mesos/mesos_test.go @@ -10,8 +10,9 @@ import ( "strings" "testing" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) var masterMetrics map[string]interface{} @@ -340,7 +341,7 @@ func TestMasterFilter(t *testing.T) { // Assert expected metrics are present. for _, v := range m.MasterCols { - for _, x := range getMetrics(MASTER, v) { + for _, x := range m.getMetrics(MASTER, v) { _, ok := masterMetrics[x] require.Truef(t, ok, "Didn't find key %s, it should present.", x) } @@ -357,7 +358,7 @@ func TestMasterFilter(t *testing.T) { // Assert unexpected metrics are not present. for _, v := range b { - for _, x := range getMetrics(MASTER, v) { + for _, x := range m.getMetrics(MASTER, v) { _, ok := masterMetrics[x] require.Falsef(t, ok, "Found key %s, it should be gone.", x) } @@ -402,13 +403,13 @@ func TestSlaveFilter(t *testing.T) { m.filterMetrics(SLAVE, &slaveMetrics) for _, v := range b { - for _, x := range getMetrics(SLAVE, v) { + for _, x := range m.getMetrics(SLAVE, v) { _, ok := slaveMetrics[x] require.Falsef(t, ok, "Found key %s, it should be gone.", x) } } for _, v := range m.MasterCols { - for _, x := range getMetrics(SLAVE, v) { + for _, x := range m.getMetrics(SLAVE, v) { _, ok := slaveMetrics[x] require.Truef(t, ok, "Didn't find key %s, it should present.", x) } diff --git a/plugins/inputs/minecraft/client.go b/plugins/inputs/minecraft/client.go index 641a8ae75db9f..4aa712d4b04f4 100644 --- a/plugins/inputs/minecraft/client.go +++ b/plugins/inputs/minecraft/client.go @@ -45,17 +45,17 @@ func (c *connector) Connect() (Connection, error) { return nil, err } - rcon, err := rcon.NewClient(c.hostname, p) + client, err := rcon.NewClient(c.hostname, p) if err != nil { return nil, err } - _, err = rcon.Authorize(c.password) + _, err = client.Authorize(c.password) if err != nil { return nil, err } - return &connection{rcon: rcon}, nil + return &connection{client: client}, nil } func newClient(connector Connector) *client { @@ -111,11 +111,11 @@ func (c *client) Scores(player string) ([]Score, error) { } type connection struct { - rcon *rcon.Client + client *rcon.Client } func (c *connection) Execute(command string) (string, error) { - packet, err := c.rcon.Execute(command) + packet, err := c.client.Execute(command) if err != nil { return "", err } diff --git a/plugins/inputs/mongodb/mongodb_data_test.go b/plugins/inputs/mongodb/mongodb_data_test.go index 378268916054d..f7f891ec775bf 100644 --- a/plugins/inputs/mongodb/mongodb_data_test.go +++ b/plugins/inputs/mongodb/mongodb_data_test.go @@ -5,8 +5,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" ) var tags = make(map[string]string) @@ -65,7 +66,7 @@ func TestAddNonReplStats(t *testing.T) { d.flush(&acc) for key := range defaultStats { - assert.True(t, acc.HasFloatField("mongodb", key) || acc.HasInt64Field("mongodb", key), key) + require.True(t, acc.HasFloatField("mongodb", key) || acc.HasInt64Field("mongodb", key), key) } } @@ -86,7 +87,7 @@ func TestAddReplStats(t *testing.T) { d.flush(&acc) for key := range mmapStats { - assert.True(t, acc.HasInt64Field("mongodb", key), key) + require.True(t, acc.HasInt64Field("mongodb", key), key) } } @@ -120,14 +121,14 @@ func TestAddWiredTigerStats(t *testing.T) { d.flush(&acc) for key := range wiredTigerStats { - assert.True(t, acc.HasFloatField("mongodb", key), key) + require.True(t, acc.HasFloatField("mongodb", key), key) } for key := range wiredTigerExtStats { - assert.True(t, acc.HasFloatField("mongodb", key) || acc.HasInt64Field("mongodb", key), key) + require.True(t, acc.HasFloatField("mongodb", key) || acc.HasInt64Field("mongodb", key), key) } - assert.True(t, acc.HasInt64Field("mongodb", "page_faults")) + require.True(t, acc.HasInt64Field("mongodb", "page_faults")) } func TestAddShardStats(t *testing.T) { @@ -147,7 +148,7 @@ func TestAddShardStats(t *testing.T) { d.flush(&acc) for key := range defaultShardStats { - assert.True(t, acc.HasInt64Field("mongodb", key)) + require.True(t, acc.HasInt64Field("mongodb", key)) } } @@ -170,7 +171,7 @@ func TestAddLatencyStats(t *testing.T) { d.flush(&acc) for key := range defaultLatencyStats { - assert.True(t, acc.HasInt64Field("mongodb", key)) + require.True(t, acc.HasInt64Field("mongodb", key)) } } @@ -192,7 +193,7 @@ func TestAddAssertsStats(t *testing.T) { d.flush(&acc) for key := range defaultAssertsStats { - assert.True(t, acc.HasInt64Field("mongodb", key)) + require.True(t, acc.HasInt64Field("mongodb", key)) } } @@ -227,7 +228,7 @@ func TestAddCommandsStats(t *testing.T) { d.flush(&acc) for key := range defaultCommandsStats { - assert.True(t, acc.HasInt64Field("mongodb", key)) + require.True(t, acc.HasInt64Field("mongodb", key)) } } @@ -263,7 +264,7 @@ func TestAddTCMallocStats(t *testing.T) { d.flush(&acc) for key := range defaultTCMallocStats { - assert.True(t, acc.HasInt64Field("mongodb", key)) + require.True(t, acc.HasInt64Field("mongodb", key)) } } @@ -283,7 +284,7 @@ func TestAddStorageStats(t *testing.T) { d.flush(&acc) for key := range defaultStorageStats { - assert.True(t, acc.HasInt64Field("mongodb", key)) + require.True(t, acc.HasInt64Field("mongodb", key)) } } @@ -313,15 +314,15 @@ func TestAddShardHostStats(t *testing.T) { var hostsFound []string for host := range hostStatLines { for key := range shardHostStats { - assert.True(t, acc.HasInt64Field("mongodb_shard_stats", key)) + require.True(t, acc.HasInt64Field("mongodb_shard_stats", key)) } - assert.True(t, acc.HasTag("mongodb_shard_stats", "hostname")) + require.True(t, acc.HasTag("mongodb_shard_stats", "hostname")) hostsFound = append(hostsFound, host) } sort.Strings(hostsFound) sort.Strings(expectedHosts) - assert.Equal(t, hostsFound, expectedHosts) + require.Equal(t, hostsFound, expectedHosts) } func TestStateTag(t *testing.T) { @@ -527,7 +528,7 @@ func TestAddTopStats(t *testing.T) { for range topStatLines { for key := range topDataStats { - assert.True(t, acc.HasInt64Field("mongodb_top_stats", key)) + require.True(t, acc.HasInt64Field("mongodb_top_stats", key)) } } } diff --git a/plugins/inputs/mongodb/mongodb_server_test.go b/plugins/inputs/mongodb/mongodb_server_test.go index c8fd9f7c15284..d2313e4088f82 100644 --- a/plugins/inputs/mongodb/mongodb_server_test.go +++ b/plugins/inputs/mongodb/mongodb_server_test.go @@ -6,9 +6,9 @@ package mongodb import ( "testing" - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) func TestGetDefaultTags(t *testing.T) { @@ -37,7 +37,7 @@ func TestAddDefaultStats(t *testing.T) { require.NoError(t, err) for key := range defaultStats { - assert.True(t, acc.HasInt64Field("mongodb", key)) + require.True(t, acc.HasInt64Field("mongodb", key)) } } diff --git a/plugins/inputs/mongodb/mongostat.go b/plugins/inputs/mongodb/mongostat.go index ea69c8d424f7c..2490ca2c1777c 100644 --- a/plugins/inputs/mongodb/mongostat.go +++ b/plugins/inputs/mongodb/mongostat.go @@ -903,7 +903,7 @@ func computeLockDiffs(prevLocks, curLocks map[string]LockUsage) []LockUsage { return lockUsages } -func diff(newVal, oldVal, sampleTime int64) (int64, int64) { +func diff(newVal, oldVal, sampleTime int64) (avg int64, newValue int64) { d := newVal - oldVal if d < 0 { d = newVal @@ -1311,10 +1311,10 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec // I'm the master returnVal.ReplLag = 0 break - } else { - // I'm secondary - me = member } + + // I'm secondary + me = member } else if member.State == 1 { // Master found master = member diff --git a/plugins/inputs/mongodb/mongostat_test.go b/plugins/inputs/mongodb/mongostat_test.go index 9f6ef04892ac9..908b82de1b911 100644 --- a/plugins/inputs/mongodb/mongostat_test.go +++ b/plugins/inputs/mongodb/mongostat_test.go @@ -2,10 +2,8 @@ package mongodb import ( "testing" - //"time" - //"github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestLatencyStats(t *testing.T) { @@ -55,12 +53,12 @@ func TestLatencyStats(t *testing.T) { 60, ) - assert.Equal(t, sl.CommandLatency, int64(0)) - assert.Equal(t, sl.ReadLatency, int64(0)) - assert.Equal(t, sl.WriteLatency, int64(0)) - assert.Equal(t, sl.CommandOpsCnt, int64(0)) - assert.Equal(t, sl.ReadOpsCnt, int64(0)) - assert.Equal(t, sl.WriteOpsCnt, int64(0)) + require.Equal(t, sl.CommandLatency, int64(0)) + require.Equal(t, sl.ReadLatency, int64(0)) + require.Equal(t, sl.WriteLatency, int64(0)) + require.Equal(t, sl.CommandOpsCnt, int64(0)) + require.Equal(t, sl.ReadOpsCnt, int64(0)) + require.Equal(t, sl.WriteOpsCnt, int64(0)) } func TestLatencyStatsDiffZero(t *testing.T) { @@ -124,12 +122,12 @@ func TestLatencyStatsDiffZero(t *testing.T) { 60, ) - assert.Equal(t, sl.CommandLatency, int64(0)) - assert.Equal(t, sl.ReadLatency, int64(0)) - assert.Equal(t, sl.WriteLatency, int64(0)) - assert.Equal(t, sl.CommandOpsCnt, int64(0)) - assert.Equal(t, sl.ReadOpsCnt, int64(0)) - assert.Equal(t, sl.WriteOpsCnt, int64(0)) + require.Equal(t, sl.CommandLatency, int64(0)) + require.Equal(t, sl.ReadLatency, int64(0)) + require.Equal(t, sl.WriteLatency, int64(0)) + require.Equal(t, sl.CommandOpsCnt, int64(0)) + require.Equal(t, sl.ReadOpsCnt, int64(0)) + require.Equal(t, sl.WriteOpsCnt, int64(0)) } func TestLatencyStatsDiff(t *testing.T) { @@ -193,10 +191,10 @@ func TestLatencyStatsDiff(t *testing.T) { 60, ) - assert.Equal(t, sl.CommandLatency, int64(59177981552)) - assert.Equal(t, sl.ReadLatency, int64(2255946760057)) - assert.Equal(t, sl.WriteLatency, int64(494479456987)) - assert.Equal(t, sl.CommandOpsCnt, int64(1019152861)) - assert.Equal(t, sl.ReadOpsCnt, int64(4189049884)) - assert.Equal(t, sl.WriteOpsCnt, int64(1691021287)) + require.Equal(t, sl.CommandLatency, int64(59177981552)) + require.Equal(t, sl.ReadLatency, int64(2255946760057)) + require.Equal(t, sl.WriteLatency, int64(494479456987)) + require.Equal(t, sl.CommandOpsCnt, int64(1019152861)) + require.Equal(t, sl.ReadOpsCnt, int64(4189049884)) + require.Equal(t, sl.WriteOpsCnt, int64(1691021287)) } diff --git a/plugins/inputs/monit/monit.go b/plugins/inputs/monit/monit.go index 1cb1a4ba57da9..051e0b36982fe 100644 --- a/plugins/inputs/monit/monit.go +++ b/plugins/inputs/monit/monit.go @@ -6,23 +6,24 @@ import ( "net/http" "time" + "golang.org/x/net/html/charset" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" - "golang.org/x/net/html/charset" ) const ( - fileSystem string = "0" - directory = "1" - file = "2" - process = "3" - remoteHost = "4" - system = "5" - fifo = "6" - program = "7" - network = "8" + fileSystem = "0" + directory = "1" + file = "2" + process = "3" + remoteHost = "4" + system = "5" + fifo = "6" + program = "7" + network = "8" ) var pendingActions = []string{"ignore", "alert", "restart", "stop", "exec", "unmonitor", "start", "monitor"} @@ -244,108 +245,109 @@ func (m *Monit) Gather(acc telegraf.Accumulator) error { } defer resp.Body.Close() - if resp.StatusCode == 200 { - var status Status - decoder := xml.NewDecoder(resp.Body) - decoder.CharsetReader = charset.NewReaderLabel - if err := decoder.Decode(&status); err != nil { - return fmt.Errorf("error parsing input: %v", err) - } + if resp.StatusCode != 200 { + return fmt.Errorf("received status code %d (%s), expected 200", resp.StatusCode, http.StatusText(resp.StatusCode)) + } - tags := map[string]string{ - "version": status.Server.Version, - "source": status.Server.LocalHostname, - "platform_name": status.Platform.Name, - } + var status Status + decoder := xml.NewDecoder(resp.Body) + decoder.CharsetReader = charset.NewReaderLabel + if err := decoder.Decode(&status); err != nil { + return fmt.Errorf("error parsing input: %v", err) + } + + tags := map[string]string{ + "version": status.Server.Version, + "source": status.Server.LocalHostname, + "platform_name": status.Platform.Name, + } - for _, service := range status.Services { - fields := make(map[string]interface{}) - tags["status"] = serviceStatus(service) - fields["status_code"] = service.Status - tags["pending_action"] = pendingAction(service) - fields["pending_action_code"] = service.PendingAction - tags["monitoring_status"] = monitoringStatus(service) - fields["monitoring_status_code"] = service.MonitoringStatus - tags["monitoring_mode"] = monitoringMode(service) - fields["monitoring_mode_code"] = service.MonitorMode - tags["service"] = service.Name - if service.Type == fileSystem { - fields["mode"] = service.Mode - fields["block_percent"] = service.Block.Percent - fields["block_usage"] = service.Block.Usage - fields["block_total"] = service.Block.Total - fields["inode_percent"] = service.Inode.Percent - fields["inode_usage"] = service.Inode.Usage - fields["inode_total"] = service.Inode.Total - acc.AddFields("monit_filesystem", fields, tags) - } else if service.Type == directory { - fields["mode"] = service.Mode - acc.AddFields("monit_directory", fields, tags) - } else if service.Type == file { - fields["size"] = service.Size - fields["mode"] = service.Mode - acc.AddFields("monit_file", fields, tags) - } else if service.Type == process { - fields["cpu_percent"] = service.CPU.Percent - fields["cpu_percent_total"] = service.CPU.PercentTotal - fields["mem_kb"] = service.Memory.Kilobyte - fields["mem_kb_total"] = service.Memory.KilobyteTotal - fields["mem_percent"] = service.Memory.Percent - fields["mem_percent_total"] = service.Memory.PercentTotal - fields["pid"] = service.Pid - fields["parent_pid"] = service.ParentPid - fields["threads"] = service.Threads - fields["children"] = service.Children - acc.AddFields("monit_process", fields, tags) - } else if service.Type == remoteHost { - fields["remote_hostname"] = service.Port.Hostname - fields["port_number"] = service.Port.PortNumber - fields["request"] = service.Port.Request - fields["response_time"] = service.Port.ResponseTime - fields["protocol"] = service.Port.Protocol - fields["type"] = service.Port.Type - acc.AddFields("monit_remote_host", fields, tags) - } else if service.Type == system { - fields["cpu_system"] = service.System.CPU.System - fields["cpu_user"] = service.System.CPU.User - fields["cpu_wait"] = service.System.CPU.Wait - fields["cpu_load_avg_1m"] = service.System.Load.Avg01 - fields["cpu_load_avg_5m"] = service.System.Load.Avg05 - fields["cpu_load_avg_15m"] = service.System.Load.Avg15 - fields["mem_kb"] = service.System.Memory.Kilobyte - fields["mem_percent"] = service.System.Memory.Percent - fields["swap_kb"] = service.System.Swap.Kilobyte - fields["swap_percent"] = service.System.Swap.Percent - acc.AddFields("monit_system", fields, tags) - } else if service.Type == fifo { - fields["mode"] = service.Mode - acc.AddFields("monit_fifo", fields, tags) - } else if service.Type == program { - fields["program_started"] = service.Program.Started * 10000000 - fields["program_status"] = service.Program.Status - acc.AddFields("monit_program", fields, tags) - } else if service.Type == network { - fields["link_state"] = service.Link.State - fields["link_speed"] = service.Link.Speed - fields["link_mode"] = linkMode(service) - fields["download_packets_now"] = service.Link.Download.Packets.Now - fields["download_packets_total"] = service.Link.Download.Packets.Total - fields["download_bytes_now"] = service.Link.Download.Bytes.Now - fields["download_bytes_total"] = service.Link.Download.Bytes.Total - fields["download_errors_now"] = service.Link.Download.Errors.Now - fields["download_errors_total"] = service.Link.Download.Errors.Total - fields["upload_packets_now"] = service.Link.Upload.Packets.Now - fields["upload_packets_total"] = service.Link.Upload.Packets.Total - fields["upload_bytes_now"] = service.Link.Upload.Bytes.Now - fields["upload_bytes_total"] = service.Link.Upload.Bytes.Total - fields["upload_errors_now"] = service.Link.Upload.Errors.Now - fields["upload_errors_total"] = service.Link.Upload.Errors.Total - acc.AddFields("monit_network", fields, tags) - } + for _, service := range status.Services { + fields := make(map[string]interface{}) + tags["status"] = serviceStatus(service) + fields["status_code"] = service.Status + tags["pending_action"] = pendingAction(service) + fields["pending_action_code"] = service.PendingAction + tags["monitoring_status"] = monitoringStatus(service) + fields["monitoring_status_code"] = service.MonitoringStatus + tags["monitoring_mode"] = monitoringMode(service) + fields["monitoring_mode_code"] = service.MonitorMode + tags["service"] = service.Name + if service.Type == fileSystem { + fields["mode"] = service.Mode + fields["block_percent"] = service.Block.Percent + fields["block_usage"] = service.Block.Usage + fields["block_total"] = service.Block.Total + fields["inode_percent"] = service.Inode.Percent + fields["inode_usage"] = service.Inode.Usage + fields["inode_total"] = service.Inode.Total + acc.AddFields("monit_filesystem", fields, tags) + } else if service.Type == directory { + fields["mode"] = service.Mode + acc.AddFields("monit_directory", fields, tags) + } else if service.Type == file { + fields["size"] = service.Size + fields["mode"] = service.Mode + acc.AddFields("monit_file", fields, tags) + } else if service.Type == process { + fields["cpu_percent"] = service.CPU.Percent + fields["cpu_percent_total"] = service.CPU.PercentTotal + fields["mem_kb"] = service.Memory.Kilobyte + fields["mem_kb_total"] = service.Memory.KilobyteTotal + fields["mem_percent"] = service.Memory.Percent + fields["mem_percent_total"] = service.Memory.PercentTotal + fields["pid"] = service.Pid + fields["parent_pid"] = service.ParentPid + fields["threads"] = service.Threads + fields["children"] = service.Children + acc.AddFields("monit_process", fields, tags) + } else if service.Type == remoteHost { + fields["remote_hostname"] = service.Port.Hostname + fields["port_number"] = service.Port.PortNumber + fields["request"] = service.Port.Request + fields["response_time"] = service.Port.ResponseTime + fields["protocol"] = service.Port.Protocol + fields["type"] = service.Port.Type + acc.AddFields("monit_remote_host", fields, tags) + } else if service.Type == system { + fields["cpu_system"] = service.System.CPU.System + fields["cpu_user"] = service.System.CPU.User + fields["cpu_wait"] = service.System.CPU.Wait + fields["cpu_load_avg_1m"] = service.System.Load.Avg01 + fields["cpu_load_avg_5m"] = service.System.Load.Avg05 + fields["cpu_load_avg_15m"] = service.System.Load.Avg15 + fields["mem_kb"] = service.System.Memory.Kilobyte + fields["mem_percent"] = service.System.Memory.Percent + fields["swap_kb"] = service.System.Swap.Kilobyte + fields["swap_percent"] = service.System.Swap.Percent + acc.AddFields("monit_system", fields, tags) + } else if service.Type == fifo { + fields["mode"] = service.Mode + acc.AddFields("monit_fifo", fields, tags) + } else if service.Type == program { + fields["program_started"] = service.Program.Started * 10000000 + fields["program_status"] = service.Program.Status + acc.AddFields("monit_program", fields, tags) + } else if service.Type == network { + fields["link_state"] = service.Link.State + fields["link_speed"] = service.Link.Speed + fields["link_mode"] = linkMode(service) + fields["download_packets_now"] = service.Link.Download.Packets.Now + fields["download_packets_total"] = service.Link.Download.Packets.Total + fields["download_bytes_now"] = service.Link.Download.Bytes.Now + fields["download_bytes_total"] = service.Link.Download.Bytes.Total + fields["download_errors_now"] = service.Link.Download.Errors.Now + fields["download_errors_total"] = service.Link.Download.Errors.Total + fields["upload_packets_now"] = service.Link.Upload.Packets.Now + fields["upload_packets_total"] = service.Link.Upload.Packets.Total + fields["upload_bytes_now"] = service.Link.Upload.Bytes.Now + fields["upload_bytes_total"] = service.Link.Upload.Bytes.Total + fields["upload_errors_now"] = service.Link.Upload.Errors.Now + fields["upload_errors_total"] = service.Link.Upload.Errors.Total + acc.AddFields("monit_network", fields, tags) } - } else { - return fmt.Errorf("received status code %d (%s), expected 200", resp.StatusCode, http.StatusText(resp.StatusCode)) } + return nil } diff --git a/plugins/inputs/monit/monit_test.go b/plugins/inputs/monit/monit_test.go index b3bbed79f68e1..ef47575e80b4c 100644 --- a/plugins/inputs/monit/monit_test.go +++ b/plugins/inputs/monit/monit_test.go @@ -8,10 +8,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type transportMock struct { @@ -632,7 +632,7 @@ func TestNoUsernameOrPasswordConfiguration(t *testing.T) { require.NoError(t, r.Init()) err := r.Gather(&acc) - assert.EqualError(t, err, "received status code 401 (Unauthorized), expected 200") + require.EqualError(t, err, "received status code 401 (Unauthorized), expected 200") } func TestInvalidXMLAndInvalidTypes(t *testing.T) { diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md index a9e8236ee0cf5..3fd128eb85e10 100644 --- a/plugins/inputs/mqtt_consumer/README.md +++ b/plugins/inputs/mqtt_consumer/README.md @@ -8,7 +8,7 @@ and creates metrics using one of the supported [input data formats][]. ```toml [[inputs.mqtt_consumer]] ## Broker URLs for the MQTT server or cluster. To connect to multiple - ## clusters or standalone servers, use a seperate plugin instance. + ## clusters or standalone servers, use a separate plugin instance. ## example: servers = ["tcp://localhost:1883"] ## servers = ["ssl://localhost:1883"] ## servers = ["ws://localhost:1883"] diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 815f27a727abf..3e88cecbbce45 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -9,6 +9,7 @@ import ( "time" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" @@ -64,15 +65,15 @@ type MQTTConsumer struct { Log telegraf.Logger - clientFactory ClientFactory - client Client - opts *mqtt.ClientOptions - acc telegraf.TrackingAccumulator - state ConnectionState - sem semaphore - messages map[telegraf.TrackingID]bool - messagesMutex sync.Mutex - topicTag string + clientFactory ClientFactory + client Client + opts *mqtt.ClientOptions + acc telegraf.TrackingAccumulator + state ConnectionState + sem semaphore + messages map[telegraf.TrackingID]bool + messagesMutex sync.Mutex + chosenTopicTag string ctx context.Context cancel context.CancelFunc @@ -80,7 +81,7 @@ type MQTTConsumer struct { var sampleConfig = ` ## Broker URLs for the MQTT server or cluster. To connect to multiple - ## clusters or standalone servers, use a seperate plugin instance. + ## clusters or standalone servers, use a separate plugin instance. ## example: servers = ["tcp://localhost:1883"] ## servers = ["ssl://localhost:1883"] ## servers = ["ws://localhost:1883"] @@ -174,9 +175,9 @@ func (m *MQTTConsumer) Init() error { return fmt.Errorf("connection_timeout must be greater than 1s: %s", time.Duration(m.ConnectionTimeout)) } - m.topicTag = "topic" + m.chosenTopicTag = "topic" if m.TopicTag != nil { - m.topicTag = *m.TopicTag + m.chosenTopicTag = *m.TopicTag } opts, err := m.createOpts() @@ -284,10 +285,10 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess return err } - if m.topicTag != "" { + if m.chosenTopicTag != "" { topic := msg.Topic() for _, metric := range metrics { - metric.AddTag(m.topicTag, topic) + metric.AddTag(m.chosenTopicTag, topic) } } diff --git a/plugins/inputs/multifile/multifile_test.go b/plugins/inputs/multifile/multifile_test.go index b12f29f35c2cd..214cebd136f9c 100644 --- a/plugins/inputs/multifile/multifile_test.go +++ b/plugins/inputs/multifile/multifile_test.go @@ -5,9 +5,9 @@ import ( "path" "testing" - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) func TestFileTypes(t *testing.T) { @@ -32,8 +32,8 @@ func TestFileTypes(t *testing.T) { err := m.Gather(&acc) require.NoError(t, err) - assert.Equal(t, map[string]string{"exampletag": "test"}, acc.Metrics[0].Tags) - assert.Equal(t, map[string]interface{}{ + require.Equal(t, map[string]string{"exampletag": "test"}, acc.Metrics[0].Tags) + require.Equal(t, map[string]interface{}{ "examplebool": true, "examplestring": "hello world", "exampleint": int64(123456), @@ -60,7 +60,7 @@ func FailEarly(failEarly bool, t *testing.T) error { err := m.Gather(&acc) if err == nil { - assert.Equal(t, map[string]interface{}{ + require.Equal(t, map[string]interface{}{ "exampleint": int64(123456), }, acc.Metrics[0].Fields) } diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 6e81b3df2f757..28313b25534aa 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -10,6 +10,7 @@ import ( "time" "github.com/go-sql-driver/mysql" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" @@ -905,6 +906,7 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf. return err } defer rows.Close() + var ( command string state string @@ -948,6 +950,7 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf. if err != nil { return err } + defer connRows.Close() for connRows.Next() { var user string @@ -1812,90 +1815,100 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula } for _, database := range dbList { - rows, err := db.Query(fmt.Sprintf(tableSchemaQuery, database)) + err := m.gatherSchemaForDB(db, database, servtag, acc) if err != nil { return err } - defer rows.Close() - var ( - tableSchema string - tableName string - tableType string - engine string - version float64 - rowFormat string - tableRows float64 - dataLength float64 - indexLength float64 - dataFree float64 - createOptions string + } + return nil +} + +func (m *Mysql) gatherSchemaForDB(db *sql.DB, database string, servtag string, acc telegraf.Accumulator) error { + rows, err := db.Query(fmt.Sprintf(tableSchemaQuery, database)) + if err != nil { + return err + } + defer rows.Close() + + var ( + tableSchema string + tableName string + tableType string + engine string + version float64 + rowFormat string + tableRows float64 + dataLength float64 + indexLength float64 + dataFree float64 + createOptions string + ) + + for rows.Next() { + err = rows.Scan( + &tableSchema, + &tableName, + &tableType, + &engine, + &version, + &rowFormat, + &tableRows, + &dataLength, + &indexLength, + &dataFree, + &createOptions, ) - for rows.Next() { - err = rows.Scan( - &tableSchema, - &tableName, - &tableType, - &engine, - &version, - &rowFormat, - &tableRows, - &dataLength, - &indexLength, - &dataFree, - &createOptions, - ) - if err != nil { - return err - } - tags := map[string]string{"server": servtag} - tags["schema"] = tableSchema - tags["table"] = tableName - - if m.MetricVersion < 2 { - acc.AddFields(newNamespace("info_schema", "table_rows"), - map[string]interface{}{"value": tableRows}, tags) - - dlTags := copyTags(tags) - dlTags["component"] = "data_length" - acc.AddFields(newNamespace("info_schema", "table_size", "data_length"), - map[string]interface{}{"value": dataLength}, dlTags) - - ilTags := copyTags(tags) - ilTags["component"] = "index_length" - acc.AddFields(newNamespace("info_schema", "table_size", "index_length"), - map[string]interface{}{"value": indexLength}, ilTags) - - dfTags := copyTags(tags) - dfTags["component"] = "data_free" - acc.AddFields(newNamespace("info_schema", "table_size", "data_free"), - map[string]interface{}{"value": dataFree}, dfTags) - } else { - acc.AddFields("mysql_table_schema", - map[string]interface{}{"rows": tableRows}, tags) + if err != nil { + return err + } + tags := map[string]string{"server": servtag} + tags["schema"] = tableSchema + tags["table"] = tableName - acc.AddFields("mysql_table_schema", - map[string]interface{}{"data_length": dataLength}, tags) + if m.MetricVersion < 2 { + acc.AddFields(newNamespace("info_schema", "table_rows"), + map[string]interface{}{"value": tableRows}, tags) + + dlTags := copyTags(tags) + dlTags["component"] = "data_length" + acc.AddFields(newNamespace("info_schema", "table_size", "data_length"), + map[string]interface{}{"value": dataLength}, dlTags) + + ilTags := copyTags(tags) + ilTags["component"] = "index_length" + acc.AddFields(newNamespace("info_schema", "table_size", "index_length"), + map[string]interface{}{"value": indexLength}, ilTags) + + dfTags := copyTags(tags) + dfTags["component"] = "data_free" + acc.AddFields(newNamespace("info_schema", "table_size", "data_free"), + map[string]interface{}{"value": dataFree}, dfTags) + } else { + acc.AddFields("mysql_table_schema", + map[string]interface{}{"rows": tableRows}, tags) - acc.AddFields("mysql_table_schema", - map[string]interface{}{"index_length": indexLength}, tags) + acc.AddFields("mysql_table_schema", + map[string]interface{}{"data_length": dataLength}, tags) - acc.AddFields("mysql_table_schema", - map[string]interface{}{"data_free": dataFree}, tags) - } + acc.AddFields("mysql_table_schema", + map[string]interface{}{"index_length": indexLength}, tags) - versionTags := copyTags(tags) - versionTags["type"] = tableType - versionTags["engine"] = engine - versionTags["row_format"] = rowFormat - versionTags["create_options"] = createOptions + acc.AddFields("mysql_table_schema", + map[string]interface{}{"data_free": dataFree}, tags) + } - if m.MetricVersion < 2 { - acc.AddFields(newNamespace("info_schema", "table_version"), - map[string]interface{}{"value": version}, versionTags) - } else { - acc.AddFields("mysql_table_schema_version", - map[string]interface{}{"table_version": version}, versionTags) - } + versionTags := copyTags(tags) + versionTags["type"] = tableType + versionTags["engine"] = engine + versionTags["row_format"] = rowFormat + versionTags["create_options"] = createOptions + + if m.MetricVersion < 2 { + acc.AddFields(newNamespace("info_schema", "table_version"), + map[string]interface{}{"value": version}, versionTags) + } else { + acc.AddFields("mysql_table_schema_version", + map[string]interface{}{"table_version": version}, versionTags) } } return nil diff --git a/plugins/inputs/mysql/mysql_test.go b/plugins/inputs/mysql/mysql_test.go index 0cdcd4b1cd345..410f80213252f 100644 --- a/plugins/inputs/mysql/mysql_test.go +++ b/plugins/inputs/mysql/mysql_test.go @@ -5,9 +5,9 @@ import ( "fmt" "testing" - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) func TestMysqlDefaultsToLocalIntegration(t *testing.T) { @@ -23,7 +23,7 @@ func TestMysqlDefaultsToLocalIntegration(t *testing.T) { err := m.Gather(&acc) require.NoError(t, err) - assert.True(t, acc.HasMeasurement("mysql")) + require.True(t, acc.HasMeasurement("mysql")) } func TestMysqlMultipleInstancesIntegration(t *testing.T) { @@ -43,9 +43,9 @@ func TestMysqlMultipleInstancesIntegration(t *testing.T) { var acc, acc2 testutil.Accumulator err := m.Gather(&acc) require.NoError(t, err) - assert.True(t, acc.HasMeasurement("mysql")) + require.True(t, acc.HasMeasurement("mysql")) // acc should have global variables - assert.True(t, acc.HasMeasurement("mysql_variables")) + require.True(t, acc.HasMeasurement("mysql_variables")) m2 := &Mysql{ Servers: []string{testServer}, @@ -53,9 +53,9 @@ func TestMysqlMultipleInstancesIntegration(t *testing.T) { } err = m2.Gather(&acc2) require.NoError(t, err) - assert.True(t, acc2.HasMeasurement("mysql")) + require.True(t, acc2.HasMeasurement("mysql")) // acc2 should not have global variables - assert.False(t, acc2.HasMeasurement("mysql_variables")) + require.False(t, acc2.HasMeasurement("mysql_variables")) } func TestMysqlMultipleInits(t *testing.T) { @@ -65,16 +65,16 @@ func TestMysqlMultipleInits(t *testing.T) { m2 := &Mysql{} m.InitMysql() - assert.True(t, m.initDone) - assert.False(t, m2.initDone) - assert.Equal(t, m.scanIntervalSlow, uint32(30)) - assert.Equal(t, m2.scanIntervalSlow, uint32(0)) + require.True(t, m.initDone) + require.False(t, m2.initDone) + require.Equal(t, m.scanIntervalSlow, uint32(30)) + require.Equal(t, m2.scanIntervalSlow, uint32(0)) m2.InitMysql() - assert.True(t, m.initDone) - assert.True(t, m2.initDone) - assert.Equal(t, m.scanIntervalSlow, uint32(30)) - assert.Equal(t, m2.scanIntervalSlow, uint32(0)) + require.True(t, m.initDone) + require.True(t, m2.initDone) + require.Equal(t, m.scanIntervalSlow, uint32(30)) + require.Equal(t, m2.scanIntervalSlow, uint32(0)) } func TestMysqlGetDSNTag(t *testing.T) {