Skip to content

Commit 0460dad

Browse files
authored
Allow the query-frontend code to proxy queries to 'vanilla' Prometheus. (#1441)
* Allow the query-frontend code to proxy queries to 'vanilla' Prometheus. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> * Various tweaks - Allow specification of target in config file. - Allow sepcification of HTTP prefix (/api/prom), so we can drop it to look like a 'normal' prometheus. - Fix forwarding request to normal Prometheus. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> * Add example config for using with vanilla prometheus. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> * Add newline at end of config file. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
1 parent cd11e20 commit 0460dad

File tree

4 files changed

+87
-5
lines changed

4 files changed

+87
-5
lines changed

docs/prometheus-frontend.yml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# You can use the Cortex query frontend with any Prometheus-API compatible
2+
# service, including Prometheus and Thanos. Use this config file to get
3+
# the benefits of query parallelisation and caching.
4+
5+
# Disable the requirement that every request to Cortex has a
6+
# X-Scope-OrgID header. `fake` will be substituted in instead.
7+
auth_enabled: false
8+
9+
# We only want to run the query-frontend module.
10+
target: query-frontend
11+
12+
# We don't want the usual /api/prom prefix.
13+
http_prefix:
14+
15+
server:
16+
http_listen_port: 9091
17+
18+
frontend:
19+
split_queries_by_day: true
20+
align_queries_with_step: true
21+
cache_results: true
22+
compress_responses: true
23+
24+
results_cache:
25+
max_freshness: 1m
26+
cache:
27+
28+
# We're going to use the in-process "FIFO" cache, but you can enable
29+
# memcached below.
30+
enable_fifocache: true
31+
fifocache:
32+
size: 1024
33+
validity: 24h
34+
35+
# If you want to use a memcached cluster, configure a headless service
36+
# in Kubernetes and Cortex will discover the individual instances using
37+
# a SRV DNS query. Cortex will then do client-side hashing to spread
38+
# the load evenly.
39+
# memcached:
40+
# memcached_client:
41+
# host: memcached.default.svc.cluster.local
42+
# service: memcached
43+
# consistent_hash: true

pkg/cortex/cortex.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type Config struct {
5353
Target moduleName `yaml:"target,omitempty"`
5454
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
5555
PrintConfig bool `yaml:"-"`
56+
HTTPPrefix string `yaml:"http_prefix"`
5657

5758
Server server.Config `yaml:"server,omitempty"`
5859
Distributor distributor.Config `yaml:"distributor,omitempty"`
@@ -82,6 +83,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
8283
f.Var(&c.Target, "target", "target module (default All)")
8384
f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.")
8485
f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.")
86+
f.StringVar(&c.HTTPPrefix, "http.prefix", "/api/prom", "HTTP path prefix for Cortex API.")
8587

8688
c.Server.RegisterFlags(f)
8789
c.Distributor.RegisterFlags(f)

pkg/cortex/modules.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,14 @@ func (m *moduleName) Set(s string) error {
132132
}
133133
}
134134

135+
func (m *moduleName) UnmarshalYAML(unmarshal func(interface{}) error) error {
136+
var s string
137+
if err := unmarshal(&s); err != nil {
138+
return err
139+
}
140+
return m.Set(s)
141+
}
142+
135143
func (t *Cortex) initServer(cfg *Config) (err error) {
136144
t.server, err = server.New(cfg.Server)
137145
return
@@ -259,7 +267,7 @@ func (t *Cortex) initQueryFrontend(cfg *Config) (err error) {
259267
}
260268

261269
frontend.RegisterFrontendServer(t.server.GRPC, t.frontend)
262-
t.server.HTTP.PathPrefix("/api/prom").Handler(
270+
t.server.HTTP.PathPrefix(cfg.HTTPPrefix).Handler(
263271
t.httpAuthMiddleware.Wrap(
264272
t.frontend.Handler(),
265273
),

pkg/querier/frontend/frontend.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"io/ioutil"
99
"math/rand"
1010
"net/http"
11+
"net/url"
12+
"path"
1113
"sync"
1214
"time"
1315

@@ -57,6 +59,7 @@ type Config struct {
5759
CacheResults bool `yaml:"cache_results"`
5860
CompressResponses bool `yaml:"compress_responses"`
5961
queryrange.ResultsCacheConfig `yaml:"results_cache"`
62+
DownstreamURL string `yaml:"downstream"`
6063
}
6164

6265
// RegisterFlags adds the flags required to config this to the given FlagSet.
@@ -68,6 +71,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
6871
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
6972
f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
7073
cfg.ResultsCacheConfig.RegisterFlags(f)
74+
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
7175
}
7276

7377
// Frontend queues HTTP requests, dispatches them to backends, and handles retries
@@ -99,6 +103,7 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e
99103
log: log,
100104
queues: map[string]chan *request{},
101105
}
106+
f.cond = sync.NewCond(&f.mtx)
102107

103108
// Stack up the pipeline of various query range middlewares.
104109
var queryRangeMiddleware []queryrange.Middleware
@@ -119,20 +124,44 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e
119124
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("retry", queryRangeDuration), queryrange.NewRetryMiddleware(log, cfg.MaxRetries))
120125
}
121126

122-
// Finally, if the user selected any query range middleware, stitch it in.
127+
// If the user has specified a downstream Prometheus, then we should
128+
// forward requests to that. Otherwise we will wait for queries to
129+
// contact us.
123130
var roundTripper http.RoundTripper = f
131+
if cfg.DownstreamURL != "" {
132+
u, err := url.Parse(cfg.DownstreamURL)
133+
if err != nil {
134+
return nil, err
135+
}
136+
137+
roundTripper = RoundTripFunc(func(r *http.Request) (*http.Response, error) {
138+
r.URL.Scheme = u.Scheme
139+
r.URL.Host = u.Host
140+
r.URL.Path = path.Join(u.Path, r.URL.Path)
141+
return http.DefaultTransport.RoundTrip(r)
142+
})
143+
}
144+
145+
// Finally, if the user selected any query range middleware, stitch it in.
124146
if len(queryRangeMiddleware) > 0 {
125147
roundTripper = queryrange.NewRoundTripper(
126-
f,
127-
queryrange.MergeMiddlewares(queryRangeMiddleware...).Wrap(&queryrange.ToRoundTripperMiddleware{Next: f}),
148+
roundTripper,
149+
queryrange.MergeMiddlewares(queryRangeMiddleware...).Wrap(&queryrange.ToRoundTripperMiddleware{Next: roundTripper}),
128150
limits,
129151
)
130152
}
131153
f.roundTripper = roundTripper
132-
f.cond = sync.NewCond(&f.mtx)
133154
return f, nil
134155
}
135156

157+
// RoundTripFunc is to http.RoundTripper what http.HandlerFunc is to http.Handler.
158+
type RoundTripFunc func(*http.Request) (*http.Response, error)
159+
160+
// RoundTrip implements http.RoundTripper.
161+
func (f RoundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) {
162+
return f(r)
163+
}
164+
136165
// Close stops new requests and errors out any pending requests.
137166
func (f *Frontend) Close() {
138167
f.mtx.Lock()

0 commit comments

Comments
 (0)