Skip to content

Commit

Permalink
reduce Elastic Agent shut down time by stopping processes concurrently (#29650)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ authored Feb 14, 2022
1 parent 3de59f0 commit 952c2eb
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 5 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
- Fix agent configuration overwritten by default fleet config. {pull}29297[29297]
- Allow agent containers to use basic auth to create a service token. {pull}29651[29651]
- Fix issue where a failing artifact verification does not remove the bad artifact. {pull}30281[30281]
- Reduce Elastic Agent shut down time by stopping processes in parallel. {pull}29650[29650]

==== New features

Expand Down
21 changes: 19 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,26 @@ func (o *Operator) Shutdown() {
o.logger.Debugf("pipeline installer '%s' done", o.pipelineID)
}

for _, app := range o.apps {
app.Shutdown()
o.appsLock.Lock()
defer o.appsLock.Unlock()

wg := sync.WaitGroup{}
wg.Add(len(o.apps))

started := time.Now()

for _, a := range o.apps {
go func(a Application) {
started := time.Now()
a.Shutdown()
wg.Done()
o.logger.Debugf("took %s to shutdown %s",
time.Now().Sub(started), a.Name())
}(a)
}
wg.Wait()
o.logger.Debugf("took %s to shutdown %d apps",
time.Now().Sub(started), len(o.apps))
}

// Start starts a new process based on a configuration
Expand Down
14 changes: 12 additions & 2 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,13 @@ func (a *Application) Stop() {

if srvState != nil {
// signal stop through GRPC, wait and kill is performed later in gracefulKill
srvState.Stop(a.processConfig.StopTimeout)
if err := srvState.Stop(a.processConfig.StopTimeout); err != nil {
err := fmt.Errorf("failed to stop after %s: %w", a.processConfig.StopTimeout, err)
a.setState(state.Failed, err.Error(), nil)

a.logger.Error(err)
}

}

a.appLock.Lock()
Expand Down Expand Up @@ -280,7 +286,9 @@ func (a *Application) gracefulKill(proc *process.Info) {
}

// send stop signal to request stop
proc.Stop()
if err := proc.Stop(); err != nil {
a.logger.Errorf("failed to stop %s: %v", a.Name(), err)
}

var wg sync.WaitGroup
doneChan := make(chan struct{})
Expand All @@ -304,6 +312,8 @@ func (a *Application) gracefulKill(proc *process.Info) {
select {
case <-doneChan:
case <-t.C:
a.logger.Infof("gracefulKill timed out after %d, killing %s",
procExitTimeout, a.Name())
_ = proc.Process.Kill()
}
}
6 changes: 5 additions & 1 deletion x-pack/elastic-agent/pkg/core/plugin/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,10 @@ func (a *Application) Stop() {

if err := srvState.Stop(a.processConfig.StopTimeout); err != nil {
a.appLock.Lock()
a.setState(state.Failed, errors.New(err, "Failed to stopped").Error(), nil)
a.setState(
state.Failed,
fmt.Errorf("failed to stop after %s: %w", a.processConfig.StopTimeout, err).Error(),
nil)
} else {
a.appLock.Lock()
a.setState(state.Stopped, "Stopped", nil)
Expand All @@ -275,6 +278,7 @@ func (a *Application) Stop() {
func (a *Application) Shutdown() {
a.appLock.Lock()
defer a.appLock.Unlock()
a.logger.Infof("signaling service to stop because of shutdown: %s", a.id)

if a.srvState == nil {
return
Expand Down

0 comments on commit 952c2eb

Please sign in to comment.