Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
- Use at least warning level for all status logs {pull}1218[1218]
- Remove fleet event reporter and events from checkin body. {issue}993[993]
- Fix unintended reset of source URI when downloading components {pull}1252[1252]
- Create separate status reporter for local only events so that degraded fleet-checkins no longer affect health on successful fleet-checkins. {issue}1157[1157] {pull}1285[1285]

==== New features

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type fleetGateway struct {
checkinFailCounter int
statusController status.Controller
statusReporter status.Reporter
localReporter status.Reporter
stateStore stateStore
queue actionQueue
}
Expand Down Expand Up @@ -156,6 +157,7 @@ func newFleetGatewayWithScheduler(
done: done,
acker: acker,
statusReporter: statusController.RegisterComponent("gateway"),
localReporter: statusController.RegisterLocalComponent("gateway-checkin"),
statusController: statusController,
stateStore: stateStore,
queue: queue,
Expand Down Expand Up @@ -208,6 +210,7 @@ func (f *fleetGateway) worker() {
f.statusReporter.Update(state.Failed, errMsg, nil)
} else {
f.statusReporter.Update(state.Healthy, "", nil)
f.localReporter.Update(state.Healthy, "", nil) // we don't need to specifically set the local reporter to failed above, but it needs to be reset to healthy if a checking succeeds
}

case <-f.bgContext.Done():
Expand Down Expand Up @@ -291,12 +294,11 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
)

f.log.Error(err)
f.localReporter.Update(state.Failed, err.Error(), nil)
return nil, err
}
if f.checkinFailCounter > 1 {
// do not update status reporter with failure
// status reporter would report connection failure on first successful connection, leading to
// stale result for certain period causing slight confusion.
f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
f.log.Errorf("checking number %d failed: %s", f.checkinFailCounter, err.Error())
}
continue
Expand Down Expand Up @@ -386,6 +388,7 @@ func (f *fleetGateway) stop() {
f.log.Info("Fleet gateway is stopping")
defer f.scheduler.Stop()
f.statusReporter.Unregister()
f.localReporter.Unregister()
close(f.done)
f.wg.Wait()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
noopacker "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
Expand Down Expand Up @@ -703,12 +704,18 @@ func TestRetriesOnFailures(t *testing.T) {
queue.On("DequeueActions").Return([]fleetapi.Action{})
queue.On("Actions").Return([]fleetapi.Action{})

localReporter := &testutils.MockReporter{}
localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2)
localReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
localReporter.On("Unregister").Maybe()

fleetReporter := &testutils.MockReporter{}
fleetReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
fleetReporter.On("Unregister").Maybe()

statusController := &testutils.MockController{}
statusController.On("RegisterComponent", "gateway").Return(fleetReporter).Once()
statusController.On("RegisterLocalComponent", "gateway-checkin").Return(localReporter).Once()
statusController.On("StatusString").Return("string")

gateway, err := newFleetGatewayWithScheduler(
Expand Down Expand Up @@ -767,6 +774,7 @@ func TestRetriesOnFailures(t *testing.T) {
waitFn()
statusController.AssertExpectations(t)
fleetReporter.AssertExpectations(t)
localReporter.AssertExpectations(t)
})

t.Run("The retry loop is interruptible",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (

type noopController struct{}

func (*noopController) SetAgentID(_ string) {}
func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) SetAgentID(_ string) {}
func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) RegisterLocalComponent(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) RegisterComponentWithPersistance(_ string, _ bool) status.Reporter {
return &noopReporter{}
}
func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
func (*noopController) LocalStatus() status.AgentStatus {
return status.AgentStatus{Status: status.Healthy}
}
func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/core/status/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ type LivenessResponse struct {
}

// ServeHTTP is an HTTP Handler for the status controller.
// It uses the local agent status so it is able to report a degraded state if the fleet-server checkin has issues.
// Respose code is 200 for a healthy agent, and 503 otherwise.
// Response body is a JSON object that contains the agent ID, status, message, and the last status update time.
func (r *controller) ServeHTTP(wr http.ResponseWriter, req *http.Request) {
s := r.Status()
s := r.LocalStatus()
lr := LivenessResponse{
ID: r.agentID,
Status: s.Status.String(),
Expand Down
84 changes: 75 additions & 9 deletions internal/pkg/core/status/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,31 @@ type AgentStatus struct {
type Controller interface {
SetAgentID(string)
RegisterComponent(string) Reporter
RegisterLocalComponent(string) Reporter
RegisterComponentWithPersistance(string, bool) Reporter
RegisterApp(id string, name string) Reporter
Status() AgentStatus
LocalStatus() AgentStatus
StatusCode() AgentStatusCode
StatusString() string
UpdateStateID(string)
ServeHTTP(http.ResponseWriter, *http.Request)
}

type controller struct {
updateTime time.Time
log *logger.Logger
reporters map[string]*reporter
appReporters map[string]*reporter
stateID string
message string
agentID string
status AgentStatusCode
mx sync.Mutex
updateTime time.Time
log *logger.Logger
reporters map[string]*reporter
localReporters map[string]*reporter
appReporters map[string]*reporter
stateID string
message string
agentID string
status AgentStatusCode
localStatus AgentStatusCode
localMessage string
localTime time.Time
mx sync.Mutex
}

// NewController creates a new reporter.
Expand Down Expand Up @@ -126,6 +132,28 @@ func (r *controller) UpdateStateID(stateID string) {
r.updateStatus()
}

// RegisterLocalComponent registers new component for local-only status updates.
func (r *controller) RegisterLocalComponent(componentIdentifier string) Reporter {
id := componentIdentifier + "-" + uuid.New().String()[:8]
rep := &reporter{
name: componentIdentifier,
isRegistered: true,
unregisterFunc: func() {
r.mx.Lock()
delete(r.localReporters, id)
r.mx.Unlock()
},
notifyChangeFunc: r.updateStatus,
isPersistent: false,
}

r.mx.Lock()
r.localReporters[id] = rep
r.mx.Unlock()

return rep
}

// Register registers new component for status updates.
func (r *controller) RegisterComponent(componentIdentifier string) Reporter {
return r.RegisterComponentWithPersistance(componentIdentifier, false)
Expand Down Expand Up @@ -199,6 +227,25 @@ func (r *controller) Status() AgentStatus {
}
}

// LocalStatus returns the status from the local registered components if they are different from the agent status.
// If the agent status is more severe then the local status (failed vs degraded for example) agent status is used.
// If they are equal (healthy and healthy) agent status is used.
func (r *controller) LocalStatus() AgentStatus {
status := r.Status()
r.mx.Lock()
defer r.mx.Unlock()

if r.localStatus > status.Status {
return AgentStatus{
Status: r.localStatus,
Message: r.localMessage,
UpdateTime: r.localTime,
}
}
return status

}

// StatusCode retrieves current agent status code.
func (r *controller) StatusCode() AgentStatusCode {
r.mx.Lock()
Expand All @@ -208,9 +255,23 @@ func (r *controller) StatusCode() AgentStatusCode {

func (r *controller) updateStatus() {
status := Healthy
lStatus := Healthy
message := ""
lMessage := ""

r.mx.Lock()
for id, rep := range r.localReporters {
s := statusToAgentStatus(rep.status)
if s > lStatus {
lStatus = s
lMessage = fmt.Sprintf("component %s: %s", id, rep.message)
}
r.log.Debugf("local component '%s' has status '%s'", id, s)
if status == Failed {
break
}
}

for id, rep := range r.reporters {
s := statusToAgentStatus(rep.status)
if s > status {
Expand Down Expand Up @@ -244,6 +305,11 @@ func (r *controller) updateStatus() {
r.message = message
r.updateTime = time.Now().UTC()
}
if r.localStatus != lStatus {
r.localStatus = lStatus
r.localMessage = lMessage
r.localTime = time.Now().UTC()
}

r.mx.Unlock()

Expand Down
10 changes: 10 additions & 0 deletions internal/pkg/testutils/status_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ func (m *MockController) RegisterComponent(id string) status.Reporter {
return args.Get(0).(status.Reporter)
}

func (m *MockController) RegisterLocalComponent(id string) status.Reporter {
args := m.Called(id)
return args.Get(0).(status.Reporter)
}

func (m *MockController) RegisterComponentWithPersistance(id string, b bool) status.Reporter {
args := m.Called(id, b)
return args.Get(0).(status.Reporter)
Expand All @@ -40,6 +45,11 @@ func (m *MockController) Status() status.AgentStatus {
return args.Get(0).(status.AgentStatus)
}

func (m *MockController) LocalStatus() status.AgentStatus {
args := m.Called()
return args.Get(0).(status.AgentStatus)
}

func (m *MockController) StatusCode() status.AgentStatusCode {
args := m.Called()
return args.Get(0).(status.AgentStatusCode)
Expand Down