Skip to content

Commit e7e55f8

Browse files
authored
beater: introduce NewCreator and RunServer(Func) (elastic#3668) (elastic#3676)
* beater: introduce NewCreator and RunServer(Func) Introduce the `NewCreator` function, which returns a `beat.Creator`. The behaviour of the returned `beat.Creator` is parameterised by the new `CreatorParams` struct that is passed into `NewCreator`. For now `CreatorParams` contains a single `RunServer` field, which is a function for running the APM Server code (HTTP/gRPC servers). The server code is exposed through a new top-level function, `RunServer`, which should be passed into `CreatorParams`, ether directly or wrapped by another function. These changes enable customising the server to run additional goroutines (e.g. periodic metrics publication), and wrap the core publisher (e.g. aggregate transaction metrics). * Use sync.Once in RumConfig.MemoizedSourcemapStore Fixes a race condition due to the config now being used in multiple goroutines. * Workaround for SourcemapMapping copying issue * beater: ignore http.ErrServerClosed in traceServer * beater: remove invalid comment
1 parent 56efdfd commit e7e55f8

File tree

15 files changed

+393
-385
lines changed

15 files changed

+393
-385
lines changed

beater/beater.go

Lines changed: 115 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,17 @@
1818
package beater
1919

2020
import (
21-
"errors"
22-
"net"
23-
"net/url"
21+
"context"
2422
"sync"
2523

26-
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
27-
24+
"github.com/pkg/errors"
2825
"go.elastic.co/apm"
26+
"golang.org/x/sync/errgroup"
2927

3028
"github.com/elastic/beats/v7/libbeat/beat"
3129
"github.com/elastic/beats/v7/libbeat/cfgfile"
3230
"github.com/elastic/beats/v7/libbeat/common"
31+
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
3332
"github.com/elastic/beats/v7/libbeat/logp"
3433
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
3534

@@ -43,19 +42,62 @@ func init() {
4342
apm.DefaultTracer.Close()
4443
}
4544

46-
type beater struct {
47-
config *config.Config
48-
mutex sync.Mutex // guards server and stopped
49-
server server
50-
stopping chan struct{}
51-
stopped bool
52-
logger *logp.Logger
53-
}
54-
5545
var (
5646
errSetupDashboardRemoved = errors.New("setting 'setup.dashboards' has been removed")
5747
)
5848

49+
// CreatorParams holds parameters for creating beat.Beaters.
50+
type CreatorParams struct {
51+
// RunServer is used to run the APM Server.
52+
//
53+
// This should be set to beater.RunServer, or a function which wraps it.
54+
RunServer RunServerFunc
55+
}
56+
57+
// NewCreator returns a new beat.Creator which creates beaters
58+
// using the provided CreatorParams.
59+
func NewCreator(args CreatorParams) beat.Creator {
60+
if args.RunServer == nil {
61+
panic("args.RunServer must be non-nil")
62+
}
63+
return func(b *beat.Beat, ucfg *common.Config) (beat.Beater, error) {
64+
logger := logp.NewLogger(logs.Beater)
65+
if err := checkConfig(logger); err != nil {
66+
return nil, err
67+
}
68+
var esOutputCfg *common.Config
69+
if isElasticsearchOutput(b) {
70+
esOutputCfg = b.Config.Output.Config()
71+
}
72+
73+
beaterConfig, err := config.NewConfig(b.Info.Version, ucfg, esOutputCfg)
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
bt := &beater{
79+
config: beaterConfig,
80+
stopped: false,
81+
logger: logger,
82+
runServer: args.RunServer,
83+
}
84+
85+
// setup pipelines if explicitly directed to or setup --pipelines and config is not set at all
86+
shouldSetupPipelines := beaterConfig.Register.Ingest.Pipeline.IsEnabled() ||
87+
(b.InSetupCmd && beaterConfig.Register.Ingest.Pipeline.Enabled == nil)
88+
if isElasticsearchOutput(b) && shouldSetupPipelines {
89+
logger.Info("Registering pipeline callback")
90+
err := bt.registerPipelineCallback(b)
91+
if err != nil {
92+
return nil, err
93+
}
94+
} else {
95+
logger.Info("No pipeline callback registered")
96+
}
97+
return bt, nil
98+
}
99+
}
100+
59101
// checkConfig verifies the global configuration doesn't use unsupported settings
60102
func checkConfig(logger *logp.Logger) error {
61103
cfg, err := cfgfile.Load("", nil)
@@ -80,123 +122,87 @@ func checkConfig(logger *logp.Logger) error {
80122
return nil
81123
}
82124

83-
// New creates a beater instance using the provided configuration
84-
func New(b *beat.Beat, ucfg *common.Config) (beat.Beater, error) {
85-
logger := logp.NewLogger(logs.Beater)
86-
if err := checkConfig(logger); err != nil {
87-
return nil, err
88-
}
89-
var esOutputCfg *common.Config
90-
if isElasticsearchOutput(b) {
91-
esOutputCfg = b.Config.Output.Config()
92-
}
93-
94-
beaterConfig, err := config.NewConfig(b.Info.Version, ucfg, esOutputCfg)
95-
if err != nil {
96-
return nil, err
97-
}
98-
99-
bt := &beater{
100-
config: beaterConfig,
101-
stopping: make(chan struct{}),
102-
stopped: false,
103-
logger: logger,
104-
}
105-
106-
// setup pipelines if explicitly directed to or setup --pipelines and config is not set at all
107-
shouldSetupPipelines := beaterConfig.Register.Ingest.Pipeline.IsEnabled() ||
108-
(b.InSetupCmd && beaterConfig.Register.Ingest.Pipeline.Enabled == nil)
109-
if isElasticsearchOutput(b) && shouldSetupPipelines {
110-
logger.Info("Registering pipeline callback")
111-
err := bt.registerPipelineCallback(b)
112-
if err != nil {
113-
return nil, err
114-
}
115-
} else {
116-
logger.Info("No pipeline callback registered")
117-
}
118-
return bt, nil
119-
}
120-
121-
// parseListener extracts the network and path for a configured host address
122-
// all paths are tcp unix:/path/to.sock
123-
func parseListener(host string) (string, string) {
124-
if parsed, err := url.Parse(host); err == nil && parsed.Scheme == "unix" {
125-
return parsed.Scheme, parsed.Path
126-
}
127-
return "tcp", host
128-
}
125+
type beater struct {
126+
config *config.Config
127+
logger *logp.Logger
128+
runServer RunServerFunc
129129

130-
// listen starts the listener for bt.config.Host
131-
// bt.config.Host may be mutated by this function in case the resolved listening address does not match the
132-
// configured bt.config.Host value.
133-
// This should only be called once, from Run.
134-
func (bt *beater) listen() (net.Listener, error) {
135-
network, path := parseListener(bt.config.Host)
136-
if network == "tcp" {
137-
if _, _, err := net.SplitHostPort(path); err != nil {
138-
// tack on a port if SplitHostPort fails on what should be a tcp network address
139-
// if there were already too many colons, one more won't hurt
140-
path = net.JoinHostPort(path, config.DefaultPort)
141-
}
142-
}
143-
lis, err := net.Listen(network, path)
144-
if err != nil {
145-
return nil, err
146-
}
147-
// in case host is :0 or similar
148-
if network == "tcp" {
149-
addr := lis.Addr().(*net.TCPAddr).String()
150-
if bt.config.Host != addr {
151-
bt.logger.Infof("host resolved from %s to %s", bt.config.Host, addr)
152-
bt.config.Host = addr
153-
}
154-
}
155-
return lis, err
130+
mutex sync.Mutex // guards stopServer and stopped
131+
stopServer func()
132+
stopped bool
156133
}
157134

135+
// Run runs the APM Server, blocking until the beater's Stop method is called,
136+
// or a fatal error occurs.
158137
func (bt *beater) Run(b *beat.Beat) error {
159138
tracer, tracerServer, err := initTracer(b.Info, bt.config, bt.logger)
160139
if err != nil {
161140
return err
162141
}
142+
defer tracer.Close()
143+
144+
runServer := bt.runServer
163145
if tracerServer != nil {
164-
go func() {
165-
defer tracerServer.stop()
166-
<-bt.stopping
167-
}()
146+
// Self-instrumentation enabled, so running the APM Server
147+
// should run an internal server for receiving trace data.
148+
origRunServer := runServer
149+
runServer = func(ctx context.Context, args ServerParams) error {
150+
g, ctx := errgroup.WithContext(ctx)
151+
g.Go(func() error {
152+
defer tracerServer.stop()
153+
<-ctx.Done()
154+
// Close the tracer now to prevent the server
155+
// from waiting for more events during graceful
156+
// shutdown.
157+
tracer.Close()
158+
return nil
159+
})
160+
g.Go(func() error {
161+
return tracerServer.serve(args.Reporter)
162+
})
163+
g.Go(func() error {
164+
return origRunServer(ctx, args)
165+
})
166+
return g.Wait()
167+
}
168168
}
169-
defer tracer.Close()
170169

171-
pub, err := publish.NewPublisher(b.Publisher, tracer, &publish.PublisherConfig{
172-
Info: b.Info, ShutdownTimeout: bt.config.ShutdownTimeout, Pipeline: bt.config.Pipeline,
170+
publisher, err := publish.NewPublisher(b.Publisher, tracer, &publish.PublisherConfig{
171+
Info: b.Info,
172+
ShutdownTimeout: bt.config.ShutdownTimeout,
173+
Pipeline: bt.config.Pipeline,
173174
})
174175
if err != nil {
175176
return err
176177
}
177-
defer pub.Stop()
178+
defer publisher.Stop()
178179

179-
lis, err := bt.listen()
180-
if err != nil {
181-
bt.logger.Error("failed to listen:", err)
182-
return nil
180+
stopped := make(chan struct{})
181+
defer close(stopped)
182+
ctx, cancelContext := context.WithCancel(context.Background())
183+
defer cancelContext()
184+
var stopOnce sync.Once
185+
stopServer := func() {
186+
stopOnce.Do(func() {
187+
cancelContext()
188+
<-stopped
189+
})
183190
}
184191

185192
bt.mutex.Lock()
186193
if bt.stopped {
187194
bt.mutex.Unlock()
188195
return nil
189196
}
190-
191-
bt.server, err = newServer(bt.logger, bt.config, tracer, pub.Send)
192-
if err != nil {
193-
bt.logger.Error("failed to create new server:", err)
194-
return nil
195-
}
197+
bt.stopServer = stopServer
196198
bt.mutex.Unlock()
197199

198-
//blocking until shutdown
199-
return bt.server.run(lis, tracerServer)
200+
return runServer(ctx, ServerParams{
201+
Config: bt.config,
202+
Logger: bt.logger,
203+
Tracer: tracer,
204+
Reporter: publisher.Send,
205+
})
200206
}
201207

202208
func isElasticsearchOutput(b *beat.Beat) bool {
@@ -222,16 +228,15 @@ func (bt *beater) registerPipelineCallback(b *beat.Beat) error {
222228
return err
223229
}
224230

225-
// Graceful shutdown
231+
// Stop stops the beater gracefully.
226232
func (bt *beater) Stop() {
227233
bt.mutex.Lock()
228234
defer bt.mutex.Unlock()
229-
if bt.stopped {
235+
if bt.stopped || bt.stopServer == nil {
230236
return
231237
}
232238
bt.logger.Infof("stopping apm-server... waiting maximum of %v seconds for queues to drain",
233239
bt.config.ShutdownTimeout.Seconds())
234-
bt.server.stop()
235-
close(bt.stopping)
240+
bt.stopServer()
236241
bt.stopped = true
237242
}

0 commit comments

Comments
 (0)