Skip to content

Commit ff4f736

Browse files
committed
Heartbeat Job Validation + addition of libbeat/mapval
This commit seeks to establish a pattern for testing heartbeat jobs. It currently tests the HTTP and TCP jobs. It also required some minor refactors of those tasks for HTTP/TCP. To do this, it made sense to validate event maps with a sort of schema library. I couldn't find one that did exactly what I wanted here, so I wrote one called mapval. That turned out to be a large undertaking, and is now the majority of this commit. Further tests need to be written, but this commit is large enough as is. One of the nicest things about the heartbeat architecture is the dialer chain behavior. It should be the case that any validated protocol using TCP (e.g. HTTP, TCP, Redis, etc.) has the exact same tcp metadata. To help make testing these properties easy mapval lets users compose portions of a schema into a bigger one. In other words, you can say "An HTTP response should be a TCP response, with the standard monitor data added in, and also the special HTTP fields". Even having only written a handful of tests this has uncovered some inconsistencies there, where TCP jobs have a hostname, but HTTP ones do not.
1 parent 5eaafff commit ff4f736

File tree

21 files changed

+3413
-33
lines changed

21 files changed

+3413
-33
lines changed

heartbeat/hbtest/hbtestutil.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package hbtest
19+
20+
import (
21+
"io"
22+
"net/http"
23+
"net/url"
24+
"strconv"
25+
26+
"net/http/httptest"
27+
28+
"github.com/elastic/beats/libbeat/common/mapval"
29+
)
30+
31+
// HelloWorldBody is the body of the HelloWorldHandler.
32+
const HelloWorldBody = "hello, world!"
33+
34+
// HelloWorldHandler is a handler for an http server that returns
35+
// HelloWorldBody and a 200 OK status.
36+
var HelloWorldHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
37+
w.WriteHeader(http.StatusOK)
38+
io.WriteString(w, HelloWorldBody)
39+
})
40+
41+
// BadGatewayBody is the body of the BadGatewayHandler.
42+
const BadGatewayBody = "Bad Gateway"
43+
44+
// BadGatewayHandler is a handler for an http server that returns
45+
// BadGatewayBody and a 200 OK status.
46+
var BadGatewayHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
47+
w.WriteHeader(http.StatusBadGateway)
48+
io.WriteString(w, BadGatewayBody)
49+
})
50+
51+
// ServerPort takes an httptest.Server and returns its port as a uint16.
52+
func ServerPort(server *httptest.Server) (uint16, error) {
53+
u, err := url.Parse(server.URL)
54+
if err != nil {
55+
return 0, err
56+
}
57+
p, err := strconv.Atoi(u.Port())
58+
if err != nil {
59+
return 0, err
60+
}
61+
return uint16(p), nil
62+
}
63+
64+
// MonitorChecks creates a skima.Validator that represents the "monitor" field present
65+
// in all heartbeat events.
66+
func MonitorChecks(id string, host string, ip string, scheme string, status string) mapval.Validator {
67+
return mapval.Schema(mapval.Map{
68+
"monitor": mapval.Map{
69+
// TODO: This is only optional because, for some reason, TCP returns
70+
// this value, but HTTP does not. We should fix this
71+
"host": mapval.Optional(mapval.IsEqual(host)),
72+
"duration.us": mapval.IsDuration,
73+
"id": id,
74+
"ip": ip,
75+
"scheme": scheme,
76+
"status": status,
77+
},
78+
})
79+
}
80+
81+
// TCPChecks creates a skima.Validator that represents the "tcp" field present
82+
// in all heartbeat events that use a Tcp connection as part of their DialChain
83+
func TCPChecks(port uint16) mapval.Validator {
84+
return mapval.Schema(mapval.Map{
85+
"tcp": mapval.Map{
86+
"port": port,
87+
"rtt.connect.us": mapval.IsDuration,
88+
},
89+
})
90+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package http
19+
20+
import (
21+
"net/http"
22+
"net/http/httptest"
23+
"testing"
24+
25+
"github.com/stretchr/testify/require"
26+
27+
"github.com/elastic/beats/heartbeat/hbtest"
28+
"github.com/elastic/beats/heartbeat/monitors"
29+
"github.com/elastic/beats/libbeat/beat"
30+
"github.com/elastic/beats/libbeat/common"
31+
"github.com/elastic/beats/libbeat/common/mapval"
32+
"github.com/elastic/beats/libbeat/testing/mapvaltest"
33+
)
34+
35+
func checkServer(t *testing.T, handlerFunc http.HandlerFunc) (*httptest.Server, beat.Event) {
36+
server := httptest.NewServer(handlerFunc)
37+
defer server.Close()
38+
39+
config := common.NewConfig()
40+
config.SetString("urls", 0, server.URL)
41+
42+
jobs, err := create(monitors.Info{}, config)
43+
require.NoError(t, err)
44+
45+
job := jobs[0]
46+
47+
event, _, err := job.Run()
48+
require.NoError(t, err)
49+
50+
return server, event
51+
}
52+
53+
func httpChecks(urlStr string, statusCode int) mapval.Validator {
54+
return mapval.Schema(mapval.Map{
55+
"http": mapval.Map{
56+
"url": urlStr,
57+
"response.status_code": statusCode,
58+
"rtt.content.us": mapval.IsDuration,
59+
"rtt.response_header.us": mapval.IsDuration,
60+
"rtt.total.us": mapval.IsDuration,
61+
"rtt.validate.us": mapval.IsDuration,
62+
"rtt.write_request.us": mapval.IsDuration,
63+
},
64+
})
65+
}
66+
67+
func badGatewayChecks() mapval.Validator {
68+
return mapval.Schema(mapval.Map{
69+
"error": mapval.Map{
70+
"message": "502 Bad Gateway",
71+
"type": "validate",
72+
},
73+
})
74+
}
75+
76+
func TestOKJob(t *testing.T) {
77+
server, event := checkServer(t, hbtest.HelloWorldHandler)
78+
port, err := hbtest.ServerPort(server)
79+
require.NoError(t, err)
80+
81+
mapvaltest.Test(
82+
t,
83+
mapval.Strict(mapval.Compose(
84+
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "up"),
85+
hbtest.TCPChecks(port),
86+
httpChecks(server.URL, http.StatusOK),
87+
))(event.Fields),
88+
)
89+
}
90+
91+
func TestBadGatewayJob(t *testing.T) {
92+
server, event := checkServer(t, hbtest.BadGatewayHandler)
93+
port, err := hbtest.ServerPort(server)
94+
require.NoError(t, err)
95+
96+
mapvaltest.Test(
97+
t,
98+
mapval.Strict(mapval.Compose(
99+
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "down"),
100+
hbtest.TCPChecks(port),
101+
httpChecks(server.URL, http.StatusBadGateway),
102+
badGatewayChecks(),
103+
))(event.Fields),
104+
)
105+
}

heartbeat/monitors/active/http/task.go

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -218,41 +218,65 @@ func execPing(
218218
body []byte,
219219
timeout time.Duration,
220220
validator func(*http.Response) error,
221-
) (time.Time, time.Time, common.MapStr, reason.Reason) {
221+
) (start, end time.Time, event common.MapStr, errReason reason.Reason) {
222222
ctx, cancel := context.WithTimeout(context.Background(), timeout)
223223
defer cancel()
224224

225-
req = req.WithContext(ctx)
225+
req = attachRequestBody(&ctx, req, body)
226+
start, end, resp, errReason := execRequest(client, req, validator)
227+
228+
if errReason != nil {
229+
if resp != nil {
230+
return start, end, makeEvent(end.Sub(start), resp), errReason
231+
}
232+
return start, end, nil, errReason
233+
}
234+
235+
event = makeEvent(end.Sub(start), resp)
236+
237+
return start, end, event, nil
238+
}
239+
240+
func attachRequestBody(ctx *context.Context, req *http.Request, body []byte) *http.Request {
241+
req = req.WithContext(*ctx)
226242
if len(body) > 0 {
227243
req.Body = ioutil.NopCloser(bytes.NewBuffer(body))
228244
req.ContentLength = int64(len(body))
229245
}
230246

231-
start := time.Now()
247+
return req
248+
}
249+
250+
func execRequest(client *http.Client, req *http.Request, validator func(*http.Response) error) (start time.Time, end time.Time, resp *http.Response, errReason reason.Reason) {
251+
start = time.Now()
232252
resp, err := client.Do(req)
233-
end := time.Now()
253+
if resp != nil { // If above errors, the response will be nil
254+
defer resp.Body.Close()
255+
}
256+
end = time.Now()
257+
234258
if err != nil {
235259
return start, end, nil, reason.IOFailed(err)
236260
}
237-
defer resp.Body.Close()
238261

239262
err = validator(resp)
240263
end = time.Now()
264+
if err != nil {
265+
return start, end, resp, reason.ValidateFailed(err)
266+
}
267+
268+
return start, end, resp, nil
269+
}
241270

242-
rtt := end.Sub(start)
243-
event := common.MapStr{"http": common.MapStr{
271+
func makeEvent(rtt time.Duration, resp *http.Response) common.MapStr {
272+
return common.MapStr{"http": common.MapStr{
244273
"response": common.MapStr{
245274
"status_code": resp.StatusCode,
246275
},
247276
"rtt": common.MapStr{
248277
"total": look.RTT(rtt),
249278
},
250279
}}
251-
252-
if err != nil {
253-
return start, end, event, reason.ValidateFailed(err)
254-
}
255-
return start, end, event, nil
256280
}
257281

258282
func splitHostnamePort(requ *http.Request) (string, uint16, error) {

0 commit comments

Comments
 (0)