Skip to content

Commit

Permalink
Extend logstash.node metricset for logstash_state stack monitoring da…
Browse files Browse the repository at this point in the history
…ta (#11506)

* WIP: metricset for logstash_state

* Remove debugging statement

* Exclude internal pipelines

* Index new docs only if pipelien ephemeral ID has changed

* Create one doc per ES cluster, if cluster_uuid(s) are given

* Add graph=true query param

* Extract cluster_uuids field so it's not indexed
  • Loading branch information
ycombinator authored May 1, 2019
1 parent adb3e12 commit bd2660d
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 5 deletions.
67 changes: 67 additions & 0 deletions metricbeat/module/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
package logstash

import (
"encoding/json"
"net/url"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
)

Expand All @@ -27,12 +33,73 @@ const ModuleName = "logstash"
// MetricSet can be used to build other metricsets within the Logstash module.
type MetricSet struct {
mb.BaseMetricSet
XPack bool
}

// PipelineState represents the state (shape) of a Logstash pipeline
type PipelineState struct {
ID string `json:"id"`
Hash string `json:"hash"`
EphemeralID string `json:"ephemeral_id"`
Graph map[string]interface{} `json:"graph,omitempty"`
Representation map[string]interface{} `json:"representation"`
BatchSize int `json:"batch_size"`
Workers int `json:"workers"`
ClusterIDs []string `json:"cluster_uuids,omitempty"` // TODO: see https://github.com/elastic/logstash/issues/10602
}

// NewMetricSet creates a metricset that can be used to build other metricsets
// within the Logstash module.
func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
config := struct {
XPack bool `config:"xpack.enabled"`
}{
XPack: false,
}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}

return &MetricSet{
base,
config.XPack,
}, nil
}

// GetPipelines returns the list of pipelines running on a Logstash node
func GetPipelines(http *helper.HTTP, resetURI string) ([]PipelineState, error) {
content, err := fetchPath(http, resetURI, "_node/pipelines", "graph=true")
if err != nil {
return nil, errors.Wrap(err, "could not fetch node pipelines")
}

pipelinesResponse := struct {
Pipelines map[string]PipelineState `json:"pipelines"`
}{}

err = json.Unmarshal(content, &pipelinesResponse)
if err != nil {
return nil, errors.Wrap(err, "could not parse node pipelines response")
}

var pipelines []PipelineState
for pipelineID, pipeline := range pipelinesResponse.Pipelines {
pipeline.ID = pipelineID
pipelines = append(pipelines, pipeline)
}

return pipelines, nil
}

func fetchPath(http *helper.HTTP, resetURI, path string, query string) ([]byte, error) {
defer http.SetURI(resetURI)

// Parses the uri to replace the path
u, _ := url.Parse(resetURI)
u.Path = path
u.RawQuery = query

// Http helper includes the HostData with username and password
http.SetURI(u.String())
return http.FetchContent()
}
72 changes: 72 additions & 0 deletions metricbeat/module/logstash/node/data_xpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package node

import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/logstash"
)

func eventMappingXPack(r mb.ReporterV2, m *MetricSet, pipelines []logstash.PipelineState) error {
for _, pipeline := range pipelines {
// Exclude internal pipelines
if pipeline.ID[0] == '.' {
continue
}

// Rename key: graph -> representation
pipeline.Representation = pipeline.Graph
pipeline.Graph = nil

// Extract cluster_uuids
clusterUUIDs := pipeline.ClusterIDs
pipeline.ClusterIDs = nil

logstashState := map[string]logstash.PipelineState{
"pipeline": pipeline,
}

if pipeline.ClusterIDs == nil {
pipeline.ClusterIDs = []string{""}
}

for _, clusterUUID := range clusterUUIDs {
event := mb.Event{}
event.RootFields = common.MapStr{
"timestamp": common.Time(time.Now()),
"interval_ms": m.Module().Config().Period / time.Millisecond,
"type": "logstash_state",
"logstash_state": logstashState,
}

if clusterUUID != "" {
event.RootFields["cluster_uuid"] = clusterUUID
}

event.ID = pipeline.EphemeralID
event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Logstash)
r.Event(event)
}
}

return nil
}
29 changes: 24 additions & 5 deletions metricbeat/module/logstash/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ func init() {
)
}

const (
nodePath = "/_node"
)

var (
hostParser = parse.URLHostParserBuilder{
DefaultScheme: "http",
PathConfigKey: "path",
DefaultPath: "_node",
DefaultPath: nodePath,
}.Build()
)

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
*logstash.MetricSet
http *helper.HTTP
*helper.HTTP
}

// New create a new instance of the MetricSet
Expand All @@ -69,10 +73,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
content, err := m.http.FetchContent()
if !m.MetricSet.XPack {
content, err := m.HTTP.FetchContent()
if err != nil {
return err
}

return eventMapping(r, content)
}

pipelinesContent, err := logstash.GetPipelines(m.HTTP, m.HostData().SanitizedURI+nodePath)
if err != nil {
m.Logger().Error(err)
return nil
}

err = eventMappingXPack(r, m, pipelinesContent)
if err != nil {
return err
m.Logger().Error(err)
}

return eventMapping(r, content)
return nil
}

0 comments on commit bd2660d

Please sign in to comment.