Skip to content

Commit

Permalink
Fix and improve error handling in mongodb collection stats (influxdat…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Aug 14, 2019
1 parent 4ab2981 commit 5e0c63f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 49 deletions.
4 changes: 4 additions & 0 deletions plugins/inputs/mongodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ Telegraf logs similar to:
Error in input [mongodb]: not authorized on admin to execute command { serverStatus: 1, recordStats: 0 }
```

Some permission related errors are logged at debug level, you can check these
messages by setting `debug = true` in the agent section of the configuration or
by running Telegraf with the `--debug` argument.

### Metrics:

- mongodb
Expand Down
105 changes: 56 additions & 49 deletions plugins/inputs/mongodb/mongodb_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ func IsAuthorization(err error) bool {
return strings.Contains(err.Error(), "not authorized")
}

func authLogLevel(err error) string {
if IsAuthorization(err) {
return "D!"
} else {
return "E!"
}
}

func (s *Server) gatherOplogStats() *OplogStats {
stats := &OplogStats{}
localdb := s.Session.DB("local")
Expand All @@ -44,22 +52,14 @@ func (s *Server) gatherOplogStats() *OplogStats {
if err == mgo.ErrNotFound {
continue
}
if IsAuthorization(err) {
log.Println("D! Error getting first oplog entry (" + err.Error() + ")")
} else {
log.Println("E! Error getting first oplog entry (" + err.Error() + ")")
}
log.Printf("%s [inputs.mongodb] Error getting first oplog entry: %v", authLogLevel(err), err)
return stats
}
if err := localdb.C(collection_name).Find(query).Sort("-$natural").Limit(1).One(&op_last); err != nil {
if err == mgo.ErrNotFound || IsAuthorization(err) {
continue
}
if IsAuthorization(err) {
log.Println("D! Error getting first oplog entry (" + err.Error() + ")")
} else {
log.Println("E! Error getting first oplog entry (" + err.Error() + ")")
}
log.Printf("%s [inputs.mongodb] Error getting first oplog entry: %v", authLogLevel(err), err)
return stats
}
}
Expand All @@ -70,6 +70,45 @@ func (s *Server) gatherOplogStats() *OplogStats {
return stats
}

func (s *Server) gatherCollectionStats(colStatsDbs []string) (*ColStats, error) {
names, err := s.Session.DatabaseNames()
if err != nil {
return nil, err
}

results := &ColStats{}
for _, db_name := range names {
if stringInSlice(db_name, colStatsDbs) || len(colStatsDbs) == 0 {
var colls []string
colls, err = s.Session.DB(db_name).CollectionNames()
if err != nil {
log.Printf("E! [inputs.mongodb] Error getting collection names: %v", err)
continue
}
for _, col_name := range colls {
col_stat_line := &ColStatsData{}
err = s.Session.DB(db_name).Run(bson.D{
{
Name: "collStats",
Value: col_name,
},
}, col_stat_line)
if err != nil {
log.Printf("%s [inputs.mongodb] Error getting col stats from %q: %v", authLogLevel(err), col_name, err)
continue
}
collection := &Collection{
Name: col_name,
DbName: db_name,
ColStatsData: col_stat_line,
}
results.Collections = append(results.Collections, *collection)
}
}
}
return results, nil
}

func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gatherColStats bool, colStatsDbs []string) error {
s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0)
Expand Down Expand Up @@ -112,9 +151,9 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gather
}, &resultShards)
if err != nil {
if IsAuthorization(err) {
log.Println("D! Error getting database shard stats (" + err.Error() + ")")
log.Printf("D! [inputs.mongodb] Error getting database shard stats: %v", err)
} else {
log.Println("E! Error getting database shard stats (" + err.Error() + ")")
log.Printf("E! [inputs.mongodb] Error getting database shard stats: %v", err)
}
}

Expand All @@ -125,7 +164,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gather
names := []string{}
names, err = s.Session.DatabaseNames()
if err != nil {
log.Println("E! Error getting database names (" + err.Error() + ")")
log.Printf("E! [inputs.mongodb] Error getting database names: %v", err)
}
for _, db_name := range names {
db_stat_line := &DbStatsData{}
Expand All @@ -136,7 +175,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gather
},
}, db_stat_line)
if err != nil {
log.Println("E! Error getting db stats from " + db_name + "(" + err.Error() + ")")
log.Printf("E! [inputs.mongodb] Error getting db stats from %q: %v", db_name, err)
}
db := &Db{
Name: db_name,
Expand All @@ -147,41 +186,9 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gather
}
}

result_col_stats := &ColStats{}
if gatherColStats == true {
names := []string{}
names, err = s.Session.DatabaseNames()
if err != nil {
log.Println("E! Error getting database names (" + err.Error() + ")")
}
for _, db_name := range names {
if stringInSlice(db_name, colStatsDbs) || len(colStatsDbs) == 0 {
var colls []string
colls, err = s.Session.DB(db_name).CollectionNames()
if err != nil {
log.Println("E! Error getting collection names (" + err.Error() + ")")
}
for _, col_name := range colls {
col_stat_line := &ColStatsData{}
err = s.Session.DB(db_name).Run(bson.D{
{
Name: "collStats",
Value: col_name,
},
}, col_stat_line)
if err != nil {
log.Println("E! Error getting col stats from " + col_name + "(" + err.Error() + ")")
continue
}
collection := &Collection{
Name: col_name,
DbName: db_name,
ColStatsData: col_stat_line,
}
result_col_stats.Collections = append(result_col_stats.Collections, *collection)
}
}
}
result_col_stats, err := s.gatherCollectionStats(colStatsDbs)
if err != nil {
return err
}

result := &MongoStatus{
Expand Down

0 comments on commit 5e0c63f

Please sign in to comment.