Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch MongoDB libraries. Update test data. #6

Merged
merged 10 commits into from
Apr 28, 2024
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.1
github.com/aws/aws-sdk-go-v2/service/ec2 v1.1.0
github.com/aws/smithy-go v1.0.0
github.com/beevik/ntp v0.3.0 // indirect
github.com/beevik/ntp v0.3.0
github.com/benbjohnson/clock v1.0.3
github.com/bitly/go-hostpool v0.1.0 // indirect
github.com/bmatcuk/doublestar/v3 v3.0.0
Expand Down Expand Up @@ -132,13 +132,14 @@ require (
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 // indirect
go.mongodb.org/mongo-driver v1.5.3
go.starlark.net v0.0.0-20210312235212-74c10e2c17dc
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa
golang.org/x/text v0.3.4
golang.org/x/text v0.3.5
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4
google.golang.org/api v0.20.0
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
Expand All @@ -147,7 +148,6 @@ require (
gopkg.in/fatih/pool.v2 v2.0.0 // indirect
gopkg.in/gorethink/gorethink.v3 v3.0.5
gopkg.in/ldap.v3 v3.1.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
gopkg.in/olivere/elastic.v5 v5.0.70
gopkg.in/yaml.v2 v2.3.0
gotest.tools v2.2.0+incompatible
Expand Down
80 changes: 79 additions & 1 deletion go.sum

Large diffs are not rendered by default.

179 changes: 84 additions & 95 deletions plugins/inputs/mongodb/mongodb.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package mongodb

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/url"
"strings"
"sync"
Expand All @@ -13,20 +13,24 @@ import (
"github.com/influxdata/telegraf"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"gopkg.in/mgo.v2"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
)

type MongoDB struct {
Servers []string
Ssl Ssl
mongos map[string]*Server
GatherClusterStatus bool
GatherPerdbStats bool
GatherColStats bool
GatherTopStat bool
ColStatsDbs []string
tlsint.ClientConfig

Log telegraf.Logger
Log telegraf.Logger `toml:"-"`

clients []*Server
}

type Ssl struct {
Expand All @@ -53,6 +57,10 @@ var sampleConfig = `
## When true, collect per collection stats
# gather_col_stats = false

## When true, collect usage statistics for each collection
## (insert, update, queries, remove, getmore, commands etc...).
# gather_top_stat = false

## List of db where collections stats are collected
## If empty, all db are concerned
# col_stats_dbs = ["local"]
Expand All @@ -73,126 +81,107 @@ func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers"
}

var localhost = &url.URL{Host: "mongodb://127.0.0.1:27017"}
func (m *MongoDB) Init() error {
var tlsConfig *tls.Config
if m.Ssl.Enabled {
// Deprecated TLS config
tlsConfig = &tls.Config{
InsecureSkipVerify: m.ClientConfig.InsecureSkipVerify,
}
if len(m.Ssl.CaCerts) == 0 {
return fmt.Errorf("you must explicitly set insecure_skip_verify to skip cerificate validation")
}

roots := x509.NewCertPool()
for _, caCert := range m.Ssl.CaCerts {
if ok := roots.AppendCertsFromPEM([]byte(caCert)); !ok {
return fmt.Errorf("failed to parse root certificate")
}
}
tlsConfig.RootCAs = roots
} else {
var err error
tlsConfig, err = m.ClientConfig.TLSConfig()
if err != nil {
return err
}
}

// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
if len(m.Servers) == 0 {
m.gatherServer(m.getMongoServer(localhost), acc)
return nil
m.Servers = []string{"mongodb://127.0.0.1:27017"}
}

var wg sync.WaitGroup
for i, serv := range m.Servers {
if !strings.HasPrefix(serv, "mongodb://") {
for _, connURL := range m.Servers {
if !strings.HasPrefix(connURL, "mongodb://") && !strings.HasPrefix(connURL, "mongodb+srv://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
serv = "mongodb://" + serv
m.Log.Warnf("Using %q as connection URL; please update your configuration to use an URL", serv)
m.Servers[i] = serv
connURL = "mongodb://" + connURL
m.Log.Warnf("Using %q as connection URL; please update your configuration to use an URL", connURL)
}

u, err := url.Parse(serv)
u, err := url.Parse(connURL)
if err != nil {
m.Log.Errorf("Unable to parse address %q: %s", serv, err.Error())
continue
return fmt.Errorf("unable to parse connection URL: %q", err)
}
if u.Host == "" {
m.Log.Errorf("Unable to parse address %q", serv)
continue
}

wg.Add(1)
go func(srv *Server) {
defer wg.Done()
err := m.gatherServer(srv, acc)
if err != nil {
m.Log.Errorf("Error in plugin: %v", err)
}
}(m.getMongoServer(u))
}

wg.Wait()
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() //nolint:revive

func (m *MongoDB) getMongoServer(url *url.URL) *Server {
if _, ok := m.mongos[url.Host]; !ok {
m.mongos[url.Host] = &Server{
Log: m.Log,
URL: url,
opts := options.Client().ApplyURI(connURL)
if tlsConfig != nil {
opts.TLSConfig = tlsConfig
}
}
return m.mongos[url.Host]
}

func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
if server.Session == nil {
var dialAddrs []string
if server.URL.User != nil {
dialAddrs = []string{server.URL.String()}
} else {
dialAddrs = []string{server.URL.Host}
if opts.ReadPreference == nil {
opts.ReadPreference = readpref.Nearest()
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])

client, err := mongo.Connect(ctx, opts)
if err != nil {
return fmt.Errorf("unable to parse URL %q: %s", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = 5 * time.Second

var tlsConfig *tls.Config

if m.Ssl.Enabled {
// Deprecated TLS config
tlsConfig = &tls.Config{}
if len(m.Ssl.CaCerts) > 0 {
roots := x509.NewCertPool()
for _, caCert := range m.Ssl.CaCerts {
ok := roots.AppendCertsFromPEM([]byte(caCert))
if !ok {
return fmt.Errorf("failed to parse root certificate")
}
}
tlsConfig.RootCAs = roots
} else {
tlsConfig.InsecureSkipVerify = true
}
} else {
tlsConfig, err = m.ClientConfig.TLSConfig()
if err != nil {
return err
}
return fmt.Errorf("unable to connect to MongoDB: %q", err)
}

// If configured to use TLS, add a dial function
if tlsConfig != nil {
dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) {
conn, err := tls.Dial("tcp", addr.String(), tlsConfig)
if err != nil {
fmt.Printf("error in Dial, %s\n", err.Error())
}
return conn, err
}
err = client.Ping(ctx, opts.ReadPreference)
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %s", err)
}

sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
return fmt.Errorf("unable to connect to MongoDB(%s): %s", strings.Join(dialInfo.Addrs, ","), err.Error())
server := &Server{
client: client,
hostname: u.Host,
Log: m.Log,
}
server.Session = sess
m.clients = append(m.clients, server)
}
return server.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.ColStatsDbs)

return nil
}

// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, client := range m.clients {
wg.Add(1)
go func(srv *Server) {
defer wg.Done()
err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDbs)
if err != nil {
m.Log.Errorf("failed to gather data: %q", err)
}
}(client)
}

wg.Wait()
return nil
}

func init() {
inputs.Add("mongodb", func() telegraf.Input {
return &MongoDB{
mongos: make(map[string]*Server),
GatherClusterStatus: true,
GatherPerdbStats: false,
GatherColStats: false,
GatherTopStat: false,
ColStatsDbs: []string{"local"},
}
})
Expand Down
Loading