Skip to content

Commit

Permalink
Fix initialization of Logstash service URL (#17497) (#17592)
Browse files Browse the repository at this point in the history
* Fix initialization of Logstash service URL

* Adding CHANGELOG entry

* Expose Logstash HTTP API port in local environment

* Remove once field as it is no longer used

* Encapsulating more logic into method

* Debugging

* Compare cluster UUIDs

* Adding ES service

* Adding default LS pipeline that outputs to ES cluster

* Use beats input so LS pipeline (and node) keeps running

* Fixing up ES config in docker compose file

* Handle errors

* Fixing typo

* COPY pipeline file as part of image

* Refactoring code to make testable + Adding unit tests

* Adding comment for test

* Fixing URIs
  • Loading branch information
ycombinator authored Apr 8, 2020
1 parent f7bd38b commit c93902b
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix aws.s3.bucket.name terms_field in s3 overview dashboard. {pull}17542[17542]
- Fix Unix socket path in memcached. {pull}17512[17512]
- Fix vsphere VM dashboard host aggregation visualizations. {pull}17555[17555]
- Metricbeat no longer needs to be started strictly after Logstash for `logstash-xpack` module to report correct data. {issue}17261[17261] {pull}17497[17497]

*Packetbeat*

Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/logstash/_meta/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@ ARG LOGSTASH_VERSION
FROM docker.elastic.co/logstash/logstash:${LOGSTASH_VERSION}

COPY healthcheck.sh /
COPY pipeline/logstash.conf /usr/share/logstash/pipeline/logstash.conf

ENV XPACK_MONITORING_ENABLED=FALSE
HEALTHCHECK --interval=1s --retries=300 CMD sh /healthcheck.sh
11 changes: 11 additions & 0 deletions metricbeat/module/logstash/_meta/pipeline/logstash.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
input {
beats {
port => 5044
}
}

output {
elasticsearch {
hosts => ["${ES_HOST:elasticsearch}:${ES_PORT:9200}"]
}
}
15 changes: 15 additions & 0 deletions metricbeat/module/logstash/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,18 @@ services:
LOGSTASH_VERSION: ${LOGSTASH_VERSION:-7.5.2}
ports:
- 9600
depends_on:
- elasticsearch

elasticsearch:
image: docker.elastic.co/integrations-ci/beats-elasticsearch:${ELASTICSEARCH_VERSION:-7.5.2}-1
build:
context: ../elasticsearch/_meta
args:
ELASTICSEARCH_VERSION: ${ELASTICSEARCH_VERSION:-7.5.2}
environment:
- "network.host="
- "transport.host=127.0.0.1"
- "http.host=0.0.0.0"
ports:
- 9200
28 changes: 25 additions & 3 deletions metricbeat/module/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package logstash_test

import (
"encoding/json"
"io/ioutil"
"net/http"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -68,15 +71,17 @@ func TestData(t *testing.T) {
}

func TestXPackEnabled(t *testing.T) {
service := compose.EnsureUpWithTimeout(t, 300, "logstash")
lsService := compose.EnsureUpWithTimeout(t, 300, "logstash")
esService := compose.EnsureUpWithTimeout(t, 300, "elasticsearch")

clusterUUID := getESClusterUUID(t, esService.Host())

metricSetToTypeMap := map[string]string{
"node": "logstash_state",
"node_stats": "logstash_stats",
}

config := getXPackConfig(service.Host())

config := getXPackConfig(lsService.Host())
metricSets := mbtest.NewReportingMetricSetV2Errors(t, config)
for _, metricSet := range metricSets {
events, errs := mbtest.ReportingFetchV2Error(metricSet)
Expand All @@ -85,6 +90,7 @@ func TestXPackEnabled(t *testing.T) {

event := events[0]
require.Equal(t, metricSetToTypeMap[metricSet.Name()], event.RootFields["type"])
require.Equal(t, clusterUUID, event.RootFields["cluster_uuid"])
require.Regexp(t, `^.monitoring-logstash-\d-mb`, event.Index)
}
}
Expand All @@ -105,3 +111,19 @@ func getXPackConfig(host string) map[string]interface{} {
"xpack.enabled": true,
}
}

func getESClusterUUID(t *testing.T, host string) string {
resp, err := http.Get("http://" + host + "/")
require.NoError(t, err)
defer resp.Body.Close()

var body struct {
ClusterUUID string `json:"cluster_uuid"`
}

data, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
json.Unmarshal(data, &body)

return body.ClusterUUID
}
47 changes: 32 additions & 15 deletions metricbeat/module/logstash/node_stats/node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package node_stats

import (
"sync"
"net/url"

"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand Down Expand Up @@ -50,7 +50,6 @@ var (
// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
*logstash.MetricSet
initialized sync.Once
}

// New create a new instance of the MetricSet
Expand All @@ -69,11 +68,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
var err error
m.initialized.Do(func() {
err = m.init()
})
if err != nil {
if err := m.updateServiceURI(); err != nil {
if m.XPack {
m.Logger().Error(err)
return nil
Expand Down Expand Up @@ -102,15 +97,37 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return nil
}

func (m *MetricSet) init() error {
if m.XPack {
err := m.CheckPipelineGraphAPIsAvailable()
if err != nil {
return err
}

m.HTTP.SetURI(m.HTTP.GetURI() + "?vertices=true")
func (m *MetricSet) updateServiceURI() error {
u, err := getServiceURI(m.GetURI(), m.XPack, m.CheckPipelineGraphAPIsAvailable)
if err != nil {
return err
}

m.HTTP.SetURI(u)
return nil

}

func getServiceURI(currURI string, xpackEnabled bool, graphAPIsAvailable func() error) (string, error) {
if !xpackEnabled {
// No need to request pipeline vertices from service API
return currURI, nil
}

if err := graphAPIsAvailable(); err != nil {
return "", err
}

u, err := url.Parse(currURI)
if err != nil {
return "", err
}

q := u.Query()
if q.Get("vertices") == "" {
q.Set("vertices", "true")
}

u.RawQuery = q.Encode()
return u.String(), nil
}
89 changes: 89 additions & 0 deletions metricbeat/module/logstash/node_stats/node_stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package node_stats

import (
"errors"
"testing"
"testing/quick"

"github.com/stretchr/testify/require"
)

func TestGetServiceURI(t *testing.T) {
tests := map[string]struct {
currURI string
xpackEnabled bool
graphAPIsAvailable func() error
expectedURI string
errExpected bool
}{
"xpack_disabled": {
currURI: "/_node/stats",
xpackEnabled: false,
graphAPIsAvailable: func() error { return nil },
expectedURI: "/_node/stats",
errExpected: false,
},
"apis_unavailable": {
currURI: "/_node/stats",
xpackEnabled: true,
graphAPIsAvailable: func() error { return errors.New("test") },
expectedURI: "",
errExpected: true,
},
"with_pipeline_vertices": {
currURI: "_node/stats",
xpackEnabled: true,
graphAPIsAvailable: func() error { return nil },
expectedURI: "/_node/stats?vertices=true",
errExpected: false,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
newURI, err := getServiceURI(nodeStatsPath, test.xpackEnabled, test.graphAPIsAvailable)
if test.errExpected {
require.Equal(t, "", newURI)
} else {
require.NoError(t, err)
require.Equal(t, test.expectedURI, newURI)
}
})
}
}

// See https://github.com/elastic/beats/issues/15974
func TestGetServiceURIMultipleCalls(t *testing.T) {
err := quick.Check(func(r uint) bool {
var err error
uri := "_node/stats"

numCalls := 2 + (r % 10) // between 2 and 11
for i := uint(0); i < numCalls; i++ {
uri, err = getServiceURI(uri, true, func() error { return nil })
if err != nil {
return false
}
}

return err == nil && uri == "_node/stats?vertices=true"
}, nil)
require.NoError(t, err)
}
1 change: 1 addition & 0 deletions testing/environments/local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ services:
ports:
- "127.0.0.1:5044:5044"
- "127.0.0.1:5055:5055"
- "127.0.0.1:9600:9600"
depends_on:
elasticsearch:
condition: service_healthy
Expand Down

0 comments on commit c93902b

Please sign in to comment.