Skip to content

Commit

Permalink
feat: log local TCP address
Browse files Browse the repository at this point in the history
When troubleshooting issues related to keepalive it is often useful to
be able to correlate multiple requests made over the same connection. A
first step was to introduce the information on whether the request was
made on a re-used connection or not which was done with a7dbf84. This
commit adds the local address of the TCP connection which adds the
capability to distinguish between multiple, parallel keep-alive
connections by looking at the client port of gorouter for the backend
connection.
  • Loading branch information
maxmoehl authored and ameowlia committed Jan 31, 2025
1 parent b0d88bb commit a34e690
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 10 deletions.
14 changes: 14 additions & 0 deletions accesslog/schema/access_log_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type AccessLogRecord struct {
LogAttemptsDetails bool
FailedAttempts int
RoundTripSuccessful bool
ExtraFields []string
record []byte

// See the handlers.RequestInfo struct for details on these timings.
Expand All @@ -124,6 +125,8 @@ type AccessLogRecord struct {
TlsHandshakeFinishedAt time.Time
AppRequestFinishedAt time.Time
FinishedAt time.Time

LocalAddress string
}

func (r *AccessLogRecord) formatStartedAt() string {
Expand Down Expand Up @@ -319,6 +322,17 @@ func (r *AccessLogRecord) makeRecord(performTruncate bool) []byte {
b.WriteDashOrFloatValue(r.successfulAttemptTime())
}

// We have to consider the impact of iterating over a list. This technically allows to repeat
// some of the fields but it allows us to iterate over the list only once instead of once per
// field when we perform a [slices.Contains] check.
for _, field := range r.ExtraFields {
switch field {
case "local_address":
b.WriteString(`local_address:`)
b.WriteDashOrStringValue(r.LocalAddress)
}
}

b.AppendSpaces(false)
// #nosec G104 - ignore errors from writing the access log as it will only cause more errors to log this error
b.WriteString(`x_cf_routererror:`)
Expand Down
37 changes: 37 additions & 0 deletions accesslog/schema/access_log_record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,43 @@ var _ = Describe("AccessLogRecord", func() {
Eventually(r).Should(Say(`x_cf_routererror:"-"`))
})
})

Context("when extra_fields is set to [local_address]", func() {
Context("and the local address is empty", func() {
It("makes a record with the local address set to -", func() {
record.ExtraFields = []string{"local_address"}

r := BufferReader(bytes.NewBufferString(record.LogMessage()))
Eventually(r).Should(Say(`local_address:"-"`))
})
})
Context("and the local address contains an address", func() {
It("makes a record with the local address set to that address", func() {
record.ExtraFields = []string{"local_address"}
record.LocalAddress = "10.0.0.1:34823"

r := BufferReader(bytes.NewBufferString(record.LogMessage()))
Eventually(r).Should(Say(`local_address:"10.0.0.1:34823"`))
})
})
})

Context("when extra_fields is set to [foobarbazz]", func() {
It("ignores it", func() {
record.ExtraFields = []string{"foobarbazz"}
record.LocalAddress = "10.0.0.1:34823"

r := BufferReader(bytes.NewBufferString(record.LogMessage()))
Consistently(r).ShouldNot(Say("foobarbazz"))
})
It("does not log local_address", func() {
record.ExtraFields = []string{"foobarbazz"}
record.LocalAddress = "10.0.0.1:34823"

r := BufferReader(bytes.NewBufferString(record.LogMessage()))
Consistently(r).ShouldNot(Say(`local_address:"10.0.0.1:34823"`))
})
})
})

Describe("WriteTo", func() {
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ type LoggingConfig struct {
RedactQueryParams string `yaml:"redact_query_params"`
EnableAttemptsDetails bool `yaml:"enable_attempts_details"`
Format FormatConfig `yaml:"format"`
ExtraAccessLogFields []string `yaml:"extra_access_log_fields"`

// This field is populated by the `Process` function.
JobName string `yaml:"-"`
Expand Down
23 changes: 23 additions & 0 deletions handlers/access_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"log/slog"
"net/http"
"slices"
"sync/atomic"
"time"

Expand All @@ -20,6 +21,7 @@ type accessLog struct {
accessLogger accesslog.AccessLogger
extraHeadersToLog []string
logAttemptsDetails bool
extraFields []string
logger *slog.Logger
}

Expand All @@ -29,12 +31,14 @@ func NewAccessLog(
accessLogger accesslog.AccessLogger,
extraHeadersToLog []string,
logAttemptsDetails bool,
extraFields []string,
logger *slog.Logger,
) negroni.Handler {
return &accessLog{
accessLogger: accessLogger,
extraHeadersToLog: extraHeadersToLog,
logAttemptsDetails: logAttemptsDetails,
extraFields: deduplicate(extraFields),
logger: logger,
}
}
Expand All @@ -46,6 +50,7 @@ func (a *accessLog) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http
Request: r,
ExtraHeadersToLog: a.extraHeadersToLog,
LogAttemptsDetails: a.logAttemptsDetails,
ExtraFields: a.extraFields,
}

requestBodyCounter := &countingReadCloser{delegate: r.Body}
Expand Down Expand Up @@ -82,6 +87,8 @@ func (a *accessLog) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http
alr.AppRequestFinishedAt = reqInfo.AppRequestFinishedAt
alr.FinishedAt = reqInfo.FinishedAt

alr.LocalAddress = reqInfo.LocalAddress

a.accessLogger.Log(*alr)
}

Expand All @@ -105,3 +112,19 @@ func (crc *countingReadCloser) GetCount() int {
func (crc *countingReadCloser) Close() error {
return crc.delegate.Close()
}

func deduplicate[S ~[]E, E comparable](s S) S {
// costs some memory and requires an allocation but reduces complexity from O(n^2)
// to O(n) where n = len(s)
m := make(map[E]struct{}, len(s))
return slices.DeleteFunc(s, func(s E) bool {
_, ok := m[s]
if ok {
return true
}

m[s] = struct{}{}

return false
})
}
22 changes: 20 additions & 2 deletions handlers/access_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var _ = Describe("AccessLog", func() {
handler = negroni.New()
handler.Use(handlers.NewRequestInfo())
handler.Use(handlers.NewProxyWriter(logger.Logger))
handler.Use(handlers.NewAccessLog(accessLogger, extraHeadersToLog, false, logger.Logger))
handler.Use(handlers.NewAccessLog(accessLogger, extraHeadersToLog, false, nil, logger.Logger))
handler.Use(nextHandler)

reqChan = make(chan *http.Request, 1)
Expand Down Expand Up @@ -116,6 +116,24 @@ var _ = Describe("AccessLog", func() {
Expect(alr.RouterError).To(BeEmpty())
})

Context("when duplicate extraFields are set", func() {
BeforeEach(func() {
handler = negroni.New()
handler.Use(handlers.NewRequestInfo())
handler.Use(handlers.NewProxyWriter(logger.Logger))
handler.Use(handlers.NewAccessLog(accessLogger, extraHeadersToLog, false, []string{"local_address", "local_address"}, logger.Logger))
handler.Use(nextHandler)
})
It("only logs them once", func() {
handler.ServeHTTP(resp, req)
Expect(accessLogger.LogCallCount()).To(Equal(1))

alr := accessLogger.LogArgsForCall(0)

Expect(alr.ExtraFields).To(Equal([]string{"local_address"}))
})
})

Context("when there are backend request headers on the context", func() {
BeforeEach(func() {
extraHeadersHandler := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -146,7 +164,7 @@ var _ = Describe("AccessLog", func() {
BeforeEach(func() {
handler = negroni.New()
handler.UseFunc(testProxyWriterHandler)
handler.Use(handlers.NewAccessLog(accessLogger, extraHeadersToLog, false, logger.Logger))
handler.Use(handlers.NewAccessLog(accessLogger, extraHeadersToLog, false, nil, logger.Logger))
handler.Use(nextHandler)
})
It("calls Panic on the logger", func() {
Expand Down
2 changes: 2 additions & 0 deletions handlers/requestinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type RequestInfo struct {
ShouldRouteToInternalRouteService bool
FailedAttempts int

LocalAddress string

// RoundTripSuccessful will be set once a request has successfully reached a backend instance.
RoundTripSuccessful bool

Expand Down
2 changes: 1 addition & 1 deletion proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func NewProxy(
n.Use(handlers.NewHTTPLatencyPrometheus(p.promRegistry))
}
}
n.Use(handlers.NewAccessLog(accessLogger, headersToLog, cfg.Logging.EnableAttemptsDetails, logger))
n.Use(handlers.NewAccessLog(accessLogger, headersToLog, cfg.Logging.EnableAttemptsDetails, cfg.Logging.ExtraAccessLogFields, logger))
n.Use(handlers.NewQueryParam(logger))
n.Use(handlers.NewReporter(reporter, logger))
n.Use(handlers.NewHTTPRewriteHandler(cfg.HTTPRewrite, headersToAlwaysRemove))
Expand Down
3 changes: 3 additions & 0 deletions proxy/round_tripper/proxy_round_tripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (rt *roundTripper) RoundTrip(originalRequest *http.Request) (*http.Response
slog.Float64("dns-lookup-time", trace.DnsTime()),
slog.Float64("dial-time", trace.DialTime()),
slog.Float64("tls-handshake-time", trace.TlsTime()),
slog.String("local-address", trace.LocalAddr()),
)

if err != nil {
Expand Down Expand Up @@ -255,6 +256,7 @@ func (rt *roundTripper) RoundTrip(originalRequest *http.Request) (*http.Response
slog.Float64("dns-lookup-time", trace.DnsTime()),
slog.Float64("dial-time", trace.DialTime()),
slog.Float64("tls-handshake-time", trace.TlsTime()),
slog.String("local-address", trace.LocalAddr()),
)

if err != nil {
Expand Down Expand Up @@ -347,6 +349,7 @@ func (rt *roundTripper) RoundTrip(originalRequest *http.Request) (*http.Response
reqInfo.DialFinishedAt = trace.DialDone()
reqInfo.TlsHandshakeStartedAt = trace.TlsStart()
reqInfo.TlsHandshakeFinishedAt = trace.TlsDone()
reqInfo.LocalAddress = trace.LocalAddr()

if res != nil && endpoint.PrivateInstanceId != "" && !requestSentToRouteService(request) {
setupStickySession(
Expand Down
23 changes: 16 additions & 7 deletions proxy/round_tripper/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
// requestTracer holds trace data of a single request.
type requestTracer struct {
gotConn atomic.Bool
connInfo atomic.Pointer[httptrace.GotConnInfo]
connReused atomic.Bool
wroteHeaders atomic.Bool
localAddr atomic.Pointer[string]

// all times are stored as returned by time.Time{}.UnixNano()
dnsStart atomic.Int64
Expand All @@ -26,8 +27,9 @@ type requestTracer struct {
// Reset the trace data. Helpful when performing the same request again.
func (t *requestTracer) Reset() {
t.gotConn.Store(false)
t.connInfo.Store(nil)
t.connReused.Store(false)
t.wroteHeaders.Store(false)
t.localAddr.Store(nil)
t.dnsStart.Store(0)
t.dnsDone.Store(0)
t.dialStart.Store(0)
Expand All @@ -49,11 +51,15 @@ func (t *requestTracer) WroteHeaders() bool {
// ConnReused returns true if the traced request used an idle connection.
// it returns false if no idle connection was used or if the information was unavailable.
func (t *requestTracer) ConnReused() bool {
info := t.connInfo.Load()
if info != nil {
return info.Reused
return t.connReused.Load()
}

func (t *requestTracer) LocalAddr() string {
p := t.localAddr.Load()
if p == nil {
return ""
}
return false
return *p
}

func (t *requestTracer) DnsStart() time.Time {
Expand Down Expand Up @@ -121,7 +127,10 @@ func traceRequest(req *http.Request) (*http.Request, *requestTracer) {
r2 := req.WithContext(httptrace.WithClientTrace(req.Context(), &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) {
t.gotConn.Store(true)
t.connInfo.Store(&info)
t.connReused.Store(info.Reused)
la := info.Conn.LocalAddr().String()
t.localAddr.Store(&la)

// FIXME: due to https://github.com/golang/go/issues/31259 this breaks our acceptance tests and is dangerous
// disabled for now even though this will reduce the number of requests we can retry
// if !info.Reused {
Expand Down

0 comments on commit a34e690

Please sign in to comment.