Skip to content

Commit f807690

Browse files
committed
Basic retrying query frontend.
Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
1 parent 4ffec28 commit f807690

File tree

11 files changed

+1466
-63
lines changed

11 files changed

+1466
-63
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ cmd/configs/configs
33
cmd/distributor/distributor
44
cmd/ingester/ingester
55
cmd/querier/querier
6+
cmd/query-frontend/query-frontend
67
cmd/ruler/ruler
78
cmd/table-manager/table-manager
89
cmd/lite/lite
910
.uptodate
1011
.pkg
1112
.cache
1213
pkg/ingester/client/cortex.pb.go
14+
pkg/querier/frontend/frontend.pb.go
1315
pkg/ring/ring.pb.go
1416
images/

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ images:
3131
@echo > /dev/null
3232

3333
# Generating proto code is automated.
34-
PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print)
34+
PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print) vendor/github.com/weaveworks/common/httpgrpc/httpgrpc.proto
3535
PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS))
3636

3737
# Building binaries is now automated. The convention is to build a binary

cmd/querier/main.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ import (
1414
"github.com/prometheus/prometheus/web/api/v1"
1515
"github.com/prometheus/tsdb"
1616

17+
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
1718
"github.com/weaveworks/common/middleware"
1819
"github.com/weaveworks/common/server"
1920
"github.com/weaveworks/common/tracing"
2021
"github.com/weaveworks/cortex/pkg/chunk"
2122
"github.com/weaveworks/cortex/pkg/chunk/storage"
2223
"github.com/weaveworks/cortex/pkg/distributor"
2324
"github.com/weaveworks/cortex/pkg/querier"
25+
"github.com/weaveworks/cortex/pkg/querier/frontend"
2426
"github.com/weaveworks/cortex/pkg/ring"
2527
"github.com/weaveworks/cortex/pkg/util"
2628
)
@@ -39,9 +41,10 @@ func main() {
3941
chunkStoreConfig chunk.StoreConfig
4042
schemaConfig chunk.SchemaConfig
4143
storageConfig storage.Config
44+
workerConfig frontend.WorkerConfig
4245
)
4346
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &querierConfig,
44-
&chunkStoreConfig, &schemaConfig, &storageConfig)
47+
&chunkStoreConfig, &schemaConfig, &storageConfig, &workerConfig)
4548
flag.Parse()
4649

4750
// Setting the environment variable JAEGER_AGENT_HOST enables tracing
@@ -86,6 +89,13 @@ func main() {
8689
}
8790
defer chunkStore.Stop()
8891

92+
worker, err := frontend.NewWorker(workerConfig, httpgrpc_server.NewServer(server.HTTP), util.Logger)
93+
if err != nil {
94+
level.Error(util.Logger).Log("err", err)
95+
os.Exit(1)
96+
}
97+
defer worker.Stop()
98+
8999
queryable, engine := querier.Make(querierConfig, dist, chunkStore)
90100
api := v1.NewAPI(
91101
engine,

cmd/query-frontend/Dockerfile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
FROM alpine:3.8
2+
RUN apk add --no-cache ca-certificates
3+
COPY query-frontend /bin/query-frontend
4+
EXPOSE 80
5+
ENTRYPOINT [ "/bin/query-frontend" ]
6+
7+
ARG revision
8+
LABEL org.opencontainers.image.title="query-frontend" \
9+
org.opencontainers.image.source="https://github.com/weaveworks/cortex/tree/master/cmd/query-frontend" \
10+
org.opencontainers.image.revision="${revision}"

cmd/query-frontend/main.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"os"
6+
7+
"github.com/go-kit/kit/log/level"
8+
"google.golang.org/grpc"
9+
10+
"github.com/weaveworks/common/middleware"
11+
"github.com/weaveworks/common/server"
12+
"github.com/weaveworks/common/tracing"
13+
"github.com/weaveworks/cortex/pkg/querier/frontend"
14+
"github.com/weaveworks/cortex/pkg/util"
15+
)
16+
17+
func main() {
18+
var (
19+
serverConfig = server.Config{
20+
MetricsNamespace: "cortex",
21+
GRPCMiddleware: []grpc.UnaryServerInterceptor{
22+
middleware.ServerUserHeaderInterceptor,
23+
},
24+
}
25+
frontendConfig frontend.Config
26+
)
27+
util.RegisterFlags(&serverConfig, &frontendConfig)
28+
flag.Parse()
29+
30+
// Setting the environment variable JAEGER_AGENT_HOST enables tracing
31+
trace := tracing.NewFromEnv("query-frontend")
32+
defer trace.Close()
33+
34+
util.InitLogger(&serverConfig)
35+
36+
server, err := server.New(serverConfig)
37+
if err != nil {
38+
level.Error(util.Logger).Log("msg", "error initializing server", "err", err)
39+
os.Exit(1)
40+
}
41+
defer server.Shutdown()
42+
43+
f, err := frontend.New(frontendConfig, util.Logger)
44+
if err != nil {
45+
level.Error(util.Logger).Log("msg", "error initializing frontend", "err", err)
46+
os.Exit(1)
47+
}
48+
defer f.Close()
49+
50+
frontend.RegisterFrontendServer(server.GRPC, f)
51+
server.HTTP.PathPrefix("/api/prom").Handler(middleware.AuthenticateUser.Wrap(f))
52+
server.Run()
53+
}

pkg/querier/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type Config struct {
1818
Iterators bool
1919
}
2020

21-
// RegisterFlags adds the flags required to config this to the given FlagSet
21+
// RegisterFlags adds the flags required to config this to the given FlagSet.
2222
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
2323
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
2424
f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.")

pkg/querier/frontend/frontend.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
package frontend
2+
3+
import (
4+
"flag"
5+
"math/rand"
6+
"net/http"
7+
"sync"
8+
9+
"github.com/go-kit/kit/log"
10+
"github.com/go-kit/kit/log/level"
11+
"github.com/weaveworks/common/httpgrpc"
12+
"github.com/weaveworks/common/httpgrpc/server"
13+
"github.com/weaveworks/common/user"
14+
)
15+
16+
var (
17+
errServerClosing = httpgrpc.Errorf(http.StatusTeapot, "server closing down")
18+
errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")
19+
errCanceled = httpgrpc.Errorf(http.StatusInternalServerError, "context cancelled")
20+
)
21+
22+
// Config for a Frontend.
type Config struct {
	// MaxOutstandingPerTenant is the capacity of each tenant's request queue.
	// A request that arrives while its tenant's queue is full is rejected.
	MaxOutstandingPerTenant int

	// MaxRetries bounds how many times a single request is attempted in total
	// before the frontend gives up on it.
	MaxRetries int
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
	f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of times a single request is attempted; beyond this, the last downstream error is returned.")
}
33+
34+
// Frontend queues HTTP requests, dispatches them to backends, and handles retries
35+
// for requests which failed.
36+
type Frontend struct {
37+
cfg Config
38+
log log.Logger
39+
40+
mtx sync.Mutex
41+
cond *sync.Cond
42+
closed bool
43+
queues map[string]chan *request
44+
}
45+
46+
type request struct {
47+
request *httpgrpc.HTTPRequest
48+
err chan error
49+
response chan *httpgrpc.HTTPResponse
50+
}
51+
52+
// New creates a new frontend.
53+
func New(cfg Config, log log.Logger) (*Frontend, error) {
54+
f := &Frontend{
55+
cfg: cfg,
56+
log: log,
57+
queues: map[string]chan *request{},
58+
}
59+
f.cond = sync.NewCond(&f.mtx)
60+
return f, nil
61+
}
62+
63+
// Close stops new requests and errors out any pending requests.
64+
func (f *Frontend) Close() {
65+
f.mtx.Lock()
66+
defer f.mtx.Unlock()
67+
68+
f.closed = true
69+
f.cond.Broadcast()
70+
71+
for _, queue := range f.queues {
72+
close(queue)
73+
for request := range queue {
74+
request.err <- errServerClosing
75+
}
76+
}
77+
}
78+
79+
// ServeHTTP serves HTTP requests.
80+
func (f *Frontend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
81+
if err := f.serveHTTP(w, r); err != nil {
82+
server.WriteError(w, err)
83+
}
84+
}
85+
86+
func (f *Frontend) serveHTTP(w http.ResponseWriter, r *http.Request) error {
87+
ctx := r.Context()
88+
userID, err := user.ExtractOrgID(ctx)
89+
if err != nil {
90+
return err
91+
}
92+
93+
req, err := server.HTTPRequest(r)
94+
if err != nil {
95+
return err
96+
}
97+
98+
request := &request{
99+
request: req,
100+
// Buffer of 1 to ensure response can be written even if client has gone away.
101+
err: make(chan error, 1),
102+
response: make(chan *httpgrpc.HTTPResponse, 1),
103+
}
104+
105+
var lastErr error
106+
for retries := 0; retries < f.cfg.MaxRetries; retries++ {
107+
if err := f.queueRequest(userID, request); err != nil {
108+
return err
109+
}
110+
111+
var resp *httpgrpc.HTTPResponse
112+
select {
113+
case <-ctx.Done():
114+
// TODO propagate cancellation.
115+
//request.Cancel()
116+
return errCanceled
117+
118+
case resp = <-request.response:
119+
case lastErr = <-request.err:
120+
level.Error(f.log).Log("msg", "error processing request", "try", retries, "err", lastErr)
121+
resp, _ = httpgrpc.HTTPResponseFromError(lastErr)
122+
}
123+
124+
// Only fail is we get a valid HTTP non-500; otherwise retry.
125+
if resp != nil && resp.Code/100 != 5 {
126+
server.WriteResponse(w, resp)
127+
return nil
128+
}
129+
}
130+
131+
return lastErr
132+
}
133+
134+
// Process allows backends to pull requests from the frontend.
135+
func (f *Frontend) Process(server Frontend_ProcessServer) error {
136+
for {
137+
request := f.getNextRequest()
138+
if request == nil {
139+
// Occurs when server is shutting down.
140+
return nil
141+
}
142+
143+
if err := server.Send(&ProcessRequest{
144+
HttpRequest: request.request,
145+
}); err != nil {
146+
request.err <- err
147+
return err
148+
}
149+
150+
response, err := server.Recv()
151+
if err != nil {
152+
request.err <- err
153+
return err
154+
}
155+
156+
request.response <- response.HttpResponse
157+
}
158+
}
159+
160+
func (f *Frontend) queueRequest(userID string, req *request) error {
161+
f.mtx.Lock()
162+
defer f.mtx.Unlock()
163+
164+
if f.closed {
165+
return errServerClosing
166+
}
167+
168+
queue, ok := f.queues[userID]
169+
if !ok {
170+
queue = make(chan *request, f.cfg.MaxOutstandingPerTenant)
171+
f.queues[userID] = queue
172+
}
173+
174+
select {
175+
case queue <- req:
176+
f.cond.Signal()
177+
return nil
178+
default:
179+
return errTooManyRequest
180+
}
181+
}
182+
183+
// getQueue picks a random queue and takes the next request off of it, so we
184+
// faily process users queries. Will block if there are no requests.
185+
func (f *Frontend) getNextRequest() *request {
186+
f.mtx.Lock()
187+
defer f.mtx.Unlock()
188+
189+
for len(f.queues) == 0 && !f.closed {
190+
f.cond.Wait()
191+
}
192+
193+
if f.closed {
194+
return nil
195+
}
196+
197+
i, n := 0, rand.Intn(len(f.queues))
198+
for userID, queue := range f.queues {
199+
if i < n {
200+
i++
201+
continue
202+
}
203+
204+
request := <-queue
205+
if len(queue) == 0 {
206+
delete(f.queues, userID)
207+
}
208+
return request
209+
}
210+
211+
panic("should never happen")
212+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
syntax = "proto3";
2+
3+
package frontend;
4+
5+
option go_package = "frontend";
6+
7+
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
8+
import "github.com/weaveworks/common/httpgrpc/httpgrpc.proto";
9+
10+
option (gogoproto.marshaler_all) = true;
11+
option (gogoproto.unmarshaler_all) = true;
12+
13+
service Frontend {
14+
rpc Process(stream ProcessResponse) returns (stream ProcessRequest) {};
15+
}
16+
17+
message ProcessRequest {
18+
httpgrpc.HTTPRequest httpRequest = 1;
19+
}
20+
21+
message ProcessResponse {
22+
httpgrpc.HTTPResponse httpResponse = 1;
23+
}

0 commit comments

Comments
 (0)