Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: add debug URL logging
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed May 9, 2024
1 parent 7719bb6 commit 6ec5edd
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ type handler struct {
}

func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
txID := h.nextTxID()
h.log.Debugw("request", "url", r.URL, "tx_id", txID)
status, err := h.validator.validateRequest(r)
if err != nil {
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.sendAPIErrorResponse(txID, w, r, h.log, status, err)
return
}

Expand All @@ -94,7 +96,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.metrics.contentLength.Update(r.ContentLength)
body, status, err := getBodyReader(r)
if err != nil {
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.sendAPIErrorResponse(txID, w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}
Expand All @@ -113,7 +115,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

objs, _, status, err := httpReadJSON(body, h.program)
if err != nil {
h.sendAPIErrorResponse(w, r, h.log, status, err)
h.sendAPIErrorResponse(txID, w, r, h.log, status, err)
h.metrics.apiErrors.Add(1)
return
}
Expand All @@ -138,15 +140,15 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
break
} else if !errors.Is(err, errNotCRC) {
h.metrics.apiErrors.Add(1)
h.sendAPIErrorResponse(w, r, h.log, http.StatusBadRequest, err)
h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusBadRequest, err)
return
}
}

acker.Add()
if err = h.publishEvent(obj, headers, acker); err != nil {
h.metrics.apiErrors.Add(1)
h.sendAPIErrorResponse(w, r, h.log, http.StatusInternalServerError, err)
h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusInternalServerError, err)
return
}
h.metrics.eventsPublished.Add(1)
Expand All @@ -159,17 +161,20 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} else {
select {
case <-acked:
h.log.Debugw("request acked", "tx_id", txID)
if !timeout.Stop() {
<-timeout.C
}
h.sendResponse(w, respCode, respBody)
case <-timeout.C:
h.sendAPIErrorResponse(w, r, h.log, http.StatusGatewayTimeout, errTookTooLong)
h.log.Debugw("request timed out", "tx_id", txID)
h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusGatewayTimeout, errTookTooLong)
case <-h.ctx.Done():
h.sendAPIErrorResponse(w, r, h.log, http.StatusGatewayTimeout, h.ctx.Err())
h.log.Debugw("request context cancelled", "tx_id", txID)
h.sendAPIErrorResponse(txID, w, r, h.log, http.StatusGatewayTimeout, h.ctx.Err())
}
if h.reqLogger != nil {
h.logRequest(r, respCode, nil)
h.logRequest(txID, r, respCode, nil)
}
}
h.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
Expand All @@ -196,7 +201,7 @@ func getTimeoutWait(u *url.URL, log *logp.Logger) time.Duration {
return t
}

func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) {
func (h *handler) sendAPIErrorResponse(txID string, w http.ResponseWriter, r *http.Request, log *logp.Logger, status int, apiError error) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(status)

Expand All @@ -214,11 +219,11 @@ func (h *handler) sendAPIErrorResponse(w http.ResponseWriter, r *http.Request, l
log.Debugw("Failed to write HTTP response.", "error", err, "client.address", r.RemoteAddr)
}
if h.reqLogger != nil {
h.logRequest(r, status, buf.Bytes())
h.logRequest(txID, r, status, buf.Bytes())
}
}

func (h *handler) logRequest(r *http.Request, status int, respBody []byte) {
func (h *handler) logRequest(txID string, r *http.Request, status int, respBody []byte) {
// Populate and preserve scheme and host if they are missing;
// they are required for httputil.DumpRequestOut.
var scheme, host string
Expand All @@ -244,7 +249,6 @@ func (h *handler) logRequest(r *http.Request, status int, respBody []byte) {
zap.ByteString("http.response.body.content", respBody),
)
}
txID := h.nextTxID()
h.log.Debugw("new request trace transaction", "id", txID)
// Limit request logging body size to 10kiB.
const maxBodyLen = 10 * (1 << 10)
Expand Down

0 comments on commit 6ec5edd

Please sign in to comment.