Skip to content

Commit

Permalink
Add option to disable auth when running locally.
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
  • Loading branch information
tomwilkie committed Nov 22, 2018
1 parent 631b45d commit da18cb3
Show file tree
Hide file tree
Showing 6 changed files with 389 additions and 266 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ cmd/promtail/promtail
mixin/vendor/
pkg/logproto/logproto.pb.go
pkg/parser/labels.go
tempo
/tempo
287 changes: 22 additions & 265 deletions cmd/tempo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,261 +2,47 @@ package main

import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"

"github.com/go-kit/kit/log/level"
"github.com/opentracing-contrib/go-stdlib/nethttp"
opentracing "github.com/opentracing/opentracing-go"
"github.com/grafana/tempo/pkg/tempo"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"

"github.com/grafana/tempo/pkg/distributor"
"github.com/grafana/tempo/pkg/ingester"
"github.com/grafana/tempo/pkg/ingester/client"
"github.com/grafana/tempo/pkg/logproto"
"github.com/grafana/tempo/pkg/querier"
)

type config struct {
Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
}

func (c *config) RegisterFlags(f *flag.FlagSet) {
c.Server.MetricsNamespace = "tempo"
c.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
middleware.ServerUserHeaderInterceptor,
}
c.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
middleware.StreamServerUserHeaderInterceptor,
}

c.Server.RegisterFlags(f)
c.Distributor.RegisterFlags(f)
c.Querier.RegisterFlags(f)
c.IngesterClient.RegisterFlags(f)
c.Ingester.RegisterFlags(f)
}

type Tempo struct {
server *server.Server
ring *ring.Ring
distributor *distributor.Distributor
ingester *ingester.Ingester
querier *querier.Querier
}

type moduleName int

const (
Ring moduleName = iota
Server
Distributor
Ingester
Querier
All
)

func (m moduleName) String() string {
switch m {
case Ring:
return "ring"
case Server:
return "server"
case Distributor:
return "distributor"
case Ingester:
return "ingester"
case Querier:
return "querier"
case All:
return "all"
default:
panic(fmt.Sprintf("unknow module name: %d", m))
}
}

func (m *moduleName) Set(s string) error {
switch strings.ToLower(s) {
case "ring":
*m = Ring
return nil
case "server":
*m = Server
return nil
case "distributor":
*m = Distributor
return nil
case "ingester":
*m = Ingester
return nil
case "querier":
*m = Querier
return nil
case "all":
*m = All
return nil
default:
return fmt.Errorf("unrecognised module name: %s", s)
}
}

type module struct {
deps []moduleName
init func(t *Tempo, cfg *config) error
stop func(t *Tempo)
}

var modules = map[moduleName]module{
Server: module{
init: func(t *Tempo, cfg *config) (err error) {
t.server, err = server.New(cfg.Server)
return
},
},

Ring: module{
deps: []moduleName{Server},
init: func(t *Tempo, cfg *config) (err error) {
t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig)
if err != nil {
return
}
t.server.HTTP.Handle("/ring", t.ring)
return
},
},

Distributor: module{
deps: []moduleName{Ring, Server},
init: func(t *Tempo, cfg *config) (err error) {
t.distributor, err = distributor.New(cfg.Distributor, cfg.IngesterClient, t.ring)
if err != nil {
return
}

operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string {
return r.URL.RequestURI()
})
t.server.HTTP.Handle("/api/prom/push", middleware.Merge(
middleware.Func(func(handler http.Handler) http.Handler {
return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc)
}),
middleware.AuthenticateUser,
).Wrap(http.HandlerFunc(t.distributor.PushHandler)))

return
},
},

Ingester: module{
deps: []moduleName{Server},
init: func(t *Tempo, cfg *config) (err error) {
cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort
t.ingester, err = ingester.New(cfg.Ingester)
if err != nil {
return
}

logproto.RegisterPusherServer(t.server.GRPC, t.ingester)
logproto.RegisterQuerierServer(t.server.GRPC, t.ingester)
grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester)
t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler))
return
},
stop: func(t *Tempo) {
t.ingester.Shutdown()
},
},

Querier: module{
deps: []moduleName{Ring, Server},
init: func(t *Tempo, cfg *config) (err error) {
t.querier, err = querier.New(cfg.Querier, cfg.IngesterClient, t.ring)
if err != nil {
return
}

operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string {
return r.URL.RequestURI()
})
httpMiddleware := middleware.Merge(
middleware.Func(func(handler http.Handler) http.Handler {
return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc)
}),
middleware.AuthenticateUser,
)
t.server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.querier.QueryHandler)))
t.server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
t.server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
return
},
},

All: module{
deps: []moduleName{Querier, Ingester, Distributor},
init: func(t *Tempo, cfg *config) (err error) {
return
},
},
}

var (
cfg config
tempo Tempo
inited = map[moduleName]struct{}{}
)

func initModule(m moduleName) error {
if _, ok := inited[m]; ok {
return nil
}

for _, dep := range modules[m].deps {
initModule(dep)
}

level.Info(util.Logger).Log("msg", "initialising", "module", m)
if err := modules[m].init(&tempo, &cfg); err != nil {
return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", m))
}
func main() {
var (
cfg tempo.Config
configFile = ""
)
flag.StringVar(&configFile, "config.file", "", "Configuration file to load.")
flagext.RegisterFlags(&cfg)
flag.Parse()

inited[m] = struct{}{}
return nil
}
util.InitLogger(&cfg.Server)

func stopModule(m moduleName) {
if _, ok := inited[m]; !ok {
return
if configFile != "" {
if err := readConfig(configFile, &cfg); err != nil {
level.Error(util.Logger).Log("msg", "error loading config", "filename", configFile, "err", err)
os.Exit(1)
}
}
delete(inited, m)

for _, dep := range modules[m].deps {
stopModule(dep)
t, err := tempo.New(cfg)
if err != nil {
level.Error(util.Logger).Log("msg", "error initialising module", "err", err)
os.Exit(1)
}

if modules[m].stop != nil {
level.Info(util.Logger).Log("msg", "stopping", "module", m)
modules[m].stop(&tempo)
}
t.Run()
t.Stop()
}

func readConfig(filename string) error {
func readConfig(filename string, cfg *tempo.Config) error {
f, err := os.Open(filename)
if err != nil {
return errors.Wrap(err, "error opening config file")
Expand All @@ -273,32 +59,3 @@ func readConfig(filename string) error {
}
return nil
}

func main() {
var (
target = All
configFile = ""
)
flag.Var(&target, "target", "target module (default All)")
flag.StringVar(&configFile, "config.file", "", "Configuration file to load.")
flagext.RegisterFlags(&cfg)
flag.Parse()

util.InitLogger(&cfg.Server)

if configFile != "" {
if err := readConfig(configFile); err != nil {
level.Error(util.Logger).Log("msg", "error loading config", "filename", configFile, "err", err)
os.Exit(1)
}
}

if err := initModule(target); err != nil {
level.Error(util.Logger).Log("msg", "error initialising module", "err", err)
os.Exit(1)
}

tempo.server.Run()
tempo.server.Shutdown()
stopModule(target)
}
2 changes: 2 additions & 0 deletions docs/local.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
auth_enabled: false

ingester:
lifecycler:
ring:
Expand Down
42 changes: 42 additions & 0 deletions pkg/tempo/fake_auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package tempo

import (
"context"
"net/http"

"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
)

// Fake auth middlewares just injects a fake userID, so the rest of the code
// can continue to be multitenant.

var fakeHTTPAuthMiddleware = middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := user.InjectOrgID(r.Context(), "fake")
next.ServeHTTP(w, r.WithContext(ctx))
})
})

var fakeGRPCAuthUniaryMiddleware = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx = user.InjectOrgID(ctx, "fake")
return handler(ctx, req)
}

var fakeGRPCAuthStreamMiddleware = func(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := user.InjectOrgID(ss.Context(), "fake")
return handler(srv, serverStream{
ctx: ctx,
ServerStream: ss,
})
}

type serverStream struct {
ctx context.Context
grpc.ServerStream
}

func (ss serverStream) Context() context.Context {
return ss.ctx
}
Loading

0 comments on commit da18cb3

Please sign in to comment.