Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize CLI flags to use host:port addresses #1827

Merged
merged 21 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,19 @@ Changes by Version

##### Breaking Changes

* The default value for the Ingester's flag `ingester.deadlockInterval` has been changed to `0`. With the new default, the ingester won't `panic` if there are no messages for the last minute. To restore the previous behavior, set the flag's value to `1m`.
* Changed the default value for the flag `ingester.deadlockInterval` ([1868](https://github.com/jaegertracing/jaeger/pull/1868), [@jpkrohling](https://github.com/jpkrohling))

The default value for the Ingester's flag `ingester.deadlockInterval` has been changed to `0`. With the new default, the ingester won't `panic` if there are no messages for the last minute. To restore the previous behavior, set the flag's value to `1m`.

* Normalize CLI flags to use host:port addresses ([#1827](https://github.com/jaegertracing/jaeger/pull/1827), [@annanay25](https://github.com/annanay25))
annanay25 marked this conversation as resolved.
Show resolved Hide resolved

Jaeger now uses standard host:port addresses as CLI flags. Flags previous accepting listen addresses in any other format have now been deprecated.
Deprecated flags and replacements -
annanay25 marked this conversation as resolved.
Show resolved Hide resolved

* `collector.port` - superseded by `collector.tchan-server.host-port`
* `collector.http-port` - superseded by `collector.http-server.host-port`
* `collector.grpc-port` - superseded by `collector.grpc-server.host-port`
* `collector.zipkin.http-port` - superseded by `collector.zipkin.host-port`

##### New Features

Expand Down
47 changes: 30 additions & 17 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net"
"net/http"
"os"
"strconv"

"github.com/gorilla/mux"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -185,7 +184,8 @@ func startAgent(
) {
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})

grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort))
addr := getAddressFromCLIOptions(cOpts.CollectorGRPCPort, cOpts.CollectorGRPCHostPort, "collector.grpc-port", logger)
grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, addr)
cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, metricsFactory)
if err != nil {
logger.Fatal("Could not create collector proxy", zap.Error(err))
Expand Down Expand Up @@ -234,16 +234,18 @@ func startCollector(
server.Register(jc.NewTChanCollectorServer(batchHandler))
server.Register(zc.NewTChanZipkinCollectorServer(batchHandler))
server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore)))
portStr := ":" + strconv.Itoa(cOpts.CollectorPort)
listener, err := net.Listen("tcp", portStr)

tchanAddr := getAddressFromCLIOptions(cOpts.CollectorPort, cOpts.CollectorTChanHostPort, "collector.http-port", logger)
listener, err := net.Listen("tcp", tchanAddr)
if err != nil {
logger.Fatal("Unable to start listening on channel", zap.Error(err))
}
logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", cOpts.CollectorPort))
logger.Info("Starting jaeger-collector TChannel server", zap.String("addr", tchanAddr))
ch.Serve(listener)
}

server, err := startGRPCServer(cOpts.CollectorGRPCPort, grpcHandler, strategyStore, logger)
grpcAddr := getAddressFromCLIOptions(cOpts.CollectorGRPCPort, cOpts.CollectorGRPCHostPort, "collector.grpc-port", logger)
server, err := startGRPCServer(grpcAddr, grpcHandler, strategyStore, logger)
if err != nil {
logger.Fatal("Could not start gRPC collector", zap.Error(err))
}
Expand All @@ -252,14 +254,16 @@ func startCollector(
r := mux.NewRouter()
apiHandler := collectorApp.NewAPIHandler(jaegerBatchesHandler)
apiHandler.RegisterRoutes(r)
httpPortStr := ":" + strconv.Itoa(cOpts.CollectorHTTPPort)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

go startZipkinHTTPAPI(logger, cOpts.CollectorZipkinHTTPPort, zipkinSpansHandler, recoveryHandler)
zipkinAddr := getAddressFromCLIOptions(cOpts.CollectorZipkinHTTPPort, cOpts.CollectorZipkinHTTPHostPort, "collector.zipkin.http-port", logger)

go startZipkinHTTPAPI(logger, zipkinAddr, zipkinSpansHandler, recoveryHandler)

logger.Info("Starting jaeger-collector HTTP server", zap.Int("http-port", cOpts.CollectorHTTPPort))
httpAddr := getAddressFromCLIOptions(cOpts.CollectorHTTPPort, cOpts.CollectorHTTPHostPort, "collector.http-port", logger)
logger.Info("Starting jaeger-collector HTTP server", zap.String("http-addr", httpAddr))
go func() {
if err := http.ListenAndServe(httpPortStr, recoveryHandler(r)); err != nil {
if err := http.ListenAndServe(httpAddr, recoveryHandler(r)); err != nil {
logger.Fatal("Could not launch jaeger-collector HTTP server", zap.Error(err))
}
hc.Set(healthcheck.Unavailable)
Expand All @@ -269,13 +273,13 @@ func startCollector(
}

func startGRPCServer(
port int,
addr string,
handler *collectorApp.GRPCHandler,
samplingStore strategystore.StrategyStore,
logger *zap.Logger,
) (*grpc.Server, error) {
server := grpc.NewServer()
_, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) {
_, err := grpcserver.StartGRPCCollector(addr, server, handler, samplingStore, logger, func(err error) {
logger.Fatal("gRPC collector failed", zap.Error(err))
})
if err != nil {
Expand All @@ -286,18 +290,17 @@ func startGRPCServer(

func startZipkinHTTPAPI(
logger *zap.Logger,
zipkinPort int,
zipkinAddr string,
zipkinSpansHandler collectorApp.ZipkinSpansHandler,
recoveryHandler func(http.Handler) http.Handler,
) {
if zipkinPort != 0 {
if zipkinAddr != "" {
r := mux.NewRouter()
zHandler := zipkin.NewAPIHandler(zipkinSpansHandler)
zHandler.RegisterRoutes(r)
httpPortStr := ":" + strconv.Itoa(zipkinPort)
logger.Info("Listening for Zipkin HTTP traffic", zap.Int("zipkin.http-port", zipkinPort))
logger.Info("Listening for Zipkin HTTP traffic", zap.String("zipkin.http-addr", zipkinAddr))

if err := http.ListenAndServe(httpPortStr, recoveryHandler(r)); err != nil {
if err := http.ListenAndServe(zipkinAddr, recoveryHandler(r)); err != nil {
logger.Fatal("Could not launch service", zap.Error(err))
}
}
Expand Down Expand Up @@ -362,3 +365,13 @@ func initTracer(metricsFactory metrics.Factory, logger *zap.Logger) io.Closer {
opentracing.SetGlobalTracer(tracer)
return closer
}

// Utility function to decide listening address based on port (deprecated flags) or host:port (new flags)
func getAddressFromCLIOptions(port int, addr string, deprecatedFlag string, logger *zap.Logger) string {
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
if port != 0 {
logger.Warn("Using deprecated configuration", zap.String("option", deprecatedFlag))
return ports.PortToHostPort(port)
}

return addr
}
32 changes: 26 additions & 6 deletions cmd/collector/app/builder/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ const (
collectorPort = "collector.port"
collectorHTTPPort = "collector.http-port"
collectorGRPCPort = "collector.grpc-port"
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
collectorZipkinHTTPort = "collector.zipkin.http-port"
collectorTChanHostPort = "collector.tchan-server.host-port"
collectorHTTPHostPort = "collector.http-server.host-port"
collectorGRPCHostPort = "collector.grpc-server.host-port"
collectorZipkinHTTPPort = "collector.zipkin.http-port"
collectorZipkinHTTPHostPort = "collector.zipkin.host-port"
collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins"
collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers"
)
Expand All @@ -54,10 +58,18 @@ type CollectorOptions struct {
CollectorHTTPPort int
// CollectorGRPCPort is the port that the collector service listens in on for gRPC requests
CollectorGRPCPort int
// CollectorTChanHostPort is the host:port address that the collector service listens in on for tchannel requests
CollectorTChanHostPort string
// CollectorHTTPHostPort is the host:port address that the collector service listens in on for http requests
CollectorHTTPHostPort string
// CollectorGRPCHostPort is the host:port address that the collector service listens in on for gRPC requests
CollectorGRPCHostPort string
// TLS configures secure transport
TLS tlscfg.Options
// CollectorZipkinHTTPPort is the port that the Zipkin collector service listens in on for http requests
CollectorZipkinHTTPPort int
// CollectorZipkinHTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests
CollectorZipkinHTTPHostPort string
// CollectorZipkinAllowedOrigins is a list of origins a cross-domain request to the Zipkin collector service can be executed from
CollectorZipkinAllowedOrigins string
// CollectorZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests
Expand All @@ -68,10 +80,14 @@ type CollectorOptions struct {
func AddFlags(flags *flag.FlagSet) {
flags.Int(collectorQueueSize, app.DefaultQueueSize, "The queue size of the collector")
flags.Int(collectorNumWorkers, app.DefaultNumWorkers, "The number of workers pulling items from the queue")
flags.Int(collectorPort, ports.CollectorTChannel, "The TChannel port for the collector service")
flags.Int(collectorHTTPPort, ports.CollectorHTTP, "The HTTP port for the collector service")
flags.Int(collectorGRPCPort, ports.CollectorGRPC, "The gRPC port for the collector service")
flags.Int(collectorZipkinHTTPort, 0, "The HTTP port for the Zipkin collector service e.g. 9411")
flags.Int(collectorPort, 0, "(deprecated) please use - "+collectorTChanHostPort)
flags.Int(collectorHTTPPort, 0, "(deprecated) please use -"+collectorHTTPHostPort)
flags.Int(collectorGRPCPort, 0, "(deprecated) please use -"+collectorGRPCHostPort)
flags.Int(collectorZipkinHTTPPort, 0, "(deprecated) please use -"+collectorZipkinHTTPHostPort)
flags.String(collectorTChanHostPort, ports.PortToHostPort(ports.CollectorTChannel), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's TChannel server")
flags.String(collectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's HTTP server")
flags.String(collectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's GRPC server")
flags.String(collectorZipkinHTTPHostPort, ports.PortToHostPort(0), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's Zipkin server")
flags.String(collectorZipkinAllowedOrigins, "*", "Comma separated list of allowed origins for the Zipkin collector service, default accepts all")
flags.String(collectorZipkinAllowedHeaders, "content-type", "Comma separated list of allowed headers for the Zipkin collector service, default content-type")
tlsFlagsConfig.AddFlags(flags)
Expand All @@ -84,7 +100,11 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.CollectorPort = v.GetInt(collectorPort)
cOpts.CollectorHTTPPort = v.GetInt(collectorHTTPPort)
cOpts.CollectorGRPCPort = v.GetInt(collectorGRPCPort)
cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPort)
cOpts.CollectorTChanHostPort = v.GetString(collectorTChanHostPort)
cOpts.CollectorHTTPHostPort = v.GetString(collectorHTTPHostPort)
cOpts.CollectorGRPCHostPort = v.GetString(collectorGRPCHostPort)
cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPPort)
cOpts.CollectorZipkinHTTPHostPort = v.GetString(collectorZipkinHTTPHostPort)
cOpts.CollectorZipkinAllowedOrigins = v.GetString(collectorZipkinAllowedOrigins)
cOpts.CollectorZipkinAllowedHeaders = v.GetString(collectorZipkinAllowedHeaders)
cOpts.TLS = tlsFlagsConfig.InitFromViper(v)
Expand Down
5 changes: 2 additions & 3 deletions cmd/collector/app/grpcserver/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ import (

// StartGRPCCollector configures and starts gRPC endpoints exposed by collector.
func StartGRPCCollector(
port int,
addr string,
server *grpc.Server,
handler *app.GRPCHandler,
samplingStrategy strategystore.StrategyStore,
logger *zap.Logger,
serveErr func(error),
) (net.Addr, error) {
grpcPortStr := ":" + strconv.Itoa(port)
lis, err := net.Listen("tcp", grpcPortStr)
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, errors.Wrap(err, "failed to listen on gRPC port")
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/collector/app/grpcserver/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)
Expand All @@ -38,8 +39,8 @@ func TestFailToListen(t *testing.T) {
l, _ := zap.NewDevelopment()
handler := app.NewGRPCHandler(l, &mockSpanProcessor{})
server := grpc.NewServer()
const invalidPort = -1
addr, err := StartGRPCCollector(invalidPort, server, handler, &mockSamplingStore{}, l, func(e error) {
const invalidAddr = ":-1"
addr, err := StartGRPCCollector(invalidAddr, server, handler, &mockSamplingStore{}, l, func(e error) {
})
assert.Nil(t, addr)
assert.EqualError(t, err, "failed to listen on gRPC port: listen tcp: address -1: invalid port")
Expand All @@ -63,7 +64,7 @@ func TestSpanCollector(t *testing.T) {
l, _ := zap.NewDevelopment()
handler := app.NewGRPCHandler(l, &mockSpanProcessor{})
server := grpc.NewServer()
addr, err := StartGRPCCollector(0, server, handler, &mockSamplingStore{}, l, func(e error) {
addr, err := StartGRPCCollector(ports.PortToHostPort(0), server, handler, &mockSamplingStore{}, l, func(e error) {
})
require.NoError(t, err)

Expand Down
41 changes: 27 additions & 14 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net"
"net/http"
"os"
"strconv"
"strings"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -120,12 +119,13 @@ func main() {
server.Register(jc.NewTChanCollectorServer(batchHandler))
server.Register(zc.NewTChanZipkinCollectorServer(batchHandler))
server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore)))
portStr := ":" + strconv.Itoa(builderOpts.CollectorPort)
listener, err := net.Listen("tcp", portStr)

addr := getAddressFromCLIOptions(builderOpts.CollectorPort, builderOpts.CollectorTChanHostPort, "collector.http-port", logger)
listener, err := net.Listen("tcp", addr)
if err != nil {
logger.Fatal("Unable to start listening on channel", zap.Error(err))
}
logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", builderOpts.CollectorPort))
logger.Info("Starting jaeger-collector TChannel server", zap.String("tchan-addr", addr))
ch.Serve(listener)
}

Expand All @@ -138,15 +138,17 @@ func main() {
r := mux.NewRouter()
apiHandler := app.NewAPIHandler(jaegerBatchesHandler)
apiHandler.RegisterRoutes(r)
httpPortStr := ":" + strconv.Itoa(builderOpts.CollectorHTTPPort)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)
httpHandler := recoveryHandler(r)

go startZipkinHTTPAPI(logger, builderOpts.CollectorZipkinHTTPPort, builderOpts.CollectorZipkinAllowedOrigins, builderOpts.CollectorZipkinAllowedHeaders, zipkinSpansHandler, recoveryHandler)
zipkinAddr := getAddressFromCLIOptions(builderOpts.CollectorZipkinHTTPPort, builderOpts.CollectorZipkinHTTPHostPort, "collector.zipkin.http-port", logger)

go startZipkinHTTPAPI(logger, zipkinAddr, builderOpts.CollectorZipkinAllowedOrigins, builderOpts.CollectorZipkinAllowedHeaders, zipkinSpansHandler, recoveryHandler)

logger.Info("Starting jaeger-collector HTTP server", zap.Int("http-port", builderOpts.CollectorHTTPPort))
httpAddr := getAddressFromCLIOptions(builderOpts.CollectorHTTPPort, builderOpts.CollectorHTTPHostPort, "collector.http-port", logger)
logger.Info("Starting jaeger-collector HTTP server", zap.String("http-addr", httpAddr))
go func() {
if err := http.ListenAndServe(httpPortStr, httpHandler); err != nil {
if err := http.ListenAndServe(httpAddr, httpHandler); err != nil {
logger.Fatal("Could not launch service", zap.Error(err))
}
svc.HC().Set(healthcheck.Unavailable)
Expand Down Expand Up @@ -204,7 +206,9 @@ func startGRPCServer(
} else { // server without TLS
server = grpc.NewServer()
}
_, err := grpcserver.StartGRPCCollector(opts.CollectorGRPCPort, server, handler, samplingStore, logger, func(err error) {

addr := getAddressFromCLIOptions(opts.CollectorGRPCPort, opts.CollectorGRPCHostPort, "collector.grpc-port", logger)
_, err := grpcserver.StartGRPCCollector(addr, server, handler, samplingStore, logger, func(err error) {
logger.Fatal("gRPC collector failed", zap.Error(err))
})
if err != nil {
Expand All @@ -215,13 +219,13 @@ func startGRPCServer(

func startZipkinHTTPAPI(
logger *zap.Logger,
zipkinPort int,
zipkinAddr string,
allowedOrigins string,
allowedHeaders string,
zipkinSpansHandler app.ZipkinSpansHandler,
recoveryHandler func(http.Handler) http.Handler,
) {
if zipkinPort != 0 {
if zipkinAddr != "" {
zHandler := zipkin.NewAPIHandler(zipkinSpansHandler)
r := mux.NewRouter()
zHandler.RegisterRoutes(r)
Expand All @@ -235,10 +239,9 @@ func startZipkinHTTPAPI(
AllowedHeaders: headers,
})

httpPortStr := ":" + strconv.Itoa(zipkinPort)
logger.Info("Listening for Zipkin HTTP traffic", zap.Int("zipkin.http-port", zipkinPort))
logger.Info("Listening for Zipkin HTTP traffic", zap.String("zipkin.http-addr", zipkinAddr))

if err := http.ListenAndServe(httpPortStr, c.Handler(recoveryHandler(r))); err != nil {
if err := http.ListenAndServe(zipkinAddr, c.Handler(recoveryHandler(r))); err != nil {
logger.Fatal("Could not launch service", zap.Error(err))
}
}
Expand All @@ -258,3 +261,13 @@ func initSamplingStrategyStore(
}
return strategyStore
}

// Utility function to decide listening address based on port (deprecated flags) or host:port (new flags)
func getAddressFromCLIOptions(port int, addr string, deprecatedFlag string, logger *zap.Logger) string {
if port != 0 {
logger.Warn("Using deprecated configuration", zap.String("option", deprecatedFlag))
return ports.PortToHostPort(port)
}

return addr
}
Loading