Skip to content

Commit

Permalink
Use a bufio.Scanner in http listener
Browse files Browse the repository at this point in the history
this will prevent potential very large allocations due to a very large
chunk size send from a client.

fixes #1823
  • Loading branch information
sparrc committed Sep 29, 2016
1 parent ca8e512 commit 78ced6b
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
- [#1137](https://github.com/influxdata/telegraf/issues/1137): Fix issue loading config directory on windows.
- [#1772](https://github.com/influxdata/telegraf/pull/1772): Windows remote management interactive service fix.
- [#1702](https://github.com/influxdata/telegraf/issues/1702): sqlserver, fix issue when case sensitive collation is activated.
- [#1823](https://github.com/influxdata/telegraf/issues/1823): Fix huge allocations in http_listener when dealing with huge payloads.

## v1.0.1 [unreleased]

Expand Down
41 changes: 26 additions & 15 deletions plugins/inputs/http_listener/http_listener.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package http_listener

import (
"io/ioutil"
"bufio"
"bytes"
"fmt"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -111,25 +113,34 @@ func (t *HttpListener) httpListen() error {
func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
t.wg.Add(1)
defer t.wg.Done()
body, err := ioutil.ReadAll(req.Body)
if err != nil {
log.Printf("Problem reading request: [%s], Error: %s\n", string(body), err)
http.Error(res, "ERROR reading request", http.StatusInternalServerError)
return
}

switch req.URL.Path {
case "/write":
var metrics []telegraf.Metric
metrics, err = t.parser.Parse(body)
if err == nil {
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
var http400msg bytes.Buffer
var partial string
scanner := bufio.NewScanner(req.Body)
scanner.Buffer([]byte(""), 128*1024)
for scanner.Scan() {
metrics, err := t.parser.Parse(scanner.Bytes())
if err == nil {
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
partial = "partial write: "
} else {
http400msg.WriteString(err.Error() + " ")
}
res.WriteHeader(http.StatusNoContent)
}

if err := scanner.Err(); err != nil {
http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError)
} else if http400msg.Len() > 0 {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String())))
} else {
log.Printf("Problem parsing body: [%s], Error: %s\n", string(body), err)
http.Error(res, "ERROR parsing metrics", http.StatusInternalServerError)
res.WriteHeader(http.StatusNoContent)
}
case "/query":
// Deliver a dummy response to the query endpoint, as some InfluxDB
Expand Down
16 changes: 15 additions & 1 deletion plugins/inputs/http_listener/http_listener_test.go

Large diffs are not rendered by default.

0 comments on commit 78ced6b

Please sign in to comment.