Skip to content

Commit

Permalink
Add oplog timediff metric to mongodb input
Browse files Browse the repository at this point in the history
  • Loading branch information
Matvey Kruglov committed Apr 3, 2018
1 parent 19c102c commit dfb269c
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 6 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/mongodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ and create a single measurement containing values e.g.
* ttl_passes_per_sec
* repl_lag
* jumbo_chunks (only if mongos or mongo config)
* oplog_timediff

If gather_db_stats is set to true, the plugin also collects the per-database stats exposed by db.stats(),
creating another measurement called mongodb_db_stats that contains these values:
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/mongodb/mongodb_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var DefaultReplStats = map[string]string{
"member_status": "NodeType",
"state": "NodeState",
"repl_lag": "ReplLag",
"oplog_timediff": "OplogTimeDiff",
}

var DefaultClusterStats = map[string]string{
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/mongodb/mongodb_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func TestStateTag(t *testing.T) {
"total_available": int64(0),
"total_created": int64(0),
"total_refreshing": int64(0),
"oplog_timediff": int64(0),
}
acc.AssertContainsTaggedFields(t, "mongodb", fields, stateTags)
}
71 changes: 65 additions & 6 deletions plugins/inputs/mongodb/mongodb_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,56 @@ func (s *Server) getDefaultTags() map[string]string {
return tags
}

// OplogStats holds the oplog-derived values produced by gatherOplogStats.
type OplogStats struct {
// TimeDiff is the whole-second difference between the timestamps of the
// last and first oplog entries; it stays 0 when gathering fails.
TimeDiff int64
}

// gatherOplogStats reports the time span covered by the oplog collection in
// the "local" database.
//
// It looks for a collection named "oplog.rs" or "oplog.$main", reads the first
// and the last entry (by $natural order) that carry a "ts" field, and stores
// the whole-second difference between their timestamps in TimeDiff. Any
// failure is logged and a zero-valued OplogStats is returned; the returned
// pointer is never nil.
func (s *Server) gatherOplogStats() *OplogStats {
	result := &OplogStats{}
	local := s.Session.DB("local")

	names, err := local.CollectionNames()
	if err != nil {
		log.Println("E! Error getting collection names of database 'local' (" + err.Error() + ")")
		return result
	}

	// Take the first collection, in listing order, whose name is one of the
	// two recognised oplog collection names.
	oplogName := ""
	for _, name := range names {
		if name == "oplog.rs" || name == "oplog.$main" {
			oplogName = name
			break
		}
	}
	if oplogName == "" {
		log.Println("E! Not oplog.rs nor oplog.$main collections found in 'local' db")
		return result
	}

	coll := local.C(oplogName)
	// Only consider documents that actually have a "ts" field.
	hasTimestamp := bson.M{"ts": bson.M{"$exists": true}}

	var first, last OplogEntry
	if err := coll.Find(hasTimestamp).Sort("$natural").Limit(1).One(&first); err != nil {
		log.Println("E! Error getting first oplog entry (" + err.Error() + ")")
		return result
	}
	if err := coll.Find(hasTimestamp).Sort("-$natural").Limit(1).One(&last); err != nil {
		log.Println("E! Error getting last oplog entry (" + err.Error() + ")")
		return result
	}

	// The upper 32 bits of each timestamp are used as Unix seconds.
	start := time.Unix(int64(first.Timestamp>>32), 0)
	end := time.Unix(int64(last.Timestamp>>32), 0)
	result.TimeDiff = int64(end.Sub(start).Seconds())
	return result
}

func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error {
s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0)
Expand Down Expand Up @@ -66,14 +116,22 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error
log.Println("E! Error getting database shard stats (" + err.Error() + ")")
}

db_names := []string{}
db_names, err = s.Session.DatabaseNames()
if err != nil {
log.Println("E! Error getting database names (" + err.Error() + ")")
}
oplogStats := &OplogStats{}
for _, name := range db_names {
if name == "local" {
oplogStats = s.gatherOplogStats()
break
}
}

result_db_stats := &DbStats{}
if gatherDbStats == true {
names := []string{}
names, err = s.Session.DatabaseNames()
if err != nil {
log.Println("E! Error getting database names (" + err.Error() + ")")
}
for _, db_name := range names {
for _, db_name := range db_names {
db_stat_line := &DbStatsData{}
err = s.Session.DB(db_name).Run(bson.D{
{
Expand All @@ -99,6 +157,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error
ClusterStatus: result_cluster,
DbStats: result_db_stats,
ShardStats: resultShards,
OplogStats: oplogStats,
}

defer func() {
Expand Down
9 changes: 9 additions & 0 deletions plugins/inputs/mongodb/mongostat.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sort"
"strings"
"time"

"gopkg.in/mgo.v2/bson"
)

const (
Expand All @@ -35,6 +37,11 @@ type MongoStatus struct {
ClusterStatus *ClusterStatus
DbStats *DbStats
ShardStats *ShardStats
OplogStats *OplogStats
}

// OplogEntry is the decoding target for a single oplog document; only the
// "ts" field is mapped.
type OplogEntry struct {
// Timestamp is populated from the document's "ts" field.
Timestamp bson.MongoTimestamp `bson:"ts"`
}

type ServerStatus struct {
Expand Down Expand Up @@ -442,6 +449,7 @@ type StatLine struct {
// Replicated Opcounter fields
InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64
ReplLag int64
OplogTimeDiff int64
Flushes int64
Mapped, Virtual, Resident, NonMapped int64
Faults int64
Expand Down Expand Up @@ -772,6 +780,7 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec

newClusterStat := *newMongo.ClusterStatus
returnVal.JumboChunksCount = newClusterStat.JumboChunksCount
returnVal.OplogTimeDiff = newMongo.OplogStats.TimeDiff

newDbStats := *newMongo.DbStats
for _, db := range newDbStats.Dbs {
Expand Down

0 comments on commit dfb269c

Please sign in to comment.