forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Kibana input plugin (influxdata#4585)
- Loading branch information
1 parent
c0d707e
commit 2b1af3b
Showing
5 changed files
with
559 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
# Kibana input plugin | ||
|
||
The [kibana](https://www.elastic.co/) plugin queries Kibana status API to | ||
obtain the health status of Kibana and some useful metrics. | ||
|
||
This plugin has been tested and works on Kibana 6.x versions. | ||
|
||
### Configuration | ||
|
||
```toml | ||
[[inputs.kibana]] | ||
## specify a list of one or more Kibana servers | ||
servers = ["http://localhost:5601"] | ||
|
||
## Timeout for HTTP requests | ||
timeout = "5s" | ||
|
||
## HTTP Basic Auth credentials | ||
# username = "username" | ||
# password = "pa$$word" | ||
|
||
## Optional TLS Config | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
# insecure_skip_verify = false | ||
``` | ||
|
||
### Status mappings | ||
|
||
When reporting health (green/yellow/red), additional field `status_code` | ||
is reported. Field contains mapping from status:string to status_code:int | ||
with following rules: | ||
|
||
- `green` - 1 | ||
- `yellow` - 2 | ||
- `red` - 3 | ||
- `unknown` - 0 | ||
|
||
### Measurements & Fields | ||
|
||
- kibana | ||
- status_code: integer (1, 2, 3, 0) | ||
- heap_max_bytes: integer | ||
- heap_used_bytes: integer | ||
- uptime_ms: integer | ||
- response_time_avg_ms: float | ||
- response_time_max_ms: integer | ||
- concurrent_connections: integer | ||
- requests_per_sec: float | ||
|
||
### Tags | ||
|
||
- status (Kibana health: green, yellow, red) | ||
- name (Kibana reported name) | ||
- uuid (Kibana reported UUID) | ||
- version (Kibana version) | ||
- source (Kibana server hostname or IP) | ||
|
||
### Example Output | ||
|
||
kibana,host=myhost,name=my-kibana,source=localhost:5601,version=6.3.2 concurrent_connections=0i,heap_max_bytes=136478720i,heap_used_bytes=119231088i,response_time_avg_ms=0i,response_time_max_ms=0i,status="green",status_code=1i,uptime_ms=2187428019i 1534864502000000000 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,230 @@ | ||
package kibana | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"net/http" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/internal/tls" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
) | ||
|
||
const statusPath = "/api/status" | ||
|
||
type kibanaStatus struct { | ||
Name string `json:"name"` | ||
Version version `json:"version"` | ||
Status status `json:"status"` | ||
Metrics metrics `json:"metrics"` | ||
} | ||
|
||
type version struct { | ||
Number string `json:"number"` | ||
BuildHash string `json:"build_hash"` | ||
BuildNumber int `json:"build_number"` | ||
BuildSnapshot bool `json:"build_snapshot"` | ||
} | ||
|
||
type status struct { | ||
Overall overallStatus `json:"overall"` | ||
Statuses interface{} `json:"statuses"` | ||
} | ||
|
||
type overallStatus struct { | ||
State string `json:"state"` | ||
} | ||
|
||
type metrics struct { | ||
UptimeInMillis int64 `json:"uptime_in_millis"` | ||
ConcurrentConnections int64 `json:"concurrent_connections"` | ||
CollectionIntervalInMilles int64 `json:"collection_interval_in_millis"` | ||
ResponseTimes responseTimes `json:"response_times"` | ||
Process process `json:"process"` | ||
Requests requests `json:"requests"` | ||
} | ||
|
||
type responseTimes struct { | ||
AvgInMillis float64 `json:"avg_in_millis"` | ||
MaxInMillis int64 `json:"max_in_millis"` | ||
} | ||
|
||
type process struct { | ||
Mem mem `json:"mem"` | ||
} | ||
|
||
type requests struct { | ||
Total int64 `json:"total"` | ||
} | ||
|
||
type mem struct { | ||
HeapMaxInBytes int64 `json:"heap_max_in_bytes"` | ||
HeapUsedInBytes int64 `json:"heap_used_in_bytes"` | ||
} | ||
|
||
const sampleConfig = ` | ||
## specify a list of one or more Kibana servers | ||
servers = ["http://localhost:5601"] | ||
## Timeout for HTTP requests | ||
timeout = "5s" | ||
## HTTP Basic Auth credentials | ||
# username = "username" | ||
# password = "pa$$word" | ||
## Optional TLS Config | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
# insecure_skip_verify = false | ||
` | ||
|
||
type Kibana struct { | ||
Local bool | ||
Servers []string | ||
Username string | ||
Password string | ||
Timeout internal.Duration | ||
tls.ClientConfig | ||
|
||
client *http.Client | ||
} | ||
|
||
func NewKibana() *Kibana { | ||
return &Kibana{ | ||
Timeout: internal.Duration{Duration: time.Second * 5}, | ||
} | ||
} | ||
|
||
// perform status mapping | ||
func mapHealthStatusToCode(s string) int { | ||
switch strings.ToLower(s) { | ||
case "green": | ||
return 1 | ||
case "yellow": | ||
return 2 | ||
case "red": | ||
return 3 | ||
} | ||
return 0 | ||
} | ||
|
||
// SampleConfig returns sample configuration for this plugin. | ||
func (k *Kibana) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
// Description returns the plugin description. | ||
func (k *Kibana) Description() string { | ||
return "Read status information from one or more Kibana servers" | ||
} | ||
|
||
func (k *Kibana) Gather(acc telegraf.Accumulator) error { | ||
if k.client == nil { | ||
client, err := k.createHttpClient() | ||
|
||
if err != nil { | ||
return err | ||
} | ||
k.client = client | ||
} | ||
|
||
var wg sync.WaitGroup | ||
wg.Add(len(k.Servers)) | ||
|
||
for _, serv := range k.Servers { | ||
go func(baseUrl string, acc telegraf.Accumulator) { | ||
defer wg.Done() | ||
if err := k.gatherKibanaStatus(baseUrl, acc); err != nil { | ||
acc.AddError(fmt.Errorf("[url=%s]: %s", baseUrl, err)) | ||
return | ||
} | ||
}(serv, acc) | ||
} | ||
|
||
wg.Wait() | ||
return nil | ||
} | ||
|
||
func (k *Kibana) createHttpClient() (*http.Client, error) { | ||
tlsCfg, err := k.ClientConfig.TLSConfig() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
client := &http.Client{ | ||
Transport: &http.Transport{ | ||
TLSClientConfig: tlsCfg, | ||
}, | ||
Timeout: k.Timeout.Duration, | ||
} | ||
|
||
return client, nil | ||
} | ||
|
||
func (k *Kibana) gatherKibanaStatus(baseUrl string, acc telegraf.Accumulator) error { | ||
|
||
kibanaStatus := &kibanaStatus{} | ||
url := baseUrl + statusPath | ||
|
||
host, err := k.gatherJsonData(url, kibanaStatus) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
fields := make(map[string]interface{}) | ||
tags := make(map[string]string) | ||
|
||
tags["name"] = kibanaStatus.Name | ||
tags["source"] = host | ||
tags["version"] = kibanaStatus.Version.Number | ||
tags["status"] = kibanaStatus.Status.Overall.State | ||
|
||
fields["status_code"] = mapHealthStatusToCode(kibanaStatus.Status.Overall.State) | ||
|
||
fields["uptime_ms"] = kibanaStatus.Metrics.UptimeInMillis | ||
fields["concurrent_connections"] = kibanaStatus.Metrics.ConcurrentConnections | ||
fields["heap_max_bytes"] = kibanaStatus.Metrics.Process.Mem.HeapMaxInBytes | ||
fields["heap_used_bytes"] = kibanaStatus.Metrics.Process.Mem.HeapUsedInBytes | ||
fields["response_time_avg_ms"] = kibanaStatus.Metrics.ResponseTimes.AvgInMillis | ||
fields["response_time_max_ms"] = kibanaStatus.Metrics.ResponseTimes.MaxInMillis | ||
fields["requests_per_sec"] = float64(kibanaStatus.Metrics.Requests.Total) / float64(kibanaStatus.Metrics.CollectionIntervalInMilles) * 1000 | ||
|
||
acc.AddFields("kibana", fields, tags) | ||
|
||
return nil | ||
} | ||
|
||
func (k *Kibana) gatherJsonData(url string, v interface{}) (host string, err error) { | ||
|
||
request, err := http.NewRequest("GET", url, nil) | ||
|
||
if (k.Username != "") || (k.Password != "") { | ||
request.SetBasicAuth(k.Username, k.Password) | ||
} | ||
|
||
response, err := k.client.Do(request) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
defer response.Body.Close() | ||
|
||
if err = json.NewDecoder(response.Body).Decode(v); err != nil { | ||
return request.Host, err | ||
} | ||
|
||
return request.Host, nil | ||
} | ||
|
||
func init() { | ||
inputs.Add("kibana", func() telegraf.Input { | ||
return NewKibana() | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package kibana | ||
|
||
import ( | ||
"io/ioutil" | ||
"net/http" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/influxdata/telegraf/testutil" | ||
) | ||
|
||
func defaultTags() map[string]string { | ||
return map[string]string{ | ||
"name": "my-kibana", | ||
"source": "example.com:5601", | ||
"version": "6.3.2", | ||
"status": "green", | ||
} | ||
} | ||
|
||
type transportMock struct { | ||
statusCode int | ||
body string | ||
} | ||
|
||
func newTransportMock(statusCode int, body string) http.RoundTripper { | ||
return &transportMock{ | ||
statusCode: statusCode, | ||
body: body, | ||
} | ||
} | ||
|
||
func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) { | ||
res := &http.Response{ | ||
Header: make(http.Header), | ||
Request: r, | ||
StatusCode: t.statusCode, | ||
} | ||
res.Header.Set("Content-Type", "application/json") | ||
res.Body = ioutil.NopCloser(strings.NewReader(t.body)) | ||
return res, nil | ||
} | ||
|
||
func checkKibanaStatusResult(t *testing.T, acc *testutil.Accumulator) { | ||
tags := defaultTags() | ||
acc.AssertContainsTaggedFields(t, "kibana", kibanaStatusExpected, tags) | ||
} | ||
|
||
func TestGather(t *testing.T) { | ||
ks := newKibanahWithClient() | ||
ks.Servers = []string{"http://example.com:5601"} | ||
ks.client.Transport = newTransportMock(http.StatusOK, kibanaStatusResponse) | ||
|
||
var acc testutil.Accumulator | ||
if err := acc.GatherError(ks.Gather); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
checkKibanaStatusResult(t, &acc) | ||
} | ||
|
||
func newKibanahWithClient() *Kibana { | ||
ks := NewKibana() | ||
ks.client = &http.Client{} | ||
return ks | ||
} |
Oops, something went wrong.