From 0bbd2a25e82d17dc16a997731cf857755841af8b Mon Sep 17 00:00:00 2001 From: Konrad Galuszka Date: Fri, 22 Feb 2019 08:48:00 -0600 Subject: [PATCH] Add pprof and create admin endpoint (#1315) Signed-off-by: Konrad Galuszka --- cmd/all-in-one/main.go | 7 +- cmd/collector/main.go | 7 +- cmd/flags/flags.go | 28 ++-- cmd/ingester/main.go | 7 +- cmd/query/main.go | 7 +- pkg/adminendpoint/adminendpoint.go | 121 ++++++++++++++++++ pkg/adminendpoint/adminendpoint_test.go | 48 +++++++ .../healthcheck/healthcheck.go} | 55 ++------ .../healthcheck/healthcheck_test.go} | 22 +--- .../internal_test.go | 34 +++-- 10 files changed, 237 insertions(+), 99 deletions(-) create mode 100644 pkg/adminendpoint/adminendpoint.go create mode 100644 pkg/adminendpoint/adminendpoint_test.go rename pkg/{healthcheck/handler.go => adminendpoint/healthcheck/healthcheck.go} (69%) rename pkg/{healthcheck/handler_test.go => adminendpoint/healthcheck/healthcheck_test.go} (70%) rename pkg/{healthcheck => adminendpoint}/internal_test.go (61%) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index a0c0e87ff95..0c15fa2a842 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -53,8 +53,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/flags" queryApp "github.com/jaegertracing/jaeger/cmd/query/app" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" + "github.com/jaegertracing/jaeger/pkg/adminendpoint/healthcheck" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/healthcheck" pMetrics "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" "github.com/jaegertracing/jaeger/pkg/version" @@ -102,9 +102,10 @@ func main() { if err != nil { return err } - hc, err := sFlags.NewHealthCheck(logger) + hc := sFlags.NewHealthCheck(logger) + err = sFlags.NewAdminEndpoint(logger, hc) if err != nil { - logger.Fatal("Could not start the health check server.", zap.Error(err)) + logger.Fatal("Could not start the admin endpoint server.", zap.Error(err)) } mBldr := new(pMetrics.Builder).InitFromViper(v) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index d1797af1564..9181eed4f0b 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -43,8 +43,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/zipkin" "github.com/jaegertracing/jaeger/cmd/env" "github.com/jaegertracing/jaeger/cmd/flags" + "github.com/jaegertracing/jaeger/pkg/adminendpoint/healthcheck" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/healthcheck" pMetrics "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" "github.com/jaegertracing/jaeger/pkg/version" @@ -85,9 +85,10 @@ func main() { if err != nil { return err } - hc, err := sFlags.NewHealthCheck(logger) + hc := sFlags.NewHealthCheck(logger) + err = sFlags.NewAdminEndpoint(logger, hc) if err != nil { - logger.Fatal("Could not start the health check server.", zap.Error(err)) + logger.Fatal("Could not start the admin endpoint server.", zap.Error(err)) } builderOpts := new(builder.CollectorOptions).InitFromViper(v) diff --git a/cmd/flags/flags.go b/cmd/flags/flags.go index a50d35eb4d5..88892ae2311 100644 --- a/cmd/flags/flags.go +++ b/cmd/flags/flags.go @@ -23,7 +23,8 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" - hc "github.com/jaegertracing/jaeger/pkg/healthcheck" + ae "github.com/jaegertracing/jaeger/pkg/adminendpoint" + hc "github.com/jaegertracing/jaeger/pkg/adminendpoint/healthcheck" "github.com/jaegertracing/jaeger/plugin/storage" ) @@ -57,15 +58,15 @@ func TryLoadConfigFile(v *viper.Viper) error { type SharedFlags struct { // Logging holds logging configuration Logging logging - // HealthCheck holds health check configuration - HealthCheck healthCheck + // AdminEndpoint holds admin endpoint configuration + AdminEndpoint adminendpoint } type logging struct { Level string } -type healthCheck struct { +type adminendpoint struct { Port int } @@ -89,7 +90,7 @@ func AddLoggingFlag(flagSet *flag.FlagSet) { // InitFromViper initializes SharedFlags with properties from viper func (flags *SharedFlags) InitFromViper(v *viper.Viper) *SharedFlags { flags.Logging.Level = v.GetString(logLevel) - flags.HealthCheck.Port = v.GetInt(healthCheckHTTPPort) + flags.AdminEndpoint.Port = v.GetInt(healthCheckHTTPPort) return flags } @@ -104,11 +105,16 @@ func (flags *SharedFlags) NewLogger(conf zap.Config, options ...zap.Option) (*za return conf.Build(options...) } -// NewHealthCheck returns health check based on configuration in SharedFlags -func (flags *SharedFlags) NewHealthCheck(logger *zap.Logger) (*hc.HealthCheck, error) { - if flags.HealthCheck.Port == 0 { - return nil, errors.New("port not specified") +// NewHealthCheck returns health check +func (flags *SharedFlags) NewHealthCheck(logger *zap.Logger) *hc.HealthCheck { + return hc.New(hc.Unavailable, hc.Logger(logger)) +} + +// NewAdminEndpoint returns admin endpoint based on configuration in SharedFlags +func (flags *SharedFlags) NewAdminEndpoint(logger *zap.Logger, healthcheck *hc.HealthCheck) error { + if flags.AdminEndpoint.Port == 0 { + return errors.New("port not specified") } - return hc.New(hc.Unavailable, hc.Logger(logger)). - Serve(flags.HealthCheck.Port) + return ae.New(ae.Logger(logger), ae.HealthCheck(healthcheck)). + Serve(flags.AdminEndpoint.Port) } diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index 5e8ae49114c..345a5e97a63 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -34,8 +34,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/cmd/ingester/app" "github.com/jaegertracing/jaeger/cmd/ingester/app/builder" + "github.com/jaegertracing/jaeger/pkg/adminendpoint/healthcheck" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/healthcheck" pMetrics "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" "github.com/jaegertracing/jaeger/pkg/version" @@ -67,9 +67,10 @@ func main() { if err != nil { return err } - hc, err := sFlags.NewHealthCheck(logger) + hc := sFlags.NewHealthCheck(logger) + err = sFlags.NewAdminEndpoint(logger, hc) if err != nil { - logger.Fatal("Could not start the health check server.", zap.Error(err)) + logger.Fatal("Could not start the admin endpoint server.", zap.Error(err)) } mBldr := new(pMetrics.Builder).InitFromViper(v) diff --git a/cmd/query/main.go b/cmd/query/main.go index ead0e81e56c..1e02eff5eaf 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -36,8 +36,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/cmd/query/app" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" + "github.com/jaegertracing/jaeger/pkg/adminendpoint/healthcheck" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/healthcheck" pMetrics "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" "github.com/jaegertracing/jaeger/pkg/version" @@ -72,9 +72,10 @@ func main() { if err != nil { return err } - hc, err := sFlags.NewHealthCheck(logger) + hc := sFlags.NewHealthCheck(logger) + err = sFlags.NewAdminEndpoint(logger, hc) if err != nil { - logger.Fatal("Could not start the health check server.", zap.Error(err)) + logger.Fatal("Could not start the admin endpoint server.", zap.Error(err)) } queryOpts := new(app.QueryOptions).InitFromViper(v) diff --git a/pkg/adminendpoint/adminendpoint.go b/pkg/adminendpoint/adminendpoint.go new file mode 100644 index 00000000000..50b188b5627 --- /dev/null +++ b/pkg/adminendpoint/adminendpoint.go @@ -0,0 +1,121 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adminendpoint + +import ( + "context" + "net" + "net/http" + "net/http/pprof" + "strconv" + + "go.uber.org/zap" + + hc "github.com/jaegertracing/jaeger/pkg/adminendpoint/healthcheck" + "github.com/jaegertracing/jaeger/pkg/version" +) + +// AdminEndpoint provides an endpoint that returns the healthcheck or profiling tool +type AdminEndpoint struct { + logger *zap.Logger + server *http.Server + healthcheck *hc.HealthCheck +} + +// Option is a functional option for passing parameters to New() +type Option func(endpoint *AdminEndpoint) + +// Logger creates an option to set the logger. If not specified, Nop logger is used. +func Logger(logger *zap.Logger) Option { + return func(ae *AdminEndpoint) { + ae.logger = logger + } +} + +// HealthCheck creates an option to set the healthcheck. Required +func HealthCheck(healthcheck *hc.HealthCheck) Option { + return func(ae *AdminEndpoint) { + ae.healthcheck = healthcheck + } +} + +// New creates a AdminEndpoint +func New(options ...Option) *AdminEndpoint { + ae := &AdminEndpoint{} + for _, option := range options { + option(ae) + } + if ae.logger == nil { + ae.logger = zap.NewNop() + } + return ae +} + +// Serve starts HTTP server on the specified port. +func (ae *AdminEndpoint) Serve(port int) error { + portStr := ":" + strconv.Itoa(port) + l, err := net.Listen("tcp", portStr) + if err != nil { + ae.logger.Error("Admin endpoint server failed to listen", zap.Error(err)) + return err + } + ae.serveWithListener(l) + ae.logger.Info("Admin endpoint server started", zap.Int("http-port", port), zap.Stringer("status", ae.healthcheck.Get())) + return nil +} + +// ServeWithListener starts server using given listener +func (ae *AdminEndpoint) serveWithListener(l net.Listener) { + ae.server = &http.Server{Handler: ae.httpHandler()} + go func() { + if err := ae.server.Serve(l); err != nil { + ae.logger.Error("failed to serve", zap.Error(err)) + ae.healthcheck.Set(hc.Broken) + } + }() +} + +// Close stops the HTTP server +func (ae *AdminEndpoint) Close() error { + return ae.server.Shutdown(context.Background()) +} + +// httpHandler creates a new HTTP handler. +func (ae *AdminEndpoint) httpHandler() http.Handler { + mux := http.NewServeMux() + ae.registerHealthCheckHandler(mux) + ae.registerProfilingHandler(mux) + version.RegisterHandler(mux, ae.logger) + return mux +} + +// registerHealthCheckHandler registers +func (ae *AdminEndpoint) registerHealthCheckHandler(mux *http.ServeMux) { + mux.HandleFunc("/", ae.healthcheck.GetHandlerFunc()) +} + +// registerProfilingHandler adds pprof endpoints +func (ae *AdminEndpoint) registerProfilingHandler(mux *http.ServeMux) { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + mux.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) + mux.Handle("/debug/pprof/heap", pprof.Handler("heap")) + mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) + mux.Handle("/debug/pprof/block", pprof.Handler("block")) +} diff --git a/pkg/adminendpoint/adminendpoint_test.go b/pkg/adminendpoint/adminendpoint_test.go new file mode 100644 index 00000000000..56526e3a2ad --- /dev/null +++ b/pkg/adminendpoint/adminendpoint_test.go @@ -0,0 +1,48 @@ +// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adminendpoint_test + +import ( + "net" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/pkg/adminendpoint" + "github.com/jaegertracing/jaeger/pkg/adminendpoint/healthcheck" + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestPortBusy(t *testing.T) { + l, err := net.Listen("tcp", ":0") + require.NoError(t, err) + defer l.Close() + port := l.Addr().(*net.TCPAddr).Port + + logger, logBuf := testutils.NewLogger() + err = adminendpoint.New(adminendpoint.Logger(logger)).Serve(port) + assert.Error(t, err) + assert.Equal(t, "Admin endpoint server failed to listen", logBuf.JSONLine(0)["msg"]) +} + +func TestServeHandler(t *testing.T) { + hc := healthcheck.New(healthcheck.Ready) + ae := adminendpoint.New(adminendpoint.HealthCheck(hc)) + err := ae.Serve(0) + require.NoError(t, err) + defer ae.Close() +} diff --git a/pkg/healthcheck/handler.go b/pkg/adminendpoint/healthcheck/healthcheck.go similarity index 69% rename from pkg/healthcheck/handler.go rename to pkg/adminendpoint/healthcheck/healthcheck.go index ac9eec2e977..9ff29363298 100644 --- a/pkg/healthcheck/handler.go +++ b/pkg/adminendpoint/healthcheck/healthcheck.go @@ -15,15 +15,10 @@ package healthcheck import ( - "context" - "net" "net/http" - "strconv" "sync/atomic" "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/pkg/version" ) // Status represents the state of the service. @@ -56,7 +51,6 @@ type HealthCheck struct { state int32 // atomic, keep at the top to be word-aligned logger *zap.Logger mapping map[Status]int - server *http.Server } // Option is a functional option for passing parameters to New() @@ -87,46 +81,6 @@ func New(state Status, options ...Option) *HealthCheck { return hc } -// Serve starts HTTP server on the specified port. -func (hc *HealthCheck) Serve(port int) (*HealthCheck, error) { - portStr := ":" + strconv.Itoa(port) - l, err := net.Listen("tcp", portStr) - if err != nil { - hc.logger.Error("Health Check server failed to listen", zap.Error(err)) - return nil, err - } - hc.serveWithListener(l) - hc.logger.Info("Health Check server started", zap.Int("http-port", port), zap.Stringer("status", hc.Get())) - return hc, nil -} - -func (hc *HealthCheck) serveWithListener(l net.Listener) { - hc.server = &http.Server{Handler: hc.httpHandler()} - go func() { - if err := hc.server.Serve(l); err != nil { - hc.logger.Error("failed to serve", zap.Error(err)) - hc.Set(Broken) - } - }() -} - -// Close stops the HTTP server -func (hc *HealthCheck) Close() error { - return hc.server.Shutdown(context.Background()) -} - -// httpHandler creates a new HTTP handler. -func (hc *HealthCheck) httpHandler() http.Handler { - mux := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(hc.mapping[hc.Get()]) - // this is written only for response with an entity, so, it won't be used for a 204 - No content - w.Write([]byte("Server not available")) - }) - version.RegisterHandler(mux, hc.logger) - return mux -} - // Set a new health check status func (hc *HealthCheck) Set(state Status) { atomic.StoreInt32(&hc.state, int32(state)) @@ -142,3 +96,12 @@ func (hc *HealthCheck) Get() Status { func (hc *HealthCheck) Ready() { hc.Set(Ready) } + +// GetHandlerFunc returns endpoint handler +func (hc *HealthCheck) GetHandlerFunc() func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(hc.mapping[hc.Get()]) + // this is written only for response with an entity, so, it won't be used for a 204 - No content + w.Write([]byte("Server not available")) + } +} diff --git a/pkg/healthcheck/handler_test.go b/pkg/adminendpoint/healthcheck/healthcheck_test.go similarity index 70% rename from pkg/healthcheck/handler_test.go rename to pkg/adminendpoint/healthcheck/healthcheck_test.go index 31483e89d9b..69cb9412243 100644 --- a/pkg/healthcheck/handler_test.go +++ b/pkg/adminendpoint/healthcheck/healthcheck_test.go @@ -15,13 +15,11 @@ package healthcheck_test import ( - "net" "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - . "github.com/jaegertracing/jaeger/pkg/healthcheck" + . "github.com/jaegertracing/jaeger/pkg/adminendpoint/healthcheck" "github.com/jaegertracing/jaeger/pkg/testutils" ) @@ -49,21 +47,3 @@ func TestStatusSetGet(t *testing.T) { assert.Equal(t, Ready, hc.Get()) assert.Equal(t, map[string]string{"level": "info", "msg": "Health Check state change", "status": "ready"}, logBuf.JSONLine(0)) } - -func TestPortBusy(t *testing.T) { - l, err := net.Listen("tcp", ":0") - require.NoError(t, err) - defer l.Close() - port := l.Addr().(*net.TCPAddr).Port - - logger, logBuf := testutils.NewLogger() - _, err = New(Unavailable, Logger(logger)).Serve(port) - assert.Error(t, err) - assert.Equal(t, "Health Check server failed to listen", logBuf.JSONLine(0)["msg"]) -} - -func TestServeHandler(t *testing.T) { - hc, err := New(Ready).Serve(0) - require.NoError(t, err) - defer hc.Close() -} diff --git a/pkg/healthcheck/internal_test.go b/pkg/adminendpoint/internal_test.go similarity index 61% rename from pkg/healthcheck/internal_test.go rename to pkg/adminendpoint/internal_test.go index 41a22a34819..8da8efb486a 100644 --- a/pkg/healthcheck/internal_test.go +++ b/pkg/adminendpoint/internal_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package healthcheck +package adminendpoint import ( "net" @@ -24,39 +24,55 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/jaegertracing/jaeger/pkg/adminendpoint/healthcheck" "github.com/jaegertracing/jaeger/pkg/testutils" ) -func TestHttpCall(t *testing.T) { - hc := New(Unavailable) - handler := hc.httpHandler() +func TestHealthCheckHttpCall(t *testing.T) { + hc := healthcheck.New(healthcheck.Unavailable) + ae := New(HealthCheck(hc)) + handler := ae.httpHandler() server := httptest.NewServer(handler) defer server.Close() - hc.Set(Ready) + hc.Set(healthcheck.Ready) resp, err := http.Get(server.URL + "/") require.NoError(t, err) assert.Equal(t, http.StatusNoContent, resp.StatusCode) } +func TestPprofHttpCall(t *testing.T) { + hc := healthcheck.New(healthcheck.Ready) + ae := New(HealthCheck(hc)) + handler := ae.httpHandler() + + server := httptest.NewServer(handler) + defer server.Close() + + resp, err := http.Get(server.URL + "/debug/pprof/") + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + func TestListenerClose(t *testing.T) { logger, logBuf := testutils.NewLogger() - hc := New(Unavailable, Logger(logger)) + hc := healthcheck.New(healthcheck.Unavailable, healthcheck.Logger(logger)) + ae := New(Logger(logger), HealthCheck(hc)) l, err := net.Listen("tcp", ":0") assert.NoError(t, err) l.Close() - hc.serveWithListener(l) + ae.serveWithListener(l) for i := 0; i < 1000; i++ { - if hc.Get() == Broken { + if hc.Get() == healthcheck.Broken { break } time.Sleep(time.Millisecond) } - assert.Equal(t, Broken, hc.Get()) + assert.Equal(t, healthcheck.Broken, hc.Get()) log := logBuf.JSONLine(0) assert.Equal(t, "failed to serve", log["msg"]) }