Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add MongoDB plugin #54

Merged
merged 3 commits into from
Jul 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Telegraf currently has support for collecting metrics from:
* Redis
* RethinkDB
* Kafka
* MongoDB

We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API.

Expand Down
12 changes: 10 additions & 2 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ servers = ["localhost"]
# postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full]
# or a simple string:
# host=localhost user=pqotest password=... sslmode=...
#
#
# All connection parameters are optional. By default, the host is localhost
# and the user is the currently running user. For localhost, we default
# to sslmode=disable as well.
#
#

address = "sslmode=disable"

Expand All @@ -124,6 +124,14 @@ address = "sslmode=disable"
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]

[mongodb]
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
servers = ["127.0.0.1:27017"]

# Read metrics about swap memory usage
[swap]
# no configuration
Expand Down
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package all
import (
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/memcached"
_ "github.com/influxdb/telegraf/plugins/mongodb"
_ "github.com/influxdb/telegraf/plugins/mysql"
_ "github.com/influxdb/telegraf/plugins/postgresql"
_ "github.com/influxdb/telegraf/plugins/prometheus"
Expand Down
144 changes: 144 additions & 0 deletions plugins/mongodb/mongodb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package mongodb

import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/url"
"sync"
"time"

"github.com/influxdb/telegraf/plugins"
"gopkg.in/mgo.v2"
)

type MongoDB struct {
Servers []string
Ssl Ssl
mongos map[string]*Server
}

type Ssl struct {
Enabled bool
CaCerts []string `toml:"cacerts"`
}

var sampleConfig = `
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
servers = ["127.0.0.1:27017"]`

func (m *MongoDB) SampleConfig() string {
return sampleConfig
}

func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers"
}

var localhost = &url.URL{Host: "127.0.0.1:27017"}

// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc plugins.Accumulator) error {
if len(m.Servers) == 0 {
m.gatherServer(m.getMongoServer(localhost), acc)
return nil
}

var wg sync.WaitGroup

var outerr error

for _, serv := range m.Servers {
u, err := url.Parse(serv)
if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
} else if u.Scheme == "" {
u.Scheme = "mongodb"
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
if u.Path == u.Host {
u.Path = ""
}
}
wg.Add(1)
go func() {
defer wg.Done()
outerr = m.gatherServer(m.getMongoServer(u), acc)
}()
}

wg.Wait()

return outerr
}

func (m *MongoDB) getMongoServer(url *url.URL) *Server {
if _, ok := m.mongos[url.Host]; !ok {
m.mongos[url.Host] = &Server{
Url: url,
}
}
return m.mongos[url.Host]
}

func (m *MongoDB) gatherServer(server *Server, acc plugins.Accumulator) error {
if server.Session == nil {
var dialAddrs []string
if server.Url.User != nil {
dialAddrs = []string{server.Url.String()}
} else {
dialAddrs = []string{server.Url.Host}
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil {
return fmt.Errorf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = time.Duration(10) * time.Second

if m.Ssl.Enabled {
tlsConfig := &tls.Config{}
if len(m.Ssl.CaCerts) > 0 {
roots := x509.NewCertPool()
for _, caCert := range m.Ssl.CaCerts {
ok := roots.AppendCertsFromPEM([]byte(caCert))
if !ok {
return fmt.Errorf("failed to parse root certificate")
}
}
tlsConfig.RootCAs = roots
} else {
tlsConfig.InsecureSkipVerify = true
}
dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) {
conn, err := tls.Dial("tcp", addr.String(), tlsConfig)
if err != nil {
fmt.Printf("error in Dial, %s\n", err.Error())
}
return conn, err
}
}

sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
fmt.Printf("error dialing over ssl, %s\n", err.Error())
return fmt.Errorf("Unable to connect to MongoDB, %s\n", err.Error())
}
server.Session = sess
}
return server.gatherData(acc)
}

func init() {
plugins.Add("mongodb", func() plugins.Plugin {
return &MongoDB{
mongos: make(map[string]*Server),
}
})
}
100 changes: 100 additions & 0 deletions plugins/mongodb/mongodb_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package mongodb

import (
"fmt"
"reflect"
"strconv"

"github.com/influxdb/telegraf/plugins"
)

type MongodbData struct {
StatLine *StatLine
Tags map[string]string
}

func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
if statLine.NodeType != "" && statLine.NodeType != "UNK" {
tags["state"] = statLine.NodeType
}
return &MongodbData{
StatLine: statLine,
Tags: tags,
}
}

var DefaultStats = map[string]string{
"inserts_per_sec": "Insert",
"queries_per_sec": "Query",
"updates_per_sec": "Update",
"deletes_per_sec": "Delete",
"getmores_per_sec": "GetMore",
"commands_per_sec": "Command",
"flushes_per_sec": "Flushes",
"vsize_megabytes": "Virtual",
"resident_megabytes": "Resident",
"queued_reads": "QueuedReaders",
"queued_writes": "QueuedWriters",
"active_reads": "ActiveReaders",
"active_writes": "ActiveWriters",
"net_in_bytes": "NetIn",
"net_out_bytes": "NetOut",
"open_connections": "NumConnections",
}

var DefaultReplStats = map[string]string{
"repl_inserts_per_sec": "InsertR",
"repl_queries_per_sec": "QueryR",
"repl_updates_per_sec": "UpdateR",
"repl_deletes_per_sec": "DeleteR",
"repl_getmores_per_sec": "GetMoreR",
"repl_commands_per_sec": "CommandR",
"member_status": "NodeType",
}

var MmapStats = map[string]string{
"mapped_megabytes": "Mapped",
"non-mapped_megabytes": "NonMapped",
"page_faults_per_sec": "Faults",
}

var WiredTigerStats = map[string]string{
"percent_cache_dirty": "CacheDirtyPercent",
"percent_cache_used": "CacheUsedPercent",
}

func (d *MongodbData) AddDefaultStats(acc plugins.Accumulator) {
statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(acc, statLine, DefaultStats)
if d.StatLine.NodeType != "" {
d.addStat(acc, statLine, DefaultReplStats)
}
if d.StatLine.StorageEngine == "mmapv1" {
d.addStat(acc, statLine, MmapStats)
} else if d.StatLine.StorageEngine == "wiredTiger" {
for key, value := range WiredTigerStats {
val := statLine.FieldByName(value).Interface()
percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
floatVal, _ := strconv.ParseFloat(percentVal, 64)
d.add(acc, key, floatVal)
}
}
}

func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, stats map[string]string) {
for key, value := range stats {
val := statLine.FieldByName(value).Interface()
d.add(acc, key, val)
}
}

func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
acc.AddValuesWithTime(
key,
map[string]interface{}{
"value": val,
},
d.Tags,
d.StatLine.Time,
)
}
Loading