Skip to content

Commit a795d53

Browse files
authored
feat: Added support for logging of HTTP Headers (#4803)
* feat: Added support for logging of HTTP Headers Signed-off-by: Zach Speaks <zspeaks@amazon.com> * Fixed test failing due to namespace overlap Signed-off-by: Zach Speaks <zspeaks@amazon.com> * Updates to reflect PR feedback Signed-off-by: Zach Speaks <zspeaks@amazon.com> * Rough draft propogation to ingester + logging in ingester Signed-off-by: Zach Speaks <zspeaks@amazon.com> * Added Header Propogation to Ingester and Querier Added Header Propagation to Ingester and Querier Signed-off-by: Zach Speaks <zspeaks@amazon.com> Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Addressed whitespace and linting issues Signed-off-by: Zach Speaks <zspeaks@amazon.com> Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Additional linting fixes Signed-off-by: Zach Speaks <zspeaks@amazon.com> Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Fixed failing DCO check Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Removed outdated line from prior test design Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Updates to reflect PR feedback Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Additional Updates to reflect feedback Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Improved string comparison when decoding Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Added header decoding to frontend_processor.go for when scheduler service is not being used Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Added encoding/decoding interceptors for streams and updated changelog Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Undo accidental updates to single-process-config-block.yaml file Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Whitespace updates Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Refactored encode/decode headers to inject/extract to better align with existing code Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Fixed failing linter Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Switched extraction to being done by a middleware on the querier API Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Linting Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Changes to reflect PR feedback Signed-off-by: Zachary Speaks <zspeaks@amazon.com> * Updates to tests to match go conventions Signed-off-by: Zachary Speaks <zspeaks@amazon.com> Signed-off-by: Zach Speaks <zspeaks@amazon.com> Signed-off-by: Zachary Speaks <zspeaks@amazon.com>
1 parent 87fcf87 commit a795d53

File tree

17 files changed

+518
-37
lines changed

17 files changed

+518
-37
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818
5050
* [FEATURE] Ruler: Add support to pass custom implementations of queryable and pusher. #4782
5151
* [FEATURE] Create OpenTelemetry Bridge for Tracing. Now cortex can send traces to multiple destinations using OTEL Collectors. #4834
52+
* [FEATURE] Added `-api.http-request-headers-to-log` allowing for the addition of HTTP Headers to logs #4803
5253
* [BUGFIX] Memberlist: Add join with no retrying when starting service. #4804
5354
* [BUGFIX] Ruler: Fix /ruler/rule_groups returns YAML with extra fields. #4767
5455

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ api:
8282
# CLI flag: -http.prometheus-http-prefix
8383
[prometheus_http_prefix: <string> | default = "/prometheus"]
8484

85+
# Which HTTP Request headers to add to logs
86+
# CLI flag: -api.http-request-headers-to-log
87+
[http_request_headers_to_log: <list of string> | default = []]
88+
8589
# The server_config configures the HTTP and gRPC server of the launched
8690
# service(s).
8791
[server: <server_config>]

pkg/api/api.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
3535
"github.com/cortexproject/cortex/pkg/storegateway"
3636
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
37+
"github.com/cortexproject/cortex/pkg/util/flagext"
3738
"github.com/cortexproject/cortex/pkg/util/push"
3839
)
3940

@@ -61,11 +62,15 @@ type Config struct {
6162
// initialized, the custom config handler will be used instead of
6263
// DefaultConfigHandler.
6364
CustomConfigHandler ConfigHandler `yaml:"-"`
65+
66+
// Allows and is used to configure the addition of HTTP Header fields to logs
67+
HTTPRequestHeadersToLog flagext.StringSlice `yaml:"http_request_headers_to_log"`
6468
}
6569

6670
// RegisterFlags adds the flags required to config this to the given FlagSet.
6771
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
6872
f.BoolVar(&cfg.ResponseCompression, "api.response-compression-enabled", false, "Use GZIP compression for API responses. Some endpoints serve large YAML or JSON blobs which can benefit from compression.")
73+
f.Var(&cfg.HTTPRequestHeadersToLog, "api.http-request-headers-to-log", "Which HTTP Request headers to add to logs")
6974
cfg.RegisterFlagsWithPrefix("", f)
7075
}
7176

@@ -85,13 +90,13 @@ func (cfg *Config) wrapDistributorPush(d *distributor.Distributor) push.Func {
8590
}
8691

8792
type API struct {
88-
AuthMiddleware middleware.Interface
89-
90-
cfg Config
91-
server *server.Server
92-
logger log.Logger
93-
sourceIPs *middleware.SourceIPExtractor
94-
indexPage *IndexPageContent
93+
AuthMiddleware middleware.Interface
94+
cfg Config
95+
server *server.Server
96+
logger log.Logger
97+
sourceIPs *middleware.SourceIPExtractor
98+
indexPage *IndexPageContent
99+
HTTPHeaderMiddleware *HTTPHeaderMiddleware
95100
}
96101

97102
func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logger) (*API, error) {
@@ -121,6 +126,9 @@ func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logge
121126
if cfg.HTTPAuthMiddleware == nil {
122127
api.AuthMiddleware = middleware.AuthenticateUser
123128
}
129+
if len(cfg.HTTPRequestHeadersToLog) > 0 {
130+
api.HTTPHeaderMiddleware = &HTTPHeaderMiddleware{TargetHeaders: cfg.HTTPRequestHeadersToLog}
131+
}
124132

125133
return api, nil
126134
}
@@ -139,6 +147,9 @@ func (a *API) RegisterRoute(path string, handler http.Handler, auth bool, method
139147
if a.cfg.ResponseCompression {
140148
handler = gziphandler.GzipHandler(handler)
141149
}
150+
if a.HTTPHeaderMiddleware != nil {
151+
handler = a.HTTPHeaderMiddleware.Wrap(handler)
152+
}
142153

143154
if len(methods) == 0 {
144155
a.server.HTTP.Path(path).Handler(handler)
@@ -156,6 +167,9 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth
156167
if a.cfg.ResponseCompression {
157168
handler = gziphandler.GzipHandler(handler)
158169
}
170+
if a.HTTPHeaderMiddleware != nil {
171+
handler = a.HTTPHeaderMiddleware.Wrap(handler)
172+
}
159173

160174
if len(methods) == 0 {
161175
a.server.HTTP.PathPrefix(prefix).Handler(handler)

pkg/api/api_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,37 @@ func TestNewApiWithInvalidSourceIPExtractor(t *testing.T) {
6060
require.Error(t, err)
6161
require.Nil(t, api)
6262
}
63+
64+
func TestNewApiWithHeaderLogging(t *testing.T) {
65+
cfg := Config{
66+
HTTPRequestHeadersToLog: []string{"ForTesting"},
67+
}
68+
serverCfg := server.Config{
69+
HTTPListenNetwork: server.DefaultNetwork,
70+
MetricsNamespace: "with_header_logging",
71+
}
72+
server, err := server.New(serverCfg)
73+
require.NoError(t, err)
74+
75+
api, err := New(cfg, serverCfg, server, &FakeLogger{})
76+
require.NoError(t, err)
77+
require.NotNil(t, api.HTTPHeaderMiddleware)
78+
79+
}
80+
81+
func TestNewApiWithoutHeaderLogging(t *testing.T) {
82+
cfg := Config{
83+
HTTPRequestHeadersToLog: []string{},
84+
}
85+
serverCfg := server.Config{
86+
HTTPListenNetwork: server.DefaultNetwork,
87+
MetricsNamespace: "without_header_logging",
88+
}
89+
server, err := server.New(serverCfg)
90+
require.NoError(t, err)
91+
92+
api, err := New(cfg, serverCfg, server, &FakeLogger{})
93+
require.NoError(t, err)
94+
require.Nil(t, api.HTTPHeaderMiddleware)
95+
96+
}

pkg/api/middlewares.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package api
22

33
import (
4+
"context"
45
"net/http"
56

67
"github.com/weaveworks/common/middleware"
78

89
"github.com/cortexproject/cortex/pkg/chunk/purger"
910
"github.com/cortexproject/cortex/pkg/querier/queryrange"
1011
"github.com/cortexproject/cortex/pkg/tenant"
12+
util_log "github.com/cortexproject/cortex/pkg/util/log"
1113
)
1214

1315
// middleware for setting cache gen header to let consumer of response know all previous responses could be invalid due to delete operation
@@ -27,3 +29,35 @@ func getHTTPCacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader purger.To
2729
})
2830
})
2931
}
32+
33+
// HTTPHeaderMiddleware adds specified HTTPHeaders to the request context
34+
type HTTPHeaderMiddleware struct {
35+
TargetHeaders []string
36+
}
37+
38+
// InjectTargetHeadersIntoHTTPRequest injects specified HTTPHeaders into the request context
39+
func (h HTTPHeaderMiddleware) InjectTargetHeadersIntoHTTPRequest(r *http.Request) context.Context {
40+
headerMap := make(map[string]string)
41+
42+
// Check to make sure that Headers have not already been injected
43+
checkMapInContext := util_log.HeaderMapFromContext(r.Context())
44+
if checkMapInContext != nil {
45+
return r.Context()
46+
}
47+
48+
for _, target := range h.TargetHeaders {
49+
contents := r.Header.Get(target)
50+
if contents != "" {
51+
headerMap[target] = contents
52+
}
53+
}
54+
return util_log.ContextWithHeaderMap(r.Context(), headerMap)
55+
}
56+
57+
// Wrap implements Middleware
58+
func (h HTTPHeaderMiddleware) Wrap(next http.Handler) http.Handler {
59+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
60+
ctx := h.InjectTargetHeadersIntoHTTPRequest(r)
61+
next.ServeHTTP(w, r.WithContext(ctx))
62+
})
63+
}

pkg/api/middlewares_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package api
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
util_log "github.com/cortexproject/cortex/pkg/util/log"
11+
)
12+
13+
var HTTPTestMiddleware = HTTPHeaderMiddleware{TargetHeaders: []string{"TestHeader1", "TestHeader2", "Test3"}}
14+
15+
func TestHeaderInjection(t *testing.T) {
16+
ctx := context.Background()
17+
h := http.Header{}
18+
contentsMap := make(map[string]string)
19+
contentsMap["TestHeader1"] = "RequestID"
20+
contentsMap["TestHeader2"] = "ContentsOfTestHeader2"
21+
contentsMap["Test3"] = "SomeInformation"
22+
23+
h.Add("TestHeader1", contentsMap["TestHeader1"])
24+
h.Add("TestHeader2", contentsMap["TestHeader2"])
25+
h.Add("Test3", contentsMap["Test3"])
26+
27+
req := &http.Request{
28+
Method: "GET",
29+
RequestURI: "/HTTPHeaderTest",
30+
Body: http.NoBody,
31+
Header: h,
32+
}
33+
34+
req = req.WithContext(ctx)
35+
ctx = HTTPTestMiddleware.InjectTargetHeadersIntoHTTPRequest(req)
36+
37+
headerMap := util_log.HeaderMapFromContext(ctx)
38+
require.NotNil(t, headerMap)
39+
40+
for _, header := range HTTPTestMiddleware.TargetHeaders {
41+
require.Equal(t, contentsMap[header], headerMap[header])
42+
}
43+
for header, contents := range contentsMap {
44+
require.Equal(t, contents, headerMap[header])
45+
}
46+
}
47+
48+
func TestExistingHeaderInContextIsNotOverridden(t *testing.T) {
49+
ctx := context.Background()
50+
51+
h := http.Header{}
52+
contentsMap := make(map[string]string)
53+
contentsMap["TestHeader1"] = "RequestID"
54+
contentsMap["TestHeader2"] = "ContentsOfTestHeader2"
55+
contentsMap["Test3"] = "SomeInformation"
56+
57+
h.Add("TestHeader1", "Fail1")
58+
h.Add("TestHeader2", "Fail2")
59+
h.Add("Test3", "Fail3")
60+
61+
ctx = util_log.ContextWithHeaderMap(ctx, contentsMap)
62+
req := &http.Request{
63+
Method: "GET",
64+
RequestURI: "/HTTPHeaderTest",
65+
Body: http.NoBody,
66+
Header: h,
67+
}
68+
69+
req = req.WithContext(ctx)
70+
ctx = HTTPTestMiddleware.InjectTargetHeadersIntoHTTPRequest(req)
71+
72+
require.Equal(t, contentsMap, util_log.HeaderMapFromContext(ctx))
73+
74+
}

pkg/cortex/cortex.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ func New(cfg Config) (*Cortex, error) {
353353
}
354354

355355
cortex.setupThanosTracing()
356+
cortex.setupGRPCHeaderForwarding()
356357

357358
if err := cortex.setupModuleManager(); err != nil {
358359
return nil, err
@@ -368,6 +369,15 @@ func (t *Cortex) setupThanosTracing() {
368369
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, ThanosTracerStreamInterceptor)
369370
}
370371

372+
// setupGRPCHeaderForwarding appends a gRPC middleware used to enable the propagation of
373+
// HTTP Headers through child gRPC calls
374+
func (t *Cortex) setupGRPCHeaderForwarding() {
375+
if len(t.Cfg.API.HTTPRequestHeadersToLog) > 0 {
376+
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcutil.HTTPHeaderPropagationServerInterceptor)
377+
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, grpcutil.HTTPHeaderPropagationStreamServerInterceptor)
378+
}
379+
}
380+
371381
// Run starts Cortex running, and blocks until a Cortex stops.
372382
func (t *Cortex) Run() error {
373383
// Register custom process metrics.

pkg/cortex/modules.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,10 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
328328
// HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the
329329
// request context.
330330
internalQuerierRouter = t.API.AuthMiddleware.Wrap(internalQuerierRouter)
331+
332+
if len(t.Cfg.API.HTTPRequestHeadersToLog) > 0 {
333+
internalQuerierRouter = t.API.HTTPHeaderMiddleware.Wrap(internalQuerierRouter)
334+
}
331335
}
332336

333337
// If neither frontend address or scheduler address is configured, no worker is needed.

pkg/distributor/distributor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
786786
if sp := opentracing.SpanFromContext(ctx); sp != nil {
787787
localCtx = opentracing.ContextWithSpan(localCtx, sp)
788788
}
789+
// Get any HTTP headers that are supposed to be added to logs and add to localCtx for later use
790+
if headerMap := util_log.HeaderMapFromContext(ctx); headerMap != nil {
791+
localCtx = util_log.ContextWithHeaderMap(localCtx, headerMap)
792+
}
789793

790794
// Get clientIP(s) from Context and add it to localCtx
791795
localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)

0 commit comments

Comments
 (0)