Skip to content

Commit

Permalink
Use io.Pipe() to minimize internal buffering
Browse files Browse the repository at this point in the history
Fix lint

Mop
  • Loading branch information
Bobby Shannon committed Jul 15, 2017
1 parent 8abde2a commit 9e62fc3
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
27 changes: 13 additions & 14 deletions plugins/outputs/influxdb/client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,10 @@ func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, err
if c.config.Gzip {
// If gzip is set to true, then compress
// the payload.
buf := new(bytes.Buffer)
buf.ReadFrom(body)
compressed, err := compressWithGzip(buf.Bytes())
body, err = compressWithGzip(body)
if err != nil {
return nil, err
}
body = bytes.NewBuffer(compressed)
}
req, err = http.NewRequest("POST", uri, body)
if err != nil {
Expand All @@ -265,16 +262,18 @@ func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, err
return req, nil
}

func compressWithGzip(data []byte) ([]byte, error) {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := gz.Write(data); err != nil {
return nil, err
}
if err := gz.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
func compressWithGzip(data io.Reader) (io.Reader, error) {
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
var err error

go func() {
_, err = io.Copy(gw, data)
gw.Close()
pw.Close()
}()

return pr, err
}

func (c *httpClient) Close() error {
Expand Down
14 changes: 8 additions & 6 deletions plugins/outputs/influxdb/client/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,20 +344,22 @@ func TestHTTPClient_Query_JSONDecodeError(t *testing.T) {
}

func TestGzipCompression(t *testing.T) {
influxLine := "cpu value=99\n"

// Compress the payload using GZIP.
payload := []byte("cpu value=99\n")
payload := bytes.NewReader([]byte(influxLine))
compressed, err := compressWithGzip(payload)
assert.Nil(t, err)

// GUNZIP the compressed payload and make sure
// Decompress the compressed payload and make sure
// that its original value has not changed.
r, err := gzip.NewReader(bytes.NewReader(compressed))
gr, err := gzip.NewReader(compressed)
assert.Nil(t, err)
r.Close()
gr.Close()

var uncompressed bytes.Buffer
_, err = uncompressed.ReadFrom(r)
_, err = uncompressed.ReadFrom(gr)
assert.Nil(t, err)

assert.Equal(t, payload, uncompressed.Bytes())
assert.Equal(t, []byte(influxLine), uncompressed.Bytes())
}

0 comments on commit 9e62fc3

Please sign in to comment.