Skip to content

Commit

Permalink
Move http_endpoint input to v2 input API (elastic#19815)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Jul 14, 2020
1 parent 72cc711 commit 542a4af
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 306 deletions.
1 change: 0 additions & 1 deletion x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion x-pack/filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry"
"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
)

Expand All @@ -23,7 +24,8 @@ func Init(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin

func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin {
return []v2.Plugin{
o365audit.Plugin(log, store),
cloudfoundry.Plugin(),
http_endpoint.Plugin(),
o365audit.Plugin(log, store),
}
}
6 changes: 6 additions & 0 deletions x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,11 @@ func (c *config) Validate() error {
return errors.New("response_body must be valid JSON")
}

if c.BasicAuth {
if c.Username == "" || c.Password == "" {
return errors.New("Username and password required when basicauth is enabled")
}
}

return nil
}
109 changes: 109 additions & 0 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package http_endpoint

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"

stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

type httpHandler struct {
log *logp.Logger
publisher stateless.Publisher

messageField string
responseCode int
responseBody string
}

var errBodyEmpty = errors.New("Body cannot be empty")
var errUnsupportedType = errors.New("Only JSON objects are accepted")

// Triggers if middleware validation returns successful
func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
obj, status, err := httpReadJsonObject(r.Body)
if err != nil {
w.Header().Add("Content-Type", "application/json")
sendErrorResponse(w, status, err)
return
}

h.publishEvent(obj)
w.Header().Add("Content-Type", "application/json")
h.sendResponse(w, h.responseCode, h.responseBody)
}

func (h *httpHandler) sendResponse(w http.ResponseWriter, status int, message string) {
w.WriteHeader(status)
io.WriteString(w, message)
}

func (h *httpHandler) publishEvent(obj common.MapStr) {
event := beat.Event{
Timestamp: time.Now().UTC(),
Fields: common.MapStr{
h.messageField: obj,
},
}

h.publisher.Publish(event)
}

func withValidator(v validator, handler http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if status, err := v.ValidateHeader(r); status != 0 && err != nil {
sendErrorResponse(w, status, err)
} else {
handler(w, r)
}
}
}

func sendErrorResponse(w http.ResponseWriter, status int, err error) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)
fmt.Fprintf(w, `{"message": %q}`, err.Error())
}

func httpReadJsonObject(body io.Reader) (obj common.MapStr, status int, err error) {
if body == http.NoBody {
return nil, http.StatusNotAcceptable, errBodyEmpty
}

contents, err := ioutil.ReadAll(body)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("failed reading body: %w", err)
}

if !isObject(contents) {
return nil, http.StatusBadRequest, errUnsupportedType
}

obj = common.MapStr{}
if err := json.Unmarshal(contents, &obj); err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("Malformed JSON body: %w", err)
}

return obj, 0, nil
}

func isObject(b []byte) bool {
obj := bytes.TrimLeft(b, " \t\r\n")
if len(obj) > 0 && obj[0] == '{' {
return true
}
return false
}
77 changes: 0 additions & 77 deletions x-pack/filebeat/input/http_endpoint/httpserver.go

This file was deleted.

Loading

0 comments on commit 542a4af

Please sign in to comment.