diff --git a/CHANGELOG.md b/CHANGELOG.md index 9dcf2fa7a54..ec05211ea90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ v1.7.0 [unreleased] - [#9964](https://github.com/influxdata/influxdb/pull/9964): Enable the storage service by default. - [#9996](https://github.com/influxdata/influxdb/pull/9996): Ensure read service regexes get optimised. - [#10408](https://github.com/influxdata/influxdb/pull/10408): Add Flux support to the influx CLI command. +- [#10257](https://github.com/influxdata/influxdb/issues/10257): Add chunked query into the Go client v2. ### Bugfixes diff --git a/client/v2/client.go b/client/v2/client.go index 6a5c238bcdc..b16da2cf4a7 100644 --- a/client/v2/client.go +++ b/client/v2/client.go @@ -78,6 +78,10 @@ type Client interface { // the UDP client. Query(q Query) (*Response, error) + // QueryAsChunk makes an InfluxDB Query on the database. This will fail if using + // the UDP client. + QueryAsChunk(q Query) (*ChunkedResponse, error) + // Close releases any resources a Client may be using. Close() error } @@ -496,75 +500,26 @@ type Result struct { // Query sends a command to the server and returns the Response. func (c *client) Query(q Query) (*Response, error) { - u := c.url - u.Path = path.Join(u.Path, "query") - - jsonParameters, err := json.Marshal(q.Parameters) - + req, err := c.createDefaultRequest(q) if err != nil { return nil, err } - - req, err := http.NewRequest("POST", u.String(), nil) - if err != nil { - return nil, err - } - - req.Header.Set("Content-Type", "") - req.Header.Set("User-Agent", c.useragent) - - if c.username != "" { - req.SetBasicAuth(c.username, c.password) - } - params := req.URL.Query() - params.Set("q", q.Command) - params.Set("db", q.Database) - if q.RetentionPolicy != "" { - params.Set("rp", q.RetentionPolicy) - } - params.Set("params", string(jsonParameters)) if q.Chunked { params.Set("chunked", "true") if q.ChunkSize > 0 { params.Set("chunk_size", strconv.Itoa(q.ChunkSize)) } + req.URL.RawQuery = params.Encode() } - - if q.Precision != "" { - params.Set("epoch", q.Precision) - } - req.URL.RawQuery = params.Encode() - resp, err := c.httpClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() - // If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb - // but instead some other service. If the error code is also a 500+ code, then some - // downstream loadbalancer/proxy/etc had an issue and we should report that. - if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError { - body, err := ioutil.ReadAll(resp.Body) - if err != nil || len(body) == 0 { - return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode) - } - - return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body) - } - - // If we get an unexpected content type, then it is also not from influx direct and therefore - // we want to know what we received and what status code was returned for debugging purposes. - if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" { - // Read up to 1kb of the body to help identify downstream errors and limit the impact of things - // like downstream serving a large file - body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) - if err != nil || len(body) == 0 { - return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode) - } - - return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body) + if err := checkResponse(resp); err != nil { + return nil, err } var response Response @@ -573,6 +528,9 @@ func (c *client) Query(q Query) (*Response, error) { for { r, err := cr.NextResponse() if err != nil { + if err == io.EOF { + break + } // If we got an error while decoding the response, send that back. return nil, err } @@ -610,10 +568,99 @@ func (c *client) Query(q Query) (*Response, error) { return &response, nil } +// QueryAsChunk sends a command to the server and returns the Response. +func (c *client) QueryAsChunk(q Query) (*ChunkedResponse, error) { + req, err := c.createDefaultRequest(q) + if err != nil { + return nil, err + } + params := req.URL.Query() + params.Set("chunked", "true") + if q.ChunkSize > 0 { + params.Set("chunk_size", strconv.Itoa(q.ChunkSize)) + } + req.URL.RawQuery = params.Encode() + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + + if err := checkResponse(resp); err != nil { + return nil, err + } + return NewChunkedResponse(resp.Body), nil +} + +func checkResponse(resp *http.Response) error { + // If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb + // but instead some other service. If the error code is also a 500+ code, then some + // downstream loadbalancer/proxy/etc had an issue and we should report that. + if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError { + body, err := ioutil.ReadAll(resp.Body) + if err != nil || len(body) == 0 { + return fmt.Errorf("received status code %d from downstream server", resp.StatusCode) + } + + return fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body) + } + + // If we get an unexpected content type, then it is also not from influx direct and therefore + // we want to know what we received and what status code was returned for debugging purposes. + if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" { + // Read up to 1kb of the body to help identify downstream errors and limit the impact of things + // like downstream serving a large file + body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) + if err != nil || len(body) == 0 { + return fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode) + } + + return fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body) + } + return nil +} + +func (c *client) createDefaultRequest(q Query) (*http.Request, error) { + u := c.url + u.Path = path.Join(u.Path, "query") + + jsonParameters, err := json.Marshal(q.Parameters) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", u.String(), nil) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.useragent) + + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + params := req.URL.Query() + params.Set("q", q.Command) + params.Set("db", q.Database) + if q.RetentionPolicy != "" { + params.Set("rp", q.RetentionPolicy) + } + params.Set("params", string(jsonParameters)) + + if q.Precision != "" { + params.Set("epoch", q.Precision) + } + req.URL.RawQuery = params.Encode() + + return req, nil + +} + // duplexReader reads responses and writes it to another writer while // satisfying the reader interface. type duplexReader struct { - r io.Reader + r io.ReadCloser w io.Writer } @@ -625,6 +672,11 @@ func (r *duplexReader) Read(p []byte) (n int, err error) { return n, err } +// Close closes the response. +func (r *duplexReader) Close() error { + return r.r.Close() +} + // ChunkedResponse represents a response from the server that // uses chunking to stream the output. type ChunkedResponse struct { @@ -635,8 +687,12 @@ type ChunkedResponse struct { // NewChunkedResponse reads a stream and produces responses from the stream. func NewChunkedResponse(r io.Reader) *ChunkedResponse { + rc, ok := r.(io.ReadCloser) + if !ok { + rc = ioutil.NopCloser(r) + } resp := &ChunkedResponse{} - resp.duplex = &duplexReader{r: r, w: &resp.buf} + resp.duplex = &duplexReader{r: rc, w: &resp.buf} resp.dec = json.NewDecoder(resp.duplex) resp.dec.UseNumber() return resp @@ -645,10 +701,9 @@ func NewChunkedResponse(r io.Reader) *ChunkedResponse { // NextResponse reads the next line of the stream and returns a response. func (r *ChunkedResponse) NextResponse() (*Response, error) { var response Response - if err := r.dec.Decode(&response); err != nil { if err == io.EOF { - return nil, nil + return nil, err } // A decoding error happened. This probably means the server crashed // and sent a last-ditch error message to us. Ensure we have read the @@ -660,3 +715,8 @@ func (r *ChunkedResponse) NextResponse() (*Response, error) { r.buf.Reset() return &response, nil } + +// Close closes the response. +func (r *ChunkedResponse) Close() error { + return r.duplex.Close() +} diff --git a/client/v2/client_test.go b/client/v2/client_test.go index 27dd13b8b59..c76f530b79b 100644 --- a/client/v2/client_test.go +++ b/client/v2/client_test.go @@ -911,3 +911,29 @@ func TestClientProxy(t *testing.T) { t.Fatalf("no http request was received") } } + +func TestClient_QueryAsChunk(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var data Response + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Influxdb-Version", "1.3.1") + w.WriteHeader(http.StatusOK) + enc := json.NewEncoder(w) + _ = enc.Encode(data) + _ = enc.Encode(data) + })) + defer ts.Close() + + config := HTTPConfig{Addr: ts.URL} + c, err := NewHTTPClient(config) + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } + + query := Query{Chunked: true} + resp, err := c.QueryAsChunk(query) + defer resp.Close() + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } +} diff --git a/client/v2/udp.go b/client/v2/udp.go index 779a28b33f3..9867868b41c 100644 --- a/client/v2/udp.go +++ b/client/v2/udp.go @@ -107,6 +107,10 @@ func (uc *udpclient) Query(q Query) (*Response, error) { return nil, fmt.Errorf("Querying via UDP is not supported") } +func (uc *udpclient) QueryAsChunk(q Query) (*ChunkedResponse, error) { + return nil, fmt.Errorf("Querying via UDP is not supported") +} + func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) { return 0, "", nil }