Skip to content

Basic Heartbeat Job validation #7587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package hbtest

import (
"io"
"net/http"
"net/url"
"strconv"

"net/http/httptest"

"github.com/elastic/beats/libbeat/common/mapval"
)

// HelloWorldBody is the body of the HelloWorldHandler.
const HelloWorldBody = "hello, world!"

// HelloWorldHandler is a handler for an http server that returns
// HelloWorldBody and a 200 OK status.
var HelloWorldHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
io.WriteString(w, HelloWorldBody)
})

// BadGatewayBody is the body of the BadGatewayHandler.
const BadGatewayBody = "Bad Gateway"

// BadGatewayHandler is a handler for an http server that returns
// BadGatewayBody and a 200 OK status.
var BadGatewayHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadGateway)
io.WriteString(w, BadGatewayBody)
})

// ServerPort takes an httptest.Server and returns its port as a uint16.
func ServerPort(server *httptest.Server) (uint16, error) {
u, err := url.Parse(server.URL)
if err != nil {
return 0, err
}
p, err := strconv.Atoi(u.Port())
if err != nil {
return 0, err
}
return uint16(p), nil
}

// MonitorChecks creates a skima.Validator that represents the "monitor" field present
// in all heartbeat events.
func MonitorChecks(id string, host string, ip string, scheme string, status string) mapval.Validator {
return mapval.Schema(mapval.Map{
"monitor": mapval.Map{
// TODO: This is only optional because, for some reason, TCP returns
// this value, but HTTP does not. We should fix this
"host": mapval.Optional(mapval.IsEqual(host)),
"duration.us": mapval.IsDuration,
"id": id,
"ip": ip,
"scheme": scheme,
"status": status,
},
})
}

// TCPChecks creates a skima.Validator that represents the "tcp" field present
// in all heartbeat events that use a Tcp connection as part of their DialChain
func TCPChecks(port uint16) mapval.Validator {
return mapval.Schema(mapval.Map{
"tcp": mapval.Map{
"port": port,
"rtt.connect.us": mapval.IsDuration,
},
})
}
105 changes: 105 additions & 0 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package http

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/heartbeat/hbtest"
"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/mapval"
"github.com/elastic/beats/libbeat/testing/mapvaltest"
)

func checkServer(t *testing.T, handlerFunc http.HandlerFunc) (*httptest.Server, beat.Event) {
server := httptest.NewServer(handlerFunc)
defer server.Close()

config := common.NewConfig()
config.SetString("urls", 0, server.URL)

jobs, err := create(monitors.Info{}, config)
require.NoError(t, err)

job := jobs[0]

event, _, err := job.Run()
require.NoError(t, err)

return server, event
}

func httpChecks(urlStr string, statusCode int) mapval.Validator {
return mapval.Schema(mapval.Map{
"http": mapval.Map{
"url": urlStr,
"response.status_code": statusCode,
"rtt.content.us": mapval.IsDuration,
"rtt.response_header.us": mapval.IsDuration,
"rtt.total.us": mapval.IsDuration,
"rtt.validate.us": mapval.IsDuration,
"rtt.write_request.us": mapval.IsDuration,
},
})
}

func badGatewayChecks() mapval.Validator {
return mapval.Schema(mapval.Map{
"error": mapval.Map{
"message": "502 Bad Gateway",
"type": "validate",
},
})
}

func TestOKJob(t *testing.T) {
server, event := checkServer(t, hbtest.HelloWorldHandler)
port, err := hbtest.ServerPort(server)
require.NoError(t, err)

mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "up"),
hbtest.TCPChecks(port),
httpChecks(server.URL, http.StatusOK),
))(event.Fields),
)
}

func TestBadGatewayJob(t *testing.T) {
server, event := checkServer(t, hbtest.BadGatewayHandler)
port, err := hbtest.ServerPort(server)
require.NoError(t, err)

mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "down"),
hbtest.TCPChecks(port),
httpChecks(server.URL, http.StatusBadGateway),
badGatewayChecks(),
))(event.Fields),
)
}
48 changes: 36 additions & 12 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,41 +218,65 @@ func execPing(
body []byte,
timeout time.Duration,
validator func(*http.Response) error,
) (time.Time, time.Time, common.MapStr, reason.Reason) {
) (start, end time.Time, event common.MapStr, errReason reason.Reason) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

req = req.WithContext(ctx)
req = attachRequestBody(&ctx, req, body)
start, end, resp, errReason := execRequest(client, req, validator)

if errReason != nil {
if resp != nil {
return start, end, makeEvent(end.Sub(start), resp), errReason
}
return start, end, nil, errReason
}

event = makeEvent(end.Sub(start), resp)

return start, end, event, nil
}

func attachRequestBody(ctx *context.Context, req *http.Request, body []byte) *http.Request {
req = req.WithContext(*ctx)
if len(body) > 0 {
req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
req.ContentLength = int64(len(body))
}

start := time.Now()
return req
}

func execRequest(client *http.Client, req *http.Request, validator func(*http.Response) error) (start time.Time, end time.Time, resp *http.Response, errReason reason.Reason) {
start = time.Now()
resp, err := client.Do(req)
end := time.Now()
if resp != nil { // If above errors, the response will be nil
defer resp.Body.Close()
}
end = time.Now()

if err != nil {
return start, end, nil, reason.IOFailed(err)
}
defer resp.Body.Close()

err = validator(resp)
end = time.Now()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need a note form @urso on why we update the end time here.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The different network layers record round trip times. For http having a response doesn't mean the request if finished yet. It means we have seen and parsed the headers. All additional contents will be streamed. If the body is bigger + if the validator consumes all contents, we at least have some idea about the total time taken (the validator generates additional IO). This can definitely be improved, but as we create IO when consuming the body, we need to report durations.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the details.

if err != nil {
return start, end, resp, reason.ValidateFailed(err)
}

return start, end, resp, nil
}

rtt := end.Sub(start)
event := common.MapStr{"http": common.MapStr{
func makeEvent(rtt time.Duration, resp *http.Response) common.MapStr {
return common.MapStr{"http": common.MapStr{
"response": common.MapStr{
"status_code": resp.StatusCode,
},
"rtt": common.MapStr{
"total": look.RTT(rtt),
},
}}

if err != nil {
return start, end, event, reason.ValidateFailed(err)
}
return start, end, event, nil
}

func splitHostnamePort(requ *http.Request) (string, uint16, error) {
Expand Down
Loading