Skip to content

Commit

Permalink
[agent] Improve graceful shutdown (#2031)
Browse files Browse the repository at this point in the history
* Add support for graceful shutdown of agent.

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

* Removed #Close from noop places

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

* Added #Close to CollectorProxy back

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

* Removed multierr from collector proxy

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

* Restored collector's proxy closing of its reporter

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
  • Loading branch information
jpkrohling and yurishkuro authored Feb 24, 2020
1 parent 00985ca commit d5036f4
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 24 deletions.
19 changes: 13 additions & 6 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
package app

import (
"io"
"context"
"net"
"net/http"
"sync/atomic"
"time"

"github.com/gorilla/mux"
"go.uber.org/zap"
Expand All @@ -33,7 +34,6 @@ type Agent struct {
httpServer *http.Server
httpAddr atomic.Value // string, set once agent starts listening
logger *zap.Logger
closer io.Closer
}

// NewAgent creates the new Agent.
Expand Down Expand Up @@ -65,10 +65,9 @@ func (a *Agent) Run() error {
return err
}
a.httpAddr.Store(listener.Addr().String())
a.closer = listener
go func() {
a.logger.Info("Starting jaeger-agent HTTP server", zap.Int("http-port", listener.Addr().(*net.TCPAddr).Port))
if err := a.httpServer.Serve(listener); err != nil {
if err := a.httpServer.Serve(listener); err != http.ErrServerClosed {
a.logger.Error("http server failure", zap.Error(err))
}
a.logger.Info("agent's http server exiting")
Expand All @@ -86,8 +85,16 @@ func (a *Agent) HTTPAddr() string {

// Stop forces all agent go routines to exit.
func (a *Agent) Stop() {
// first, close the http server, so that we don't have any more inflight requests
a.logger.Info("shutting down agent's HTTP server", zap.String("addr", a.HTTPAddr()))
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := a.httpServer.Shutdown(timeout); err != nil {
a.logger.Error("failed to close HTTP server", zap.Error(err))
}
cancel()

// then, close all processors that are called for the incoming http requests
for _, processor := range a.processors {
go processor.Stop()
processor.Stop()
}
a.closer.Close()
}
2 changes: 2 additions & 0 deletions cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package app

import (
"fmt"
"io"
"net/http"
"strconv"

Expand Down Expand Up @@ -68,6 +69,7 @@ var (
type CollectorProxy interface {
GetReporter() reporter.Reporter
GetManager() configmanager.ClientConfigManager
io.Closer
}

// Builder Struct to hold configurations
Expand Down
3 changes: 3 additions & 0 deletions cmd/agent/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ func (fakeCollectorProxy) EmitZipkinBatch(spans []*zipkincore.Span) (err error)
func (fakeCollectorProxy) EmitBatch(batch *jaeger.Batch) (err error) {
return nil
}
func (fakeCollectorProxy) Close() error {
return nil
}

func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
return nil, errors.New("no peers available")
Expand Down
3 changes: 3 additions & 0 deletions cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"io"
"net"
"testing"
"time"
Expand All @@ -29,6 +30,8 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
)

var _ io.Closer = (*ProxyBuilder)(nil)

func TestMultipleCollectors(t *testing.T) {
spanHandler1 := &mockSpanHandler{}
s1, addr1 := initializeGRPCTestServer(t, func(s *grpc.Server) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/tchannel/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager {

// Close closes connections used by proxy.
func (b ProxyBuilder) Close() error {
b.tchanRep.Channel().Close()
b.reporter.Close()
b.tchanRep.Channel().Close()
return nil
}
3 changes: 3 additions & 0 deletions cmd/agent/app/reporter/tchannel/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tchannel

import (
"io"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
)

var _ io.Closer = (*ProxyBuilder)(nil)

func TestErrorReporterBuilder(t *testing.T) {
tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{})
b, err := NewCollectorProxy(tbuilder, metrics.NullFactory, zap.NewNop())
Expand Down
6 changes: 2 additions & 4 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main

import (
"fmt"
"io"
"os"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -76,9 +75,8 @@ func main() {
return fmt.Errorf("failed to run the agent: %w", err)
}
svc.RunAndThen(func() {
if closer, ok := cp.(io.Closer); ok {
closer.Close()
}
agent.Stop()
cp.Close()
})
return nil
},
Expand Down
30 changes: 17 additions & 13 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ by default uses only in-memory database.`,
cOpts := new(collectorApp.CollectorOptions).InitFromViper(v)
qOpts := new(queryApp.QueryOptions).InitFromViper(v, logger)

// collector
c := collectorApp.New(&collectorApp.CollectorParams{
ServiceName: "jaeger-collector",
Logger: logger,
Expand All @@ -130,14 +131,25 @@ by default uses only in-memory database.`,
})
c.Start(cOpts)

startAgent(aOpts, repOpts, tchanBuilder, grpcBuilder, cOpts.CollectorGRPCPort, logger, metricsFactory)
// agent
grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort))
agentMetricsFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})
cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, agentMetricsFactory)
if err != nil {
logger.Fatal("Could not create collector proxy", zap.Error(err))
}
agent := startAgent(cp, aOpts, logger, metricsFactory)

// query
querySrv := startQuery(
svc, qOpts, archiveOptions(storageFactory, logger),
spanReader, dependencyReader,
rootMetricsFactory, metricsFactory,
)

svc.RunAndThen(func() {
agent.Stop()
cp.Close()
c.Close()
querySrv.Close()
if closer, ok := spanWriter.(io.Closer); ok {
Expand Down Expand Up @@ -177,21 +189,11 @@ by default uses only in-memory database.`,
}

func startAgent(
cp agentApp.CollectorProxy,
b *agentApp.Builder,
repOpts *agentRep.Options,
tchanBuilder *agentTchanRep.Builder,
grpcBuilder *agentGrpcRep.ConnBuilder,
collectorGRPCPort int,
logger *zap.Logger,
baseFactory metrics.Factory,
) {
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})

grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", collectorGRPCPort))
cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, metricsFactory)
if err != nil {
logger.Fatal("Could not create collector proxy", zap.Error(err))
}
) *agentApp.Agent {

agent, err := b.CreateAgent(cp, logger, baseFactory)
if err != nil {
Expand All @@ -202,6 +204,8 @@ func startAgent(
if err := agent.Run(); err != nil {
logger.Fatal("Failed to run the agent", zap.Error(err))
}

return agent
}

func startQuery(
Expand Down

0 comments on commit d5036f4

Please sign in to comment.