Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Send updating state #21461

Merged
merged 5 commits into from
Oct 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@
- Add support for EQL based condition on inputs {pull}20994[20994]
- Send `fleet.host.id` to Endpoint Security {pull}21042[21042]
- Add `install` and `uninstall` subcommands {pull}21206[21206]
- Send updating state {pull}21461[21461]
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,13 @@ func newManaged(
}

managedApplication.upgrader = upgrade.NewUpgrader(
agentInfo,
cfg.Settings.DownloadConfig,
log,
[]context.CancelFunc{managedApplication.cancelCtxFn},
reexec,
acker)
acker,
combinedReporter)

actionDispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
Expand Down
77 changes: 69 additions & 8 deletions x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/install"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)
Expand All @@ -33,11 +34,13 @@ const (

// Upgrader performs an upgrade
type Upgrader struct {
agentInfo *info.AgentInfo
settings *artifact.Config
log *logger.Logger
closers []context.CancelFunc
reexec reexecManager
acker acker
reporter stateReporter
upgradeable bool
}

Expand All @@ -50,14 +53,19 @@ type acker interface {
Commit(ctx context.Context) error
}

type stateReporter interface {
OnStateChange(id string, name string, s state.State)
}

// NewUpgrader creates an upgrader which is capable of performing upgrade operation
func NewUpgrader(settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker) *Upgrader {
func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter) *Upgrader {
return &Upgrader{
settings: settings,
log: log,
closers: closers,
reexec: reexec,
acker: a,
reporter: r,
upgradeable: getUpgradable(),
}
}
Expand All @@ -68,13 +76,22 @@ func (u *Upgrader) Upgradeable() bool {
}

// Upgrade upgrades running agent
func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) error {
func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err error) {
// report failed
defer func() {
if err != nil {
u.reportFailure(ctx, a, err)
}
}()

if !u.upgradeable {
return fmt.Errorf(
"cannot be upgraded; must be installed with install sub-command and " +
"running under control of the systems supervisor")
}

u.reportUpdating(a.Version)

sourceURI, err := u.sourceURI(a.Version, a.SourceURI)
archivePath, err := u.downloadArtifact(ctx, a.Version, sourceURI)
if err != nil {
Expand All @@ -91,7 +108,10 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) error
}

if strings.HasPrefix(release.Commit(), newHash) {
return errors.New("upgrading to same version")
// not an error
u.ackAction(ctx, a)
u.log.Warn("upgrading to same version")
return nil
}

if err := copyActionStore(newHash); err != nil {
Expand Down Expand Up @@ -132,11 +152,7 @@ func (u *Upgrader) Ack(ctx context.Context) error {
return nil
}

if err := u.acker.Ack(ctx, marker.Action); err != nil {
return err
}

if err := u.acker.Commit(ctx); err != nil {
if err := u.ackAction(ctx, marker.Action); err != nil {
return err
}

Expand All @@ -148,6 +164,7 @@ func (u *Upgrader) Ack(ctx context.Context) error {

return ioutil.WriteFile(markerFile, markerBytes, 0600)
}

func (u *Upgrader) sourceURI(version, retrievedURI string) (string, error) {
if strings.HasSuffix(version, "-SNAPSHOT") && retrievedURI == "" {
return "", errors.New("snapshot upgrade requires source uri", errors.TypeConfig)
Expand All @@ -159,6 +176,50 @@ func (u *Upgrader) sourceURI(version, retrievedURI string) (string, error) {
return u.settings.SourceURI, nil
}

// ackAction is used for successful updates, it was either updated successfully or to the same version
// so we need to remove updating state and get prevent from receiving same update action again.
func (u *Upgrader) ackAction(ctx context.Context, action fleetapi.Action) error {
if err := u.acker.Ack(ctx, action); err != nil {
return err
}

if err := u.acker.Commit(ctx); err != nil {
return err
}

u.reporter.OnStateChange(
"",
agentName,
state.State{Status: state.Running},
)

return nil
}

// report failure is used when update process fails. action is acked so it won't be received again
// and state is changed to FAILED
func (u *Upgrader) reportFailure(ctx context.Context, action fleetapi.Action, err error) {
// ack action
u.acker.Ack(ctx, action)

// report failure
u.reporter.OnStateChange(
"",
agentName,
state.State{Status: state.Failed, Message: err.Error()},
)
}

// reportUpdating sets state of agent to updating.
func (u *Upgrader) reportUpdating(version string) {
// report failure
u.reporter.OnStateChange(
"",
agentName,
state.State{Status: state.Updating, Message: fmt.Sprintf("Update to version '%s' started", version)},
)
}

func rollbackInstall(hash string) {
os.RemoveAll(filepath.Join(paths.Data(), fmt.Sprintf("%s-%s", agentName, hash)))
}
Expand Down
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
Crashed
// Restarting is status describing application is restarting.
Restarting
// Updating is status describing application is updating.
Updating
)

// State wraps the process state and application status.
Expand Down
6 changes: 6 additions & 0 deletions x-pack/elastic-agent/pkg/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
EventSubTypeFailed = "FAILED"
// EventSubTypeStopping is an event type indicating application is stopping.
EventSubTypeStopping = "STOPPING"
// EventSubTypeUpdating is an event type indicating update process in progress.
EventSubTypeUpdating = "UPDATING"
)

type agentInfo interface {
Expand Down Expand Up @@ -127,6 +129,10 @@ func generateRecord(agentID string, id string, name string, s state.State) event
case state.Restarting:
subType = EventSubTypeStarting
subTypeText = "RESTARTING"
case state.Updating:
subType = EventSubTypeUpdating
subTypeText = EventSubTypeUpdating

}

err := errors.New(
Expand Down