Skip to content

Commit

Permalink
query-tee: add per-backend request headers (grafana#10081)
Browse files Browse the repository at this point in the history
* Add tests

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add headers to proxy

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add CHANGELOG.md entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Use YAML config instead

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add comment for why we allow choosing by index too

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Validate list of backends

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add util function for BackendConfig

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Fix validation

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov authored and bjorns163 committed Dec 30, 2024
1 parent 13b807a commit 7d7a9c1
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
### Query-tee

* [FEATURE] Added `-proxy.compare-skip-samples-before` to skip samples before the given time when comparing responses. The time can be in RFC3339 format (or) RFC3339 without the timezone and seconds (or) date only. #9515
* [FEATURE] Add `-backend.config-file` for a YAML configuration file for per-backend options. Currently, it only supports additional HTTP request headers. #10081
* [ENHANCEMENT] Added human-readable timestamps to comparison failure messages. #9665

### Documentation
Expand Down
72 changes: 71 additions & 1 deletion tools/querytee/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
package querytee

import (
"encoding/json"
"flag"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"os"
"strconv"
"strings"
"sync"
Expand All @@ -21,8 +23,10 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/server"
"github.com/grafana/dskit/spanlogger"
"github.com/grafana/regexp"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v3"
)

type ProxyConfig struct {
Expand All @@ -34,6 +38,8 @@ type ProxyConfig struct {
BackendEndpoints string
PreferredBackend string
BackendReadTimeout time.Duration
BackendConfigFile string
parsedBackendConfig map[string]*BackendConfig
CompareResponses bool
LogSlowQueryResponseThreshold time.Duration
ValueComparisonTolerance float64
Expand All @@ -47,6 +53,23 @@ type ProxyConfig struct {
SecondaryBackendsRequestProportion float64
}

type BackendConfig struct {
RequestHeaders http.Header `json:"request_headers" yaml:"request_headers"`
}

func exampleJSONBackendConfig() string {
cfg := BackendConfig{
RequestHeaders: http.Header{
"Cache-Control": {"no-store"},
},
}
jsonBytes, err := json.Marshal(cfg)
if err != nil {
panic("invalid example backend config" + err.Error())
}
return string(jsonBytes)
}

func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.ServerHTTPServiceAddress, "server.http-service-address", "", "Bind address for server where query-tee service listens for HTTP requests.")
f.IntVar(&cfg.ServerHTTPServicePort, "server.http-service-port", 80, "The HTTP port where the query-tee service listens for HTTP requests.")
Expand All @@ -62,6 +85,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.BackendSkipTLSVerify, "backend.skip-tls-verify", false, "Skip TLS verification on backend targets.")
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.")
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 150*time.Second, "The timeout when reading the response from a backend.")
f.StringVar(&cfg.BackendConfigFile, "backend.config-file", "", "Path to a file with YAML or JSON configuration for each backend. Each key in the YAML/JSON document is a backend hostname. This is an example configuration value for a backend in JSON: "+exampleJSONBackendConfig())
f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.")
f.DurationVar(&cfg.LogSlowQueryResponseThreshold, "proxy.log-slow-query-response-threshold", 10*time.Second, "The minimum difference in response time between slowest and fastest back-end over which to log the query. 0 to disable.")
f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).")
Expand Down Expand Up @@ -119,6 +143,17 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
return nil, errors.New("preferred backend must be set when secondary backends request proportion is not 1")
}

if len(cfg.BackendConfigFile) > 0 {
configBytes, err := os.ReadFile(cfg.BackendConfigFile)
if err != nil {
return nil, fmt.Errorf("failed to read backend config file (%s): %w", cfg.BackendConfigFile, err)
}
err = yaml.Unmarshal(configBytes, &cfg.parsedBackendConfig)
if err != nil {
return nil, fmt.Errorf("failed to parse backend YAML config: %w", err)
}
}

p := &Proxy{
cfg: cfg,
logger: logger,
Expand Down Expand Up @@ -153,14 +188,30 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
preferred = preferredIdx == idx
}

p.backends = append(p.backends, NewProxyBackend(name, u, cfg.BackendReadTimeout, preferred, cfg.BackendSkipTLSVerify))
backendCfg := cfg.parsedBackendConfig[name]
if backendCfg == nil {
// In tests, we have the same hostname for all backends, so we also
// support a numeric preferred backend which is the index in the list
// of backends.
backendCfg = cfg.parsedBackendConfig[strconv.Itoa(idx)]
if backendCfg == nil {
backendCfg = &BackendConfig{}
}
}

p.backends = append(p.backends, NewProxyBackend(name, u, cfg.BackendReadTimeout, preferred, cfg.BackendSkipTLSVerify, *backendCfg))
}

// At least 1 backend is required
if len(p.backends) < 1 {
return nil, errors.New("at least 1 backend is required")
}

err := validateBackendConfig(p.backends, cfg.parsedBackendConfig)
if err != nil {
return nil, fmt.Errorf("validating external backend configs: %w", err)
}

// If the preferred backend is configured, then it must exist among the actual backends.
if cfg.PreferredBackend != "" {
exists := false
Expand Down Expand Up @@ -188,6 +239,25 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
return p, nil
}

func validateBackendConfig(backends []ProxyBackendInterface, config map[string]*BackendConfig) error {
// Tests need to pass the same hostname for all backends, so we also
// support a numeric preferred backend which is the index in the list of backend.
numericBackendNameRegex := regexp.MustCompile("^[0-9]+$")
for configuredBackend := range config {
backendExists := false
for _, actualBacked := range backends {
if actualBacked.Name() == configuredBackend {
backendExists = true
break
}
}
if !backendExists && !numericBackendNameRegex.MatchString(configuredBackend) {
return fmt.Errorf("configured backend %s does not exist in the list of actual backends", configuredBackend)
}
}
return nil
}

func (p *Proxy) Start() error {
// Setup server first, so we can fail early if the ports are in use.
serv, err := server.New(server.Config{
Expand Down
10 changes: 9 additions & 1 deletion tools/querytee/proxy_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ type ProxyBackend struct {
// Whether this is the preferred backend from which picking up
// the response and sending it back to the client.
preferred bool
cfg BackendConfig
}

// NewProxyBackend makes a new ProxyBackend
func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, preferred bool, skipTLSVerify bool) ProxyBackendInterface {
func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, preferred bool, skipTLSVerify bool, cfg BackendConfig) ProxyBackendInterface {
innerTransport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
Expand All @@ -63,6 +64,7 @@ func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, pref
endpoint: endpoint,
timeout: timeout,
preferred: preferred,
cfg: cfg,
client: &http.Client{
CheckRedirect: func(_ *http.Request, _ []*http.Request) error {
return errors.New("the query-tee proxy does not follow redirects")
Expand Down Expand Up @@ -139,6 +141,12 @@ func (b *ProxyBackend) createBackendRequest(ctx context.Context, orig *http.Requ
// Remove Accept-Encoding header to avoid sending compressed responses
req.Header.Del("Accept-Encoding")

for headerName, headerValues := range b.cfg.RequestHeaders {
for _, headerValue := range headerValues {
req.Header.Add(headerName, headerValue)
}
}

return req, nil
}

Expand Down
6 changes: 5 additions & 1 deletion tools/querytee/proxy_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func Test_ProxyBackend_createBackendRequest_HTTPBasicAuthentication(t *testing.T
orig.Header.Set("X-Scope-OrgID", testData.clientTenant)
}

b := NewProxyBackend("test", u, time.Second, false, false)
b := NewProxyBackend("test", u, time.Second, false, false, defaultBackendConfig())
bp, ok := b.(*ProxyBackend)
if !ok {
t.Fatalf("Type assertion to *ProxyBackend failed")
Expand All @@ -98,3 +98,7 @@ func Test_ProxyBackend_createBackendRequest_HTTPBasicAuthentication(t *testing.T
})
}
}

func defaultBackendConfig() BackendConfig {
return BackendConfig{}
}
14 changes: 7 additions & 7 deletions tools/querytee/proxy_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) {
backendURL3, err := url.Parse("http://backend-3/")
require.NoError(t, err)

backendPref := NewProxyBackend("backend-1", backendURL1, time.Second, true, false)
backendOther1 := NewProxyBackend("backend-2", backendURL2, time.Second, false, false)
backendOther2 := NewProxyBackend("backend-3", backendURL3, time.Second, false, false)
backendPref := NewProxyBackend("backend-1", backendURL1, time.Second, true, false, defaultBackendConfig())
backendOther1 := NewProxyBackend("backend-2", backendURL2, time.Second, false, false, defaultBackendConfig())
backendOther2 := NewProxyBackend("backend-3", backendURL3, time.Second, false, false, defaultBackendConfig())

tests := map[string]struct {
backends []ProxyBackendInterface
Expand Down Expand Up @@ -157,8 +157,8 @@ func Test_ProxyEndpoint_Requests(t *testing.T) {
require.NoError(t, err)

backends := []ProxyBackendInterface{
NewProxyBackend("backend-1", backendURL1, time.Second, true, false),
NewProxyBackend("backend-2", backendURL2, time.Second, false, false),
NewProxyBackend("backend-1", backendURL1, time.Second, true, false, defaultBackendConfig()),
NewProxyBackend("backend-2", backendURL2, time.Second, false, false, defaultBackendConfig()),
}
endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(nil), log.NewNopLogger(), nil, 0, 1.0)

Expand Down Expand Up @@ -325,8 +325,8 @@ func Test_ProxyEndpoint_Comparison(t *testing.T) {
require.NoError(t, err)

backends := []ProxyBackendInterface{
NewProxyBackend("preferred-backend", preferredBackendURL, time.Second, true, false),
NewProxyBackend("secondary-backend", secondaryBackendURL, time.Second, false, false),
NewProxyBackend("preferred-backend", preferredBackendURL, time.Second, true, false, defaultBackendConfig()),
NewProxyBackend("secondary-backend", secondaryBackendURL, time.Second, false, false, defaultBackendConfig()),
}

logger := newMockLogger()
Expand Down
Loading

0 comments on commit 7d7a9c1

Please sign in to comment.