Skip to content

Commit

Permalink
Add repl_oplog_window_s metric to mongodb input (influxdata#3964)
Browse files Browse the repository at this point in the history
  • Loading branch information
subuk authored and otherpirate committed Mar 15, 2019
1 parent 0e8b0e6 commit e68bd7f
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/mongodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ and create a single measurement containing values e.g.
* ttl_deletes_per_sec
* ttl_passes_per_sec
* repl_lag
* repl_oplog_window_s
* jumbo_chunks (only if mongos or mongo config)

If gather_db_stats is set to true, it will also collect per database stats exposed by db.stats()
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/mongodb/mongodb_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var DefaultReplStats = map[string]string{
"member_status": "NodeType",
"state": "NodeState",
"repl_lag": "ReplLag",
"repl_oplog_window_s": "OplogTimeDiff",
}

var DefaultClusterStats = map[string]string{
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/mongodb/mongodb_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func TestStateTag(t *testing.T) {
"repl_queries_per_sec": int64(0),
"repl_updates_per_sec": int64(0),
"repl_lag": int64(0),
"repl_oplog_window_s": int64(0),
"resident_megabytes": int64(0),
"updates_per_sec": int64(0),
"vsize_megabytes": int64(0),
Expand Down
38 changes: 38 additions & 0 deletions plugins/inputs/mongodb/mongodb_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,41 @@ func (s *Server) getDefaultTags() map[string]string {
return tags
}

type oplogEntry struct {
Timestamp bson.MongoTimestamp `bson:"ts"`
}

func (s *Server) gatherOplogStats() *OplogStats {
stats := &OplogStats{}
localdb := s.Session.DB("local")

op_first := oplogEntry{}
op_last := oplogEntry{}
query := bson.M{"ts": bson.M{"$exists": true}}

for _, collection_name := range []string{"oplog.rs", "oplog.$main"} {
if err := localdb.C(collection_name).Find(query).Sort("$natural").Limit(1).One(&op_first); err != nil {
if err == mgo.ErrNotFound {
continue
}
log.Println("E! Error getting first oplog entry (" + err.Error() + ")")
return stats
}
if err := localdb.C(collection_name).Find(query).Sort("-$natural").Limit(1).One(&op_last); err != nil {
if err == mgo.ErrNotFound {
continue
}
log.Println("E! Error getting last oplog entry (" + err.Error() + ")")
return stats
}
}

op_first_time := time.Unix(int64(op_first.Timestamp>>32), 0)
op_last_time := time.Unix(int64(op_last.Timestamp>>32), 0)
stats.TimeDiff = int64(op_last_time.Sub(op_first_time).Seconds())
return stats
}

func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error {
s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0)
Expand Down Expand Up @@ -66,6 +101,8 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error
log.Println("E! Error getting database shard stats (" + err.Error() + ")")
}

oplogStats := s.gatherOplogStats()

result_db_stats := &DbStats{}
if gatherDbStats == true {
names := []string{}
Expand Down Expand Up @@ -99,6 +136,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error
ClusterStatus: result_cluster,
DbStats: result_db_stats,
ShardStats: resultShards,
OplogStats: oplogStats,
}

defer func() {
Expand Down
8 changes: 8 additions & 0 deletions plugins/inputs/mongodb/mongostat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type MongoStatus struct {
ClusterStatus *ClusterStatus
DbStats *DbStats
ShardStats *ShardStats
OplogStats *OplogStats
}

type ServerStatus struct {
Expand Down Expand Up @@ -102,6 +103,11 @@ type ReplSetStatus struct {
MyState int64 `bson:"myState"`
}

// OplogStatus stores information from getReplicationInfo
type OplogStats struct {
TimeDiff int64
}

// ReplSetMember stores information related to a replica set member
type ReplSetMember struct {
Name string `bson:"name"`
Expand Down Expand Up @@ -442,6 +448,7 @@ type StatLine struct {
// Replicated Opcounter fields
InsertR, QueryR, UpdateR, DeleteR, GetMoreR, CommandR int64
ReplLag int64
OplogTimeDiff int64
Flushes int64
Mapped, Virtual, Resident, NonMapped int64
Faults int64
Expand Down Expand Up @@ -772,6 +779,7 @@ func NewStatLine(oldMongo, newMongo MongoStatus, key string, all bool, sampleSec

newClusterStat := *newMongo.ClusterStatus
returnVal.JumboChunksCount = newClusterStat.JumboChunksCount
returnVal.OplogTimeDiff = newMongo.OplogStats.TimeDiff

newDbStats := *newMongo.DbStats
for _, db := range newDbStats.Dbs {
Expand Down

0 comments on commit e68bd7f

Please sign in to comment.