Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Ports for GRPC and HTTP requests in Query Server #2387

Merged
merged 13 commits into from
Sep 11, 2020
Merged
36 changes: 32 additions & 4 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
queryHostPort = "query.host-port"
queryPort = "query.port"
queryPortWarning = "(deprecated, will be removed after 2020-08-31 or in release v1.20.0, whichever is later)"
queryHOSTPortWarning = "(deprecated, will be removed after 2020-08-31 or in release v1.20.0, whichever is later)"
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
queryHTTPHostPort = "query.http-server.host-port"
queryGRPCHostPort = "query.grpc-server.host-port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
Expand All @@ -56,8 +59,12 @@ var tlsFlagsConfig = tlscfg.ServerFlagsConfig{

// QueryOptions holds configuration for query service
type QueryOptions struct {
// HostPort is the host:port address that the query service listens o n
// HostPort is the host:port address that the query service listens on
HostPort string
// HTTPHostPort is the host:port address that the query service listens in on for http requests
HTTPHostPort string
// GRPCHostPort is the host:port address that the query service listens in on for gRPC requests
GRPCHostPort string
// BasePath is the prefix for all UI and API HTTP routes
BasePath string
// StaticAssets is the path for the static assets for the UI (https://github.com/uber/jaeger-ui)
Expand All @@ -77,7 +84,9 @@ type QueryOptions struct {
// AddFlags adds flags for QueryOptions
func AddFlags(flagSet *flag.FlagSet) {
flagSet.Var(&config.StringSlice{}, queryAdditionalHeaders, `Additional HTTP response headers. Can be specified multiple times. Format: "Key: Value"`)
flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server")
flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), queryHOSTPortWarning+" see --"+queryHTTPHostPort+" and --"+queryGRPCHostPort)
flagSet.String(queryHTTPHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server")
flagSet.String(queryGRPCHostPort, ports.PortToHostPort(ports.QueryGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's gRPC server")
flagSet.Int(queryPort, 0, queryPortWarning+" see --"+queryHostPort)
flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy")
flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI")
Expand All @@ -86,14 +95,33 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.Duration(queryMaxClockSkewAdjust, time.Second, "The maximum delta by which span timestamps may be adjusted in the UI due to clock skew; set to 0s to disable clock skew adjustments")
}

// InitPortsConfigFromViper initializes the port numbers and TLS configuration of ports
func (qOpts *QueryOptions) InitPortsConfigFromViper(v *viper.Viper, logger *zap.Logger) *QueryOptions {
qOpts.HTTPHostPort = v.GetString(queryHTTPHostPort)
qOpts.GRPCHostPort = v.GetString(queryGRPCHostPort)
qOpts.HostPort = ports.GetAddressFromCLIOptions(v.GetInt(queryPort), v.GetString(queryHostPort))

qOpts.TLS = tlsFlagsConfig.InitFromViper(v)

// query.host-port is not defined and atleast one of query.grpc-server.host-port or query.http-server.host-port is defined
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
// user intends to use the separate flags.
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
if !(v.IsSet(queryHostPort) || v.IsSet(queryPort)) && (v.IsSet(queryHTTPHostPort) || v.IsSet(queryGRPCHostPort)) {
return qOpts
}
logger.Warn(fmt.Sprintf("Use of %s and %s is deprecated. use %s and %s instead", queryPort, queryHostPort, queryHTTPHostPort, queryGRPCHostPort))
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
qOpts.HTTPHostPort = qOpts.HostPort
qOpts.GRPCHostPort = qOpts.HostPort
return qOpts

}

// InitFromViper initializes QueryOptions with properties from viper
func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) *QueryOptions {
qOpts.HostPort = ports.GetAddressFromCLIOptions(v.GetInt(queryPort), v.GetString(queryHostPort))
qOpts = qOpts.InitPortsConfigFromViper(v, logger)
qOpts.BasePath = v.GetString(queryBasePath)
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)
qOpts.TLS = tlsFlagsConfig.InitFromViper(v)
qOpts.MaxClockSkewAdjust = v.GetDuration(queryMaxClockSkewAdjust)

stringSlice := v.GetStringSlice(queryAdditionalHeaders)
Expand Down
21 changes: 21 additions & 0 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/mocks"
spanstore_mocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)
Expand Down Expand Up @@ -60,6 +61,26 @@ func TestQueryBuilderFlags(t *testing.T) {
assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust)
}

func TestQueryBuilderFlagsSeparatePorts(t *testing.T) {
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
"--query.http-server.host-port=127.0.0.1:8080",
})
qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop())
assert.Equal(t, "127.0.0.1:8080", qOpts.HTTPHostPort)
assert.Equal(t, ports.PortToHostPort(ports.QueryGRPC), qOpts.GRPCHostPort)
}

func TestQueryBuilderFlagsSeparateNoPorts(t *testing.T) {
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{})
qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop())

assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.HTTPHostPort)
assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.GRPCHostPort)
assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.HostPort)
}

func TestQueryBuilderBadHeadersFlags(t *testing.T) {
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
Expand Down
107 changes: 88 additions & 19 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/netutils"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand All @@ -42,13 +43,27 @@ type Server struct {
tracer opentracing.Tracer // TODO make part of flags.Service

conn net.Listener
grpcConn net.Listener
httpConn net.Listener
grpcServer *grpc.Server
httpServer *http.Server
separatePorts bool
unavailableChannel chan healthcheck.Status
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {

httpPort, err := ports.HostPortToPort(options.HTTPHostPort)
if err != nil {
return nil, err
}
grpcPort, err := ports.HostPortToPort(options.GRPCHostPort)
if err != nil {
return nil, err
}
separatePorts := (grpcPort != httpPort)
rjs211 marked this conversation as resolved.
Show resolved Hide resolved

grpcServer, err := createGRPCServer(querySvc, options, logger, tracer)
if err != nil {
return nil, err
Expand All @@ -61,6 +76,7 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *Que
tracer: tracer,
grpcServer: grpcServer,
httpServer: createHTTPServer(querySvc, options, tracer, logger),
separatePorts: separatePorts,
unavailableChannel: make(chan healthcheck.Status),
}, nil
}
Expand Down Expand Up @@ -117,11 +133,27 @@ func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions,
}
}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start() error {
// initListener initialises listeners of the server
func (s *Server) initListener() (cmux.CMux, error) {
if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
var err error
s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPCHostPort)
if err != nil {
return nil, err
}

s.httpConn, err = net.Listen("tcp", s.queryOptions.HTTPHostPort)
if err != nil {
return nil, err
}
s.logger.Info("Query server started")
return nil, nil
}

// old behavior using cmux
conn, err := net.Listen("tcp", s.queryOptions.HostPort)
if err != nil {
return err
return nil, err
}
s.conn = conn

Expand All @@ -138,16 +170,46 @@ func (s *Server) Start() error {
// cmux server acts as a reverse-proxy between HTTP and GRPC backends.
cmuxServer := cmux.New(s.conn)

grpcListener := cmuxServer.MatchWithWriters(
s.grpcConn = cmuxServer.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"),
)
httpListener := cmuxServer.Match(cmux.Any())
s.httpConn = cmuxServer.Match(cmux.Any())
s.queryOptions.HTTPHostPort = s.queryOptions.HostPort
s.queryOptions.GRPCHostPort = s.queryOptions.HostPort

return cmuxServer, nil

}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start() error {
cmuxServer, err := s.initListener()
if err != nil {
return err
}

var tcpPort int
if !s.separatePorts {
if port, err := netutils.GetPort(s.conn.Addr()); err == nil {
tcpPort = port
}

rjs211 marked this conversation as resolved.
Show resolved Hide resolved
}
var httpPort int
if port, err := netutils.GetPort(s.httpConn.Addr()); err == nil {
httpPort = port
}

var grpcPort int
if port, err := netutils.GetPort(s.grpcConn.Addr()); err == nil {
grpcPort = port
}

go func() {
s.logger.Info("Starting HTTP server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))
s.logger.Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTPHostPort))

switch err := s.httpServer.Serve(httpListener); err {
switch err := s.httpServer.Serve(s.httpConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:
// normal exit, nothing to do
default:
Expand All @@ -158,25 +220,27 @@ func (s *Server) Start() error {

// Start GRPC server concurrently
go func() {
s.logger.Info("Starting GRPC server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))
s.logger.Info("Starting GRPC server", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPCHostPort))

if err := s.grpcServer.Serve(grpcListener); err != nil {
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("Could not start GRPC server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()

// Start cmux server concurrently.
go func() {
s.logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))
if !s.separatePorts {
go func() {
s.logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))

err := cmuxServer.Serve()
// TODO: Remove string comparison when https://github.com/soheilhy/cmux/pull/69 is merged
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()
err := cmuxServer.Serve()
// TODO: Remove string comparison when https://github.com/soheilhy/cmux/pull/69 is merged
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()
}

return nil
}
Expand All @@ -186,6 +250,11 @@ func (s *Server) Close() error {
s.queryOptions.TLS.Close()
s.grpcServer.Stop()
s.httpServer.Close()
s.conn.Close()
if s.separatePorts {
s.httpConn.Close()
s.grpcConn.Close()
} else {
s.conn.Close()
}
return nil
}
50 changes: 46 additions & 4 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func TestServer(t *testing.T) {
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()
hostPort := ports.GetAddressFromCLIOptions(ports.QueryHTTP, "")

spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
Expand All @@ -70,7 +69,7 @@ func TestServer(t *testing.T) {
querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})

server, err := NewServer(flagsSvc.Logger, querySvc,
&QueryOptions{HostPort: hostPort, BearerTokenPropagation: true},
&QueryOptions{HostPort: hostPort, GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true},
opentracing.NoopTracer{})
assert.Nil(t, err)
assert.NoError(t, server.Start())
Expand Down Expand Up @@ -100,17 +99,60 @@ func TestServer(t *testing.T) {
assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get())
}

func TestServerWithDedicatedPorts(t *testing.T) {
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()

spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})

server, err := NewServer(flagsSvc.Logger, querySvc,
&QueryOptions{HTTPHostPort: "127.0.0.1:8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true},
opentracing.NoopTracer{})
assert.Nil(t, err)
assert.NoError(t, server.Start())
go func() {
for s := range server.HealthCheckStatus() {
flagsSvc.SetHealthCheckStatus(s)
}
}()

client := newGRPCClient(t, "127.0.0.1:8081")
defer client.conn.Close()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{})
assert.NoError(t, err)
assert.Equal(t, expectedServices, res.Services)

server.Close()
for i := 0; i < 10; i++ {
if flagsSvc.HC().Get() == healthcheck.Unavailable {
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
break
}
time.Sleep(1 * time.Millisecond)
}
assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get())
}

func TestServerGracefulExit(t *testing.T) {
flagsSvc := flags.NewService(ports.QueryAdminHTTP)

zapCore, logs := observer.New(zap.ErrorLevel)
assert.Equal(t, 0, logs.Len(), "Expected initial ObservedLogs to have zero length.")

flagsSvc.Logger = zap.New(zapCore)
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)

querySvc := &querysvc.QueryService{}
tracer := opentracing.NoopTracer{}
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: ports.PortToHostPort(ports.QueryAdminHTTP)}, tracer)
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: hostPort, GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
go func() {
Expand All @@ -137,7 +179,7 @@ func TestServerHandlesPortZero(t *testing.T) {

querySvc := &querysvc.QueryService{}
tracer := opentracing.NoopTracer{}
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: ":0"}, tracer)
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{HostPort: ":0", GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
server.Close()
Expand Down
10 changes: 10 additions & 0 deletions ports/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
// CollectorAdminHTTP is the default admin HTTP port (health check, metrics, etc.)
CollectorAdminHTTP = 14269

// QueryGRPC is the default port of GRPC requests for Query trace retrieval
QueryGRPC = 16685
// QueryHTTP is the default port for UI and Query API (e.g. /api/* endpoints)
QueryHTTP = 16686
// QueryAdminHTTP is the default admin HTTP port (health check, metrics, etc.)
Expand All @@ -52,6 +54,14 @@ func PortToHostPort(port int) string {
return ":" + strconv.Itoa(port)
}

// HostPortToPort converts the host:port address string intoto port int
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
func HostPortToPort(hostPort string) (int, error) {
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
s := strings.Split(hostPort, ":")
rjs211 marked this conversation as resolved.
Show resolved Hide resolved

return strconv.Atoi(s[len(s)-1])

}

// GetAddressFromCLIOptions gets listening address based on port (deprecated flags) or host:port (new flags)
func GetAddressFromCLIOptions(port int, hostPort string) string {
if port != 0 {
Expand Down