Skip to content

Commit

Permalink
Merge pull request #55 from brocaar/elasticsearch_plugin
Browse files Browse the repository at this point in the history
Implement Elasticsearch plugin
  • Loading branch information
evanphx committed Jul 21, 2015
2 parents 15ef627 + 22d4d1f commit 4ca39df
Show file tree
Hide file tree
Showing 5 changed files with 962 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Telegraf currently has support for collecting metrics from:
* Prometheus (client libraries and exporters)
* PostgreSQL
* Redis
* Elasticsearch
* RethinkDB
* Kafka

Expand Down
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package all

import (
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/memcached"
_ "github.com/influxdb/telegraf/plugins/mysql"
Expand Down
154 changes: 154 additions & 0 deletions plugins/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"net/http"

"github.com/influxdb/telegraf/plugins"
)

const statsPath = "/_nodes/stats"
const statsPathLocal = "/_nodes/_local/stats"

type node struct {
Host string `json:"host"`
Name string `json:"name"`
Attributes map[string]string `json:"attributes"`
Indices interface{} `json:"indices"`
OS interface{} `json:"os"`
Process interface{} `json:"process"`
JVM interface{} `json:"jvm"`
ThreadPool interface{} `json:"thread_pool"`
Network interface{} `json:"network"`
FS interface{} `json:"fs"`
Transport interface{} `json:"transport"`
HTTP interface{} `json:"http"`
Breakers interface{} `json:"breakers"`
}

const sampleConfig = `
# specify a list of one or more Elasticsearch servers
servers = ["http://localhost:9200"]
# set local to false when you want to read the indices stats from all nodes
# within the cluster
local = true
`

// Elasticsearch is a plugin to read stats from one or many Elasticsearch
// servers.
type Elasticsearch struct {
Local bool
Servers []string
client *http.Client
}

// NewElasticsearch return a new instance of Elasticsearch
func NewElasticsearch() *Elasticsearch {
return &Elasticsearch{client: http.DefaultClient}
}

// SampleConfig returns sample configuration for this plugin.
func (e *Elasticsearch) SampleConfig() string {
return sampleConfig
}

// Description returns the plugin description.
func (e *Elasticsearch) Description() string {
return "Read stats from one or more Elasticsearch servers or clusters"
}

// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc plugins.Accumulator) error {
for _, serv := range e.Servers {
var url string
if e.Local {
url = serv + statsPathLocal
} else {
url = serv + statsPath
}
if err := e.gatherUrl(url, acc); err != nil {
return err
}
}
return nil
}

func (e *Elasticsearch) gatherUrl(url string, acc plugins.Accumulator) error {
r, err := e.client.Get(url)
if err != nil {
return err
}
if r.StatusCode != http.StatusOK {
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
}
d := json.NewDecoder(r.Body)
esRes := &struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]*node `json:"nodes"`
}{}
if err = d.Decode(esRes); err != nil {
return err
}

for id, n := range esRes.Nodes {
tags := map[string]string{
"node_id": id,
"node_host": n.Host,
"node_name": n.Name,
"cluster_name": esRes.ClusterName,
}

for k, v := range n.Attributes {
tags["node_attribute_"+k] = v
}

stats := map[string]interface{}{
"indices": n.Indices,
"os": n.OS,
"process": n.Process,
"jvm": n.JVM,
"thread_pool": n.ThreadPool,
"network": n.Network,
"fs": n.FS,
"transport": n.Transport,
"http": n.HTTP,
"breakers": n.Breakers,
}

for p, s := range stats {
if err := e.parseInterface(acc, p, tags, s); err != nil {
return err
}
}
}

return nil
}

func (e *Elasticsearch) parseInterface(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error {
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
if err := e.parseInterface(acc, prefix+"_"+k, tags, v); err != nil {
return err
}
}
case float64:
acc.Add(prefix, t, tags)
case bool, string, []interface{}:
// ignored types
return nil
default:
return fmt.Errorf("elasticsearch: got unexpected type %T with value %v (%s)", t, t, prefix)
}
return nil
}

func init() {
plugins.Add("elasticsearch", func() plugins.Plugin {
return NewElasticsearch()
})
}
72 changes: 72 additions & 0 deletions plugins/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package elasticsearch

import (
"io/ioutil"
"net/http"
"strings"
"testing"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
)

type transportMock struct {
statusCode int
body string
}

func newTransportMock(statusCode int, body string) http.RoundTripper {
return &transportMock{
statusCode: statusCode,
body: body,
}
}

func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
res := &http.Response{
Header: make(http.Header),
Request: r,
StatusCode: t.statusCode,
}
res.Header.Set("Content-Type", "application/json")
res.Body = ioutil.NopCloser(strings.NewReader(t.body))
return res, nil
}

func TestElasticsearch(t *testing.T) {
es := NewElasticsearch()
es.Servers = []string{"http://example.com:9200"}
es.client.Transport = newTransportMock(http.StatusOK, statsResponse)

var acc testutil.Accumulator
if err := es.Gather(&acc); err != nil {
t.Fatal(err)
}

tags := map[string]string{
"cluster_name": "es-testcluster",
"node_attribute_master": "true",
"node_id": "SDFsfSDFsdfFSDSDfSFDSDF",
"node_name": "test.host.com",
"node_host": "test",
}

testTables := []map[string]float64{
indicesExpected,
osExpected,
processExpected,
jvmExpected,
threadPoolExpected,
networkExpected,
fsExpected,
transportExpected,
httpExpected,
breakersExpected,
}

for _, testTable := range testTables {
for k, v := range testTable {
assert.NoError(t, acc.ValidateTaggedValue(k, v, tags))
}
}
}
Loading

0 comments on commit 4ca39df

Please sign in to comment.