forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker_service.go
157 lines (138 loc) · 6.23 KB
/
worker_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package querier
import (
"fmt"
"net/http"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
querier_worker "github.com/grafana/loki/pkg/querier/worker"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
serverutil "github.com/grafana/loki/pkg/util/server"
)
type WorkerServiceConfig struct {
AllEnabled bool
ReadEnabled bool
GrpcListenPort int
QuerierMaxConcurrent int
QuerierWorkerConfig *querier_worker.Config
QueryFrontendEnabled bool
QuerySchedulerEnabled bool
SchedulerRing ring.ReadRing
}
// InitWorkerService takes a config object, a map of routes to handlers, an external http router and external
// http handler, and an auth middleware wrapper. This function creates an internal HTTP router that responds to all
// the provided query routes/handlers. This router can either be registered with the external Loki HTTP server, or
// be used internally by a querier worker so that it does not conflict with the routes registered by the Query Frontend module.
//
// 1. Query-Frontend Enabled: If Loki has an All or QueryFrontend target, the internal
// HTTP router is wrapped with Tenant ID parsing middleware and passed to the frontend
// worker.
//
// 2. Querier Standalone: The querier will register the internal HTTP router with the external
// HTTP router for the Prometheus API routes. Then the external HTTP server will be passed
// as a http.Handler to the frontend worker.
//
func InitWorkerService(
cfg WorkerServiceConfig,
reg prometheus.Registerer,
queryRoutesToHandlers map[string]http.Handler,
alwaysExternalRoutesToHandlers map[string]http.Handler,
externalRouter *mux.Router,
externalHandler http.Handler,
authMiddleware middleware.Interface,
) (serve services.Service, err error) {
// Create a couple Middlewares used to handle panics, perform auth, parse forms in http request, and set content type in response
handlerMiddleware := middleware.Merge(
httpreq.ExtractQueryTagsMiddleware(),
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
serverutil.ResponseJSONMiddleware(),
)
internalRouter := mux.NewRouter()
for route, handler := range queryRoutesToHandlers {
internalRouter.Path(route).Methods("GET", "POST").Handler(handler)
}
// There are some routes which are always registered on the external router, add them now and
// wrap them with the externalMiddleware
for route, handler := range alwaysExternalRoutesToHandlers {
externalRouter.Path(route).Methods("GET", "POST").Handler(handlerMiddleware.Wrap(handler))
}
// If the querier is running standalone without the query-frontend or query-scheduler, we must register the internal
// HTTP handler externally (as it's the only handler that needs to register on querier routes) and provide the
// external Loki Server HTTP handler to the frontend worker to ensure requests it processes use the default
// middleware instrumentation.
if querierRunningStandalone(cfg) {
// First, register the internal querier handler with the external HTTP server
routes := make([]string, len(queryRoutesToHandlers))
var idx = 0
for route := range queryRoutesToHandlers {
routes[idx] = route
idx++
}
// Register routes externally
for _, route := range routes {
externalRouter.Path(route).Methods("GET", "POST").Handler(handlerMiddleware.Wrap(internalRouter))
}
//If no frontend or scheduler address has been configured, then there is no place for the
//querier worker to request work from, so no need to start a worker service
if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
return nil, nil
}
// If a frontend or scheduler address has been configured, return a querier worker service that uses
// the external Loki Server HTTP server, which has now has the internal handler's routes registered with it
return querier_worker.NewQuerierWorker(
*(cfg.QuerierWorkerConfig),
cfg.SchedulerRing,
httpgrpc_server.NewServer(externalHandler),
util_log.Logger,
reg,
)
}
// Since we must be running a querier with either a frontend and/or scheduler at this point, if no scheduler ring, frontend, or scheduler address
// is configured, Loki will default to using the frontend on localhost on it's own GRPC listening port.
if cfg.SchedulerRing == nil && (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
address := fmt.Sprintf("127.0.0.1:%d", cfg.GrpcListenPort)
level.Warn(util_log.Logger).Log(
"msg", "Worker address is empty, attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.",
"address", address)
cfg.QuerierWorkerConfig.FrontendAddress = address
}
// Add a middleware to extract the trace context and add a header.
var internalHandler http.Handler
internalHandler = nethttp.MiddlewareFunc(
opentracing.GlobalTracer(),
internalRouter.ServeHTTP,
nethttp.OperationNameFunc(func(r *http.Request) string {
return "internalQuerier"
}))
internalHandler = handlerMiddleware.Wrap(internalHandler)
//Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier
//and the query frontend
return querier_worker.NewQuerierWorker(
*(cfg.QuerierWorkerConfig),
cfg.SchedulerRing,
httpgrpc_server.NewServer(internalHandler),
util_log.Logger,
reg,
)
}
func querierRunningStandalone(cfg WorkerServiceConfig) bool {
runningStandalone := !cfg.QueryFrontendEnabled && !cfg.QuerySchedulerEnabled && !cfg.ReadEnabled && !cfg.AllEnabled
level.Debug(util_log.Logger).Log(
"msg", "determining if querier is running as standalone target",
"runningStandalone", runningStandalone,
"queryFrontendEnabled", cfg.QueryFrontendEnabled,
"queryScheduleEnabled", cfg.QuerySchedulerEnabled,
"readEnabled", cfg.ReadEnabled,
"allEnabled", cfg.AllEnabled,
)
return runningStandalone
}