Skip to content

Introduce query-tee tool #2203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cmd/test-exporter/test-exporter
cmd/cortex/cortex
cmd/query-tee/query-tee
.uptodate
.pkg
.cache
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* [FEATURE] Allow Prometheus remote write directly to ingesters. #1491
* [FEATURE] Add flag `-experimental.tsdb.stripe-size` to expose TSDB stripe size option. #2185
* [FEATURE] Experimental Delete Series: Added support for Deleting Series with Prometheus style API. Needs to be enabled first by setting `--purger.enable` to `true`. Deletion only supported when using `boltdb` and `filesystem` as index and object store respectively. Support for other stores to follow in separate PRs #2103
* [FEATURE] Introduced new standalone service `query-tee` that can be used for testing purposes to send the same Prometheus query to multiple backends (ie. two Cortex clusters ingesting the same metrics) and compare the performances. #2203
* [ENHANCEMENT] Alertmanager: Expose Per-tenant alertmanager metrics #2124
* [ENHANCEMENT] Add `status` label to `cortex_alertmanager_configs` metric to gauge the number of valid and invalid configs. #2125
* [ENHANCEMENT] Cassandra Authentication: added the `custom_authenticators` config option that allows users to authenticate with cassandra clusters using password authenticators that are not approved by default in [gocql](https://github.com/gocql/gocql/blob/81b8263d9fe526782a588ef94d3fa5c6148e5d67/conn.go#L27) #2093
Expand Down
9 changes: 9 additions & 0 deletions cmd/query-tee/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM alpine:3.11
RUN apk add --no-cache ca-certificates
COPY query-tee /
ENTRYPOINT ["/query-tee"]

ARG revision
LABEL org.opencontainers.image.title="query-tee" \
org.opencontainers.image.source="https://github.com/cortexproject/cortex/tree/master/tools/query-tee" \
org.opencontainers.image.revision="${revision}"
60 changes: 60 additions & 0 deletions cmd/query-tee/instrumentation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"fmt"
"net"
"net/http"

"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/cortexproject/cortex/pkg/util"
)

type InstrumentationServer struct {
port int
registry *prometheus.Registry
srv *http.Server
}

// NewInstrumentationServer returns a server exposing Prometheus metrics.
func NewInstrumentationServer(port int, registry *prometheus.Registry) *InstrumentationServer {
return &InstrumentationServer{
port: port,
registry: registry,
}
}

// Start the instrumentation server.
func (s *InstrumentationServer) Start() error {
// Setup listener first, so we can fail early if the port is in use.
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))
if err != nil {
return err
}

router := mux.NewRouter()
router.Handle("/metrics", promhttp.HandlerFor(s.registry, promhttp.HandlerOpts{}))

s.srv = &http.Server{
Handler: router,
}

go func() {
if err := s.srv.Serve(listener); err != nil {
level.Error(util.Logger).Log("msg", "metrics server terminated", "err", err)
}
}()

return nil
}

// Stop closes the instrumentation server.
func (s *InstrumentationServer) Stop() {
if s.srv != nil {
s.srv.Close()
s.srv = nil
}
}
63 changes: 63 additions & 0 deletions cmd/query-tee/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"flag"
"os"
"time"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/logging"
"github.com/weaveworks/common/server"

"github.com/cortexproject/cortex/pkg/util"
)

type Config struct {
ServerServicePort int
ServerMetricsPort int
BackendEndpoints string
PreferredBackend string
BackendReadTimeout time.Duration
LogLevel logging.Level
}

func main() {
// Parse CLI flags.
cfg := Config{}
flag.IntVar(&cfg.ServerServicePort, "server.service-port", 80, "The port where the query-tee service listens to.")
flag.IntVar(&cfg.ServerMetricsPort, "server.metrics-port", 9900, "The port where metrics are exposed.")
flag.StringVar(&cfg.BackendEndpoints, "backend.endpoints", "", "Comma separated list of backend endpoints to query.")
flag.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client.")
flag.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.")
cfg.LogLevel.RegisterFlags(flag.CommandLine)
flag.Parse()

util.InitLogger(&server.Config{
LogLevel: cfg.LogLevel,
})

// Run the instrumentation server.
registry := prometheus.NewRegistry()
registry.MustRegister(prometheus.NewGoCollector())

i := NewInstrumentationServer(cfg.ServerMetricsPort, registry)
if err := i.Start(); err != nil {
level.Error(util.Logger).Log("msg", "Unable to start instrumentation server", "err", err.Error())
os.Exit(1)
}

// Run the proxy.
proxy, err := NewProxy(cfg, util.Logger, registry)
if err != nil {
level.Error(util.Logger).Log("msg", "Unable to initialize the proxy", "err", err.Error())
os.Exit(1)
}

if err := proxy.Start(); err != nil {
level.Error(util.Logger).Log("msg", "Unable to start the proxy", "err", err.Error())
os.Exit(1)
}

proxy.Await()
}
142 changes: 142 additions & 0 deletions cmd/query-tee/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package main

import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

var (
errMinBackends = errors.New("at least 1 backend is required")
)

type Proxy struct {
cfg Config
backends []*ProxyBackend
logger log.Logger
metrics *ProxyMetrics

// The HTTP server used to run the proxy service.
srv *http.Server
srvListener net.Listener

// Wait group used to wait until the server has done.
done sync.WaitGroup
}

func NewProxy(cfg Config, logger log.Logger, registerer prometheus.Registerer) (*Proxy, error) {
p := &Proxy{
cfg: cfg,
logger: logger,
metrics: NewProxyMetrics(registerer),
}

// Parse the backend endpoints (comma separated).
parts := strings.Split(cfg.BackendEndpoints, ",")

for idx, part := range parts {
// Skip empty ones.
part = strings.TrimSpace(part)
if part == "" {
continue
}

u, err := url.Parse(part)
if err != nil {
return nil, errors.Wrapf(err, "invalid backend endpoint %s", part)
}

// The backend name is hardcoded as the backend hostname.
name := u.Hostname()
preferred := name == cfg.PreferredBackend

// In tests we have the same hostname for all backends, so we also
// support a numeric preferred backend which is the index in the list
// of backends.
if preferredIdx, err := strconv.Atoi(cfg.PreferredBackend); err == nil {
preferred = preferredIdx == idx
}

p.backends = append(p.backends, NewProxyBackend(name, u, cfg.BackendReadTimeout, preferred))
}

// At least 1 backend is required
if len(p.backends) < 1 {
return nil, errMinBackends
}

// At least 2 backends are suggested
if len(p.backends) < 2 {
level.Warn(p.logger).Log("msg", "The proxy is running with only 1 backend. At least 2 backends are required to fulfil the purpose of the proxy and compare results.")
}

return p, nil
}

func (p *Proxy) Start() error {
// Setup listener first, so we can fail early if the port is in use.
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", p.cfg.ServerServicePort))
if err != nil {
return err
}

// Read endpoints.
router := mux.NewRouter()
router.Path("/api/v1/query").Methods("GET").Handler(NewProxyEndpoint(p.backends, "api_v1_query", p.metrics, p.logger))
router.Path("/api/v1/query_range").Methods("GET").Handler(NewProxyEndpoint(p.backends, "api_v1_query_range", p.metrics, p.logger))
router.Path("/api/v1/labels").Methods("GET").Handler(NewProxyEndpoint(p.backends, "api_v1_labels", p.metrics, p.logger))
router.Path("/api/v1/label/{name}/values").Methods("GET").Handler(NewProxyEndpoint(p.backends, "api_v1_label_name_values", p.metrics, p.logger))
router.Path("/api/v1/series").Methods("GET").Handler(NewProxyEndpoint(p.backends, "api_v1_series", p.metrics, p.logger))

p.srvListener = listener
p.srv = &http.Server{
ReadTimeout: 1 * time.Minute,
WriteTimeout: 2 * time.Minute,
Handler: router,
}

// Run in a dedicated goroutine.
p.done.Add(1)
go func() {
defer p.done.Done()

if err := p.srv.Serve(p.srvListener); err != nil {
level.Error(p.logger).Log("msg", "Proxy server failed", "err", err)
}
}()

return nil
}

func (p *Proxy) Stop() error {
if p.srv == nil {
return nil
}

return p.srv.Shutdown(context.Background())
}

func (p *Proxy) Await() {
// Wait until terminated.
p.done.Wait()
}

func (p *Proxy) Endpoint() string {
if p.srvListener == nil {
return ""
}

return p.srvListener.Addr().String()
}
Loading