Skip to content

Commit

Permalink
add Elastic APM instrumentation (elastic#180)
Browse files Browse the repository at this point in the history
Co-authored-by: Anderson Queiroz <anderson.queiroz@elastic.co>
  • Loading branch information
stuartnelson3 and AndersonQ authored Mar 22, 2022
1 parent ce906b8 commit d108e49
Show file tree
Hide file tree
Showing 51 changed files with 2,063 additions and 448 deletions.
1,366 changes: 1,105 additions & 261 deletions NOTICE.txt

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ require (
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.elastic.co/apm v1.13.0 // indirect
go.elastic.co/apm/module/apmhttp v1.10.0 // indirect
go.elastic.co/apm/module/apmhttp v1.15.0 // indirect
go.elastic.co/fastjson v1.1.0 // indirect
go.uber.org/atomic v1.8.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
Expand All @@ -127,6 +126,7 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc/examples v0.0.0-20220304170021-431ea809a767 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
howett.net/plist v0.0.0-20201203080718-1454fab16a06 // indirect
Expand All @@ -141,6 +141,9 @@ require (
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220303073437-a28c413604b8
github.com/hashicorp/go-version v1.2.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
go.elastic.co/apm v1.15.0
go.elastic.co/apm/module/apmgorilla v1.15.0
go.elastic.co/apm/module/apmgrpc v1.15.0
k8s.io/klog/v2 v2.30.0 // indirect
)

Expand Down
14 changes: 12 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,7 @@ github.com/gosuri/uitable v0.0.4/go.mod h1:tKR86bXuXPZazfOTG1FIzvjIdXzd0mo4Vtn16
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
Expand Down Expand Up @@ -1680,13 +1681,19 @@ github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wK
go.elastic.co/apm v1.7.2/go.mod h1:tCw6CkOJgkWnzEthFN9HUP1uL3Gjc/Ur6m7gRPLaoH0=
go.elastic.co/apm v1.10.0/go.mod h1:qoOSi09pnzJDh5fKnfY7bPmQgl8yl2tULdOu03xhui0=
go.elastic.co/apm v1.11.0/go.mod h1:qoOSi09pnzJDh5fKnfY7bPmQgl8yl2tULdOu03xhui0=
go.elastic.co/apm v1.13.0 h1:SUCDscFfpshuxoxuoSh+r+2E3v1uvd5HrlRK5ZHpyvs=
go.elastic.co/apm v1.13.0/go.mod h1:dylGv2HKR0tiCV+wliJz1KHtDyuD8SPe69oV7VyK6WY=
go.elastic.co/apm v1.15.0 h1:uPk2g/whK7c7XiZyz/YCUnAUBNPiyNeE3ARX3G6Gx7Q=
go.elastic.co/apm v1.15.0/go.mod h1:dylGv2HKR0tiCV+wliJz1KHtDyuD8SPe69oV7VyK6WY=
go.elastic.co/apm/module/apmelasticsearch v1.7.2/go.mod h1:ZyNFuyWdt42GBZkz0SogoLzDBrBGj4orxpiUuxYeYq8=
go.elastic.co/apm/module/apmelasticsearch v1.10.0/go.mod h1:lwoaGDfZzfb9e6TXd3h8/KNmLAONOas7o5NLVNmv8Xk=
go.elastic.co/apm/module/apmgorilla v1.15.0 h1:1yTAksffgaFXYEIwlLRiQnxLfy3p3RtpDw8HDupIJfY=
go.elastic.co/apm/module/apmgorilla v1.15.0/go.mod h1:+23mZudYvZ9VgxCQjseLo9EF5gkKEr0KSQBupw+rzP8=
go.elastic.co/apm/module/apmgrpc v1.15.0 h1:Z7h58uuMJUoYXK6INFunlcGEXZQ18QKAhPh6NFYDNHE=
go.elastic.co/apm/module/apmgrpc v1.15.0/go.mod h1:IEbTGJzY5Xx737PkHDT3bbzh9syovK+IfAlckJsUgPE=
go.elastic.co/apm/module/apmhttp v1.7.2/go.mod h1:sTFWiWejnhSdZv6+dMgxGec2Nxe/ZKfHfz/xtRM+cRY=
go.elastic.co/apm/module/apmhttp v1.10.0 h1:/TzX6+ioYkMzm4BcGnOT/UUmcnSmfsny1hds93s01Lk=
go.elastic.co/apm/module/apmhttp v1.10.0/go.mod h1:Y4timwcJ8sQWbWpcw3Y7Mat1OssNpGhpwyfUnpqIDew=
go.elastic.co/apm/module/apmhttp v1.15.0 h1:Le/DhI0Cqpr9wG/NIGOkbz7+rOMqJrfE4MRG6q/+leU=
go.elastic.co/apm/module/apmhttp v1.15.0/go.mod h1:NruY6Jq8ALLzWUVUQ7t4wIzn+onKoiP5woJJdTV7GMg=
go.elastic.co/ecszap v0.3.0 h1:Zo/Y4sJLqbWDlqCHI4F4Lzeg0Fs4+n5ldVis4h9xV8w=
go.elastic.co/ecszap v0.3.0/go.mod h1:HTUi+QRmr3EuZMqxPX+5fyOdMNfUu5iPebgfhgsTJYQ=
go.elastic.co/fastjson v1.0.0/go.mod h1:PmeUOMMtLHQr9ZS9J9owrAVg0FkaZDRZJEFTTGHtchs=
Expand Down Expand Up @@ -2320,6 +2327,7 @@ google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEY
google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201019141844-1ed22bb0c154/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand Down Expand Up @@ -2400,6 +2408,8 @@ google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzI
google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/grpc/examples v0.0.0-20220304170021-431ea809a767 h1:r16FSFCMhn7+LU8CzbtAIKppYeU6NUPJVdvXeIqVIq8=
google.golang.org/grpc/examples v0.0.0-20220304170021-431ea809a767/go.mod h1:wKDg0brwMZpaizQ1i7IzYcJjH1TmbJudYdnQC9+J+LE=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
20 changes: 15 additions & 5 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"fmt"

"go.elastic.co/apm"

"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/core/status"
Expand Down Expand Up @@ -39,7 +41,14 @@ type upgraderControl interface {
}

// New creates a new Agent and bootstrap the required subsystem.
func New(log *logger.Logger, reexec reexecManager, statusCtrl status.Controller, uc upgraderControl, agentInfo *info.AgentInfo) (Application, error) {
func New(
log *logger.Logger,
reexec reexecManager,
statusCtrl status.Controller,
uc upgraderControl,
agentInfo *info.AgentInfo,
tracer *apm.Tracer,
) (Application, error) {
// Load configuration from disk to understand in which mode of operation
// we must start the elastic-agent, the mode of operation cannot be changed without restarting the
// elastic-agent.
Expand All @@ -53,7 +62,7 @@ func New(log *logger.Logger, reexec reexecManager, statusCtrl status.Controller,
return nil, err
}

return createApplication(log, pathConfigFile, rawConfig, reexec, statusCtrl, uc, agentInfo)
return createApplication(log, pathConfigFile, rawConfig, reexec, statusCtrl, uc, agentInfo, tracer)
}

func createApplication(
Expand All @@ -64,6 +73,7 @@ func createApplication(
statusCtrl status.Controller,
uc upgraderControl,
agentInfo *info.AgentInfo,
tracer *apm.Tracer,
) (Application, error) {
log.Info("Detecting execution mode")
ctx := context.Background()
Expand All @@ -74,7 +84,7 @@ func createApplication(

if configuration.IsStandalone(cfg.Fleet) {
log.Info("Agent is managed locally")
return newLocal(ctx, log, paths.ConfigFile(), rawConfig, reexec, statusCtrl, uc, agentInfo)
return newLocal(ctx, log, paths.ConfigFile(), rawConfig, reexec, statusCtrl, uc, agentInfo, tracer)
}

// not in standalone; both modes require reading the fleet.yml configuration file
Expand All @@ -86,11 +96,11 @@ func createApplication(

if configuration.IsFleetServerBootstrap(cfg.Fleet) {
log.Info("Agent is in Fleet Server bootstrap mode")
return newFleetServerBootstrap(ctx, log, pathConfigFile, rawConfig, statusCtrl, agentInfo)
return newFleetServerBootstrap(ctx, log, pathConfigFile, rawConfig, statusCtrl, agentInfo, tracer)
}

log.Info("Agent is managed by Fleet")
return newManaged(ctx, log, store, cfg, rawConfig, reexec, statusCtrl, agentInfo)
return newManaged(ctx, log, store, cfg, rawConfig, reexec, statusCtrl, agentInfo, tracer)
}

func mergeFleetConfig(rawConfig *config.Config) (storage.Store, *configuration.Configuration, error) {
Expand Down
15 changes: 10 additions & 5 deletions internal/pkg/agent/application/fleet_server_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package application
import (
"context"

"go.elastic.co/apm"

"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/sorted"
Expand Down Expand Up @@ -51,6 +53,7 @@ func newFleetServerBootstrap(
rawConfig *config.Config,
statusCtrl status.Controller,
agentInfo *info.AgentInfo,
tracer *apm.Tracer,
) (*FleetServerBootstrap, error) {
cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
Expand Down Expand Up @@ -79,7 +82,7 @@ func newFleetServerBootstrap(
}

bootstrapApp.bgContext, bootstrapApp.cancelCtxFn = context.WithCancel(ctx)
bootstrapApp.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{})
bootstrapApp.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{}, tracer)
if err != nil {
return nil, errors.New(err, "initialize GRPC listener")
}
Expand Down Expand Up @@ -167,20 +170,22 @@ func bootstrapEmitter(ctx context.Context, log *logger.Logger, agentInfo transpi
case c = <-ch:
}

err := emit(log, agentInfo, router, modifiers, c)
err := emit(ctx, log, agentInfo, router, modifiers, c)
if err != nil {
log.Error(err)
}
}
}()

return func(c *config.Config) error {
return func(ctx context.Context, c *config.Config) error {
span, _ := apm.StartSpan(ctx, "emit", "app.internal")
defer span.End()
ch <- c
return nil
}, nil
}

func emit(log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Router, modifiers *pipeline.ConfigModifiers, c *config.Config) error {
func emit(ctx context.Context, log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Router, modifiers *pipeline.ConfigModifiers, c *config.Config) error {
if err := info.InjectAgentConfig(c); err != nil {
return err
}
Expand Down Expand Up @@ -219,7 +224,7 @@ func emit(log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Ro
return errors.New("bootstrap configuration is incorrect causing fleet-server to not be started")
}

return router.Route(ast.HashStr(), map[pipeline.RoutingKey][]program.Program{
return router.Route(ctx, ast.HashStr(), map[pipeline.RoutingKey][]program.Program{
pipeline.DefaultRK: {
{
Spec: spec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (f *fleetGateway) worker() {
}

var errMsg string
if err := f.dispatcher.Dispatch(f.acker, actions...); err != nil {
if err := f.dispatcher.Dispatch(context.Background(), f.acker, actions...); err != nil {
errMsg = fmt.Sprintf("failed to dispatch actions, error: %s", err)
f.log.Error(errMsg)
f.statusReporter.Update(state.Failed, errMsg, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type testingDispatcher struct {
received chan struct{}
}

func (t *testingDispatcher) Dispatch(acker store.FleetAcker, actions ...fleetapi.Action) error {
func (t *testingDispatcher) Dispatch(_ context.Context, acker store.FleetAcker, actions ...fleetapi.Action) error {
t.Lock()
defer t.Unlock()
defer func() { t.received <- struct{}{} }()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func New(

// Start starts the gateway.
func (w *fleetServerWrapper) Start() error {
err := w.emitter(w.injectedCfg)
err := w.emitter(context.Background(), w.injectedCfg)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion internal/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"path/filepath"

"go.elastic.co/apm"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/filters"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
Expand Down Expand Up @@ -66,6 +68,7 @@ func newLocal(
statusCtrl status.Controller,
uc upgraderControl,
agentInfo *info.AgentInfo,
tracer *apm.Tracer,
) (*Local, error) {
caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), log, statusCtrl)
if err != nil {
Expand All @@ -92,7 +95,7 @@ func newLocal(
}

localApplication.bgContext, localApplication.cancelCtxFn = context.WithCancel(ctx)
localApplication.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{})
localApplication.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{}, tracer)
if err != nil {
return nil, errors.New(err, "initialize GRPC listener")
}
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"fmt"

"go.elastic.co/apm"

"github.com/elastic/go-sysinfo"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/filters"
Expand Down Expand Up @@ -78,6 +80,7 @@ func newManaged(
reexec reexecManager,
statusCtrl status.Controller,
agentInfo *info.AgentInfo,
tracer *apm.Tracer,
) (*Managed, error) {
caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), log, statusCtrl)
if err != nil {
Expand Down Expand Up @@ -105,7 +108,7 @@ func newManaged(
}

managedApplication.bgContext, managedApplication.cancelCtxFn = context.WithCancel(ctx)
managedApplication.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{})
managedApplication.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{}, tracer)
if err != nil {
return nil, errors.New(err, "initialize GRPC listener", errors.TypeNetwork)
}
Expand Down Expand Up @@ -244,7 +247,7 @@ func newManaged(
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
if err := store.ReplayActions(log, actionDispatcher, actionAcker, actions...); err != nil {
if err := store.ReplayActions(ctx, log, actionDispatcher, actionAcker, actions...); err != nil {
log.Errorf("could not recover state, error %+v, skipping...", err)
}
stateRestored = true
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/managed_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestManagedModeRouting(t *testing.T) {
actions, err := testActions()
require.NoError(t, err)

err = actionDispatcher.Dispatch(noopacker.NewAcker(), actions...)
err = actionDispatcher.Dispatch(context.Background(), noopacker.NewAcker(), actions...)
require.NoError(t, err)

// has 1 config request for fb, mb and monitoring?
Expand Down Expand Up @@ -101,7 +101,7 @@ func newMockStreamStore() *mockStreamStore {
}
}

func (m *mockStreamStore) Execute(cr configrequest.Request) error {
func (m *mockStreamStore) Execute(_ context.Context, cr configrequest.Request) error {
m.store = append(m.store, cr)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/once.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ func readfiles(ctx context.Context, files []string, loader *config.Loader, emitt
return errors.New(err, "could not load or merge configuration", errors.TypeConfig)
}

return emitter(c)
return emitter(ctx, c)
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker stor
if err != nil {
return err
}
if err := h.emitter(c); err != nil {
if err := h.emitter(ctx, c); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type mockEmitter struct {
policy *config.Config
}

func (m *mockEmitter) Emitter(policy *config.Config) error {
func (m *mockEmitter) Emitter(_ context.Context, policy *config.Config) error {
m.policy = policy
return m.err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (h *Unenroll) Handle(ctx context.Context, a fleetapi.Action, acker store.Fl

// Providing empty map will close all pipelines
noPrograms := make(map[pipeline.RoutingKey][]program.Program)
h.dispatcher.Route(a.ID(), noPrograms)
_ = h.dispatcher.Route(ctx, a.ID(), noPrograms)

if !action.IsDetected {
// ACK only events comming from fleet
Expand Down
20 changes: 18 additions & 2 deletions internal/pkg/agent/application/pipeline/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"reflect"
"strings"

"go.elastic.co/apm"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/actions"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
Expand Down Expand Up @@ -74,7 +76,21 @@ func (ad *ActionDispatcher) key(a fleetapi.Action) string {
}

// Dispatch dispatches an action using pre-registered set of handlers.
func (ad *ActionDispatcher) Dispatch(acker store.FleetAcker, actions ...fleetapi.Action) error {
// ctx is used here ONLY to carry the span, for cancelation use the cancel
// function of the ActionDispatcher.ctx.
func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker store.FleetAcker, actions ...fleetapi.Action) (err error) {
span, ctx := apm.StartSpan(ctx, "dispatch", "app.internal")
defer func() {
apm.CaptureError(ctx, err).Send()
span.End()
}()

// Creating a child context that carries both the ad.ctx cancelation and
// the span from ctx.
ctx, cancel := context.WithCancel(ad.ctx)
defer cancel()
ctx = apm.ContextWithSpan(ctx, span)

if len(actions) == 0 {
ad.log.Debug("No action to dispatch")
return nil
Expand All @@ -98,7 +114,7 @@ func (ad *ActionDispatcher) Dispatch(acker store.FleetAcker, actions ...fleetapi
ad.log.Debugf("Successfully dispatched action: '%+v'", action)
}

return acker.Commit(ad.ctx)
return acker.Commit(ctx)
}

func (ad *ActionDispatcher) dispatchAction(a fleetapi.Action, acker store.FleetAcker) error {
Expand Down
Loading

0 comments on commit d108e49

Please sign in to comment.