Skip to content

Commit 7f10625

Browse files
[8.4](backport #1285) Expand status reporter/controller interfaces to allow local reporters (#1305)
* Expand status reporter/controller interfaces to allow local reporters (#1285) * Expand status reporter/controller interfaces to allow local reporters Add a local reporter map to the status controller. These reporters are not used when updating status with fleet-server, they are only used to gather local state information - specifically if the agent is degraded because checkin with fleet-server has failed. This bypasses the bug that was introduced with the liveness endpoint where the agent could checkin (to fleet-server) with a degraded status because a previous checkin failed. Local reporters are used to generate a separate status. This status is used in the liveness endpoint. * fix linter (cherry picked from commit 717708a) # Conflicts: # internal/pkg/core/status/reporter.go * Fix merge conflict Co-authored-by: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> Co-authored-by: michel-laterman <michel.laterman@elastic.co>
1 parent 0283688 commit 7f10625

File tree

7 files changed

+110
-17
lines changed

7 files changed

+110
-17
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
- Use at least warning level for all status logs {pull}1218[1218]
117117
- Remove fleet event reporter and events from checkin body. {issue}993[993]
118118
- Fix unintended reset of source URI when downloading components {pull}1252[1252]
119+
- Create separate status reporter for local only events so that degraded fleet-checkins no longer affect health on successful fleet-checkins. {issue}1157[1157] {pull}1285[1285]
119120

120121
==== New features
121122

internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ type fleetGateway struct {
8888
checkinFailCounter int
8989
statusController status.Controller
9090
statusReporter status.Reporter
91+
localReporter status.Reporter
9192
stateStore stateStore
9293
queue actionQueue
9394
}
@@ -156,6 +157,7 @@ func newFleetGatewayWithScheduler(
156157
done: done,
157158
acker: acker,
158159
statusReporter: statusController.RegisterComponent("gateway"),
160+
localReporter: statusController.RegisterLocalComponent("gateway-checkin"),
159161
statusController: statusController,
160162
stateStore: stateStore,
161163
queue: queue,
@@ -208,6 +210,7 @@ func (f *fleetGateway) worker() {
208210
f.statusReporter.Update(state.Failed, errMsg, nil)
209211
} else {
210212
f.statusReporter.Update(state.Healthy, "", nil)
213+
f.localReporter.Update(state.Healthy, "", nil) // we don't need to specifically set the local reporter to failed above, but it needs to be reset to healthy if a checking succeeds
211214
}
212215

213216
case <-f.bgContext.Done():
@@ -291,12 +294,11 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
291294
)
292295

293296
f.log.Error(err)
297+
f.localReporter.Update(state.Failed, err.Error(), nil)
294298
return nil, err
295299
}
296300
if f.checkinFailCounter > 1 {
297-
// do not update status reporter with failure
298-
// status reporter would report connection failure on first successful connection, leading to
299-
// stale result for certain period causing slight confusion.
301+
f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
300302
f.log.Errorf("checking number %d failed: %s", f.checkinFailCounter, err.Error())
301303
}
302304
continue
@@ -386,6 +388,7 @@ func (f *fleetGateway) stop() {
386388
f.log.Info("Fleet gateway is stopping")
387389
defer f.scheduler.Stop()
388390
f.statusReporter.Unregister()
391+
f.localReporter.Unregister()
389392
close(f.done)
390393
f.wg.Wait()
391394
}

internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
2626
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
2727
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
28+
"github.com/elastic/elastic-agent/internal/pkg/core/state"
2829
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
2930
noopacker "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
3031
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
@@ -703,12 +704,18 @@ func TestRetriesOnFailures(t *testing.T) {
703704
queue.On("DequeueActions").Return([]fleetapi.Action{})
704705
queue.On("Actions").Return([]fleetapi.Action{})
705706

707+
localReporter := &testutils.MockReporter{}
708+
localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2)
709+
localReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
710+
localReporter.On("Unregister").Maybe()
711+
706712
fleetReporter := &testutils.MockReporter{}
707713
fleetReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
708714
fleetReporter.On("Unregister").Maybe()
709715

710716
statusController := &testutils.MockController{}
711717
statusController.On("RegisterComponent", "gateway").Return(fleetReporter).Once()
718+
statusController.On("RegisterLocalComponent", "gateway-checkin").Return(localReporter).Once()
712719
statusController.On("StatusString").Return("string")
713720

714721
gateway, err := newFleetGatewayWithScheduler(
@@ -767,6 +774,7 @@ func TestRetriesOnFailures(t *testing.T) {
767774
waitFn()
768775
statusController.AssertExpectations(t)
769776
fleetReporter.AssertExpectations(t)
777+
localReporter.AssertExpectations(t)
770778
})
771779

772780
t.Run("The retry loop is interruptible",

internal/pkg/agent/application/gateway/fleet/noop_status_controller_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@ import (
1313

1414
type noopController struct{}
1515

16-
func (*noopController) SetAgentID(_ string) {}
17-
func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} }
16+
func (*noopController) SetAgentID(_ string) {}
17+
func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} }
18+
func (*noopController) RegisterLocalComponent(_ string) status.Reporter { return &noopReporter{} }
1819
func (*noopController) RegisterComponentWithPersistance(_ string, _ bool) status.Reporter {
1920
return &noopReporter{}
2021
}
21-
func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
22-
func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
22+
func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
23+
func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
24+
func (*noopController) LocalStatus() status.AgentStatus {
25+
return status.AgentStatus{Status: status.Healthy}
26+
}
2327
func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy }
2428
func (*noopController) UpdateStateID(_ string) {}
2529
func (*noopController) StatusString() string { return "online" }

internal/pkg/core/status/handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ type LivenessResponse struct {
1919
}
2020

2121
// ServeHTTP is an HTTP Handler for the status controller.
22+
// It uses the local agent status so it is able to report a degraded state if the fleet-server checkin has issues.
2223
// Respose code is 200 for a healthy agent, and 503 otherwise.
2324
// Response body is a JSON object that contains the agent ID, status, message, and the last status update time.
2425
func (r *controller) ServeHTTP(wr http.ResponseWriter, req *http.Request) {
25-
s := r.Status()
26+
s := r.LocalStatus()
2627
lr := LivenessResponse{
2728
ID: r.agentID,
2829
Status: s.Status.String(),

internal/pkg/core/status/reporter.go

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,25 +58,31 @@ type AgentStatus struct {
5858
type Controller interface {
5959
SetAgentID(string)
6060
RegisterComponent(string) Reporter
61+
RegisterLocalComponent(string) Reporter
6162
RegisterComponentWithPersistance(string, bool) Reporter
6263
RegisterApp(id string, name string) Reporter
6364
Status() AgentStatus
65+
LocalStatus() AgentStatus
6466
StatusCode() AgentStatusCode
6567
StatusString() string
6668
UpdateStateID(string)
6769
ServeHTTP(http.ResponseWriter, *http.Request)
6870
}
6971

7072
type controller struct {
71-
mx sync.Mutex
72-
status AgentStatusCode
73-
message string
74-
updateTime time.Time
75-
reporters map[string]*reporter
76-
appReporters map[string]*reporter
77-
log *logger.Logger
78-
stateID string
79-
agentID string
73+
updateTime time.Time
74+
log *logger.Logger
75+
reporters map[string]*reporter
76+
localReporters map[string]*reporter
77+
appReporters map[string]*reporter
78+
stateID string
79+
message string
80+
agentID string
81+
status AgentStatusCode
82+
localStatus AgentStatusCode
83+
localMessage string
84+
localTime time.Time
85+
mx sync.Mutex
8086
}
8187

8288
// NewController creates a new reporter.
@@ -126,6 +132,28 @@ func (r *controller) UpdateStateID(stateID string) {
126132
r.updateStatus()
127133
}
128134

135+
// RegisterLocalComponent registers new component for local-only status updates.
136+
func (r *controller) RegisterLocalComponent(componentIdentifier string) Reporter {
137+
id := componentIdentifier + "-" + uuid.New().String()[:8]
138+
rep := &reporter{
139+
name: componentIdentifier,
140+
isRegistered: true,
141+
unregisterFunc: func() {
142+
r.mx.Lock()
143+
delete(r.localReporters, id)
144+
r.mx.Unlock()
145+
},
146+
notifyChangeFunc: r.updateStatus,
147+
isPersistent: false,
148+
}
149+
150+
r.mx.Lock()
151+
r.localReporters[id] = rep
152+
r.mx.Unlock()
153+
154+
return rep
155+
}
156+
129157
// Register registers new component for status updates.
130158
func (r *controller) RegisterComponent(componentIdentifier string) Reporter {
131159
return r.RegisterComponentWithPersistance(componentIdentifier, false)
@@ -199,6 +227,25 @@ func (r *controller) Status() AgentStatus {
199227
}
200228
}
201229

230+
// LocalStatus returns the status from the local registered components if they are different from the agent status.
231+
// If the agent status is more severe then the local status (failed vs degraded for example) agent status is used.
232+
// If they are equal (healthy and healthy) agent status is used.
233+
func (r *controller) LocalStatus() AgentStatus {
234+
status := r.Status()
235+
r.mx.Lock()
236+
defer r.mx.Unlock()
237+
238+
if r.localStatus > status.Status {
239+
return AgentStatus{
240+
Status: r.localStatus,
241+
Message: r.localMessage,
242+
UpdateTime: r.localTime,
243+
}
244+
}
245+
return status
246+
247+
}
248+
202249
// StatusCode retrieves current agent status code.
203250
func (r *controller) StatusCode() AgentStatusCode {
204251
r.mx.Lock()
@@ -208,9 +255,23 @@ func (r *controller) StatusCode() AgentStatusCode {
208255

209256
func (r *controller) updateStatus() {
210257
status := Healthy
258+
lStatus := Healthy
211259
message := ""
260+
lMessage := ""
212261

213262
r.mx.Lock()
263+
for id, rep := range r.localReporters {
264+
s := statusToAgentStatus(rep.status)
265+
if s > lStatus {
266+
lStatus = s
267+
lMessage = fmt.Sprintf("component %s: %s", id, rep.message)
268+
}
269+
r.log.Debugf("local component '%s' has status '%s'", id, s)
270+
if status == Failed {
271+
break
272+
}
273+
}
274+
214275
for id, rep := range r.reporters {
215276
s := statusToAgentStatus(rep.status)
216277
if s > status {
@@ -244,6 +305,11 @@ func (r *controller) updateStatus() {
244305
r.message = message
245306
r.updateTime = time.Now().UTC()
246307
}
308+
if r.localStatus != lStatus {
309+
r.localStatus = lStatus
310+
r.localMessage = lMessage
311+
r.localTime = time.Now().UTC()
312+
}
247313

248314
r.mx.Unlock()
249315

internal/pkg/testutils/status_reporter.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ func (m *MockController) RegisterComponent(id string) status.Reporter {
2525
return args.Get(0).(status.Reporter)
2626
}
2727

28+
func (m *MockController) RegisterLocalComponent(id string) status.Reporter {
29+
args := m.Called(id)
30+
return args.Get(0).(status.Reporter)
31+
}
32+
2833
func (m *MockController) RegisterComponentWithPersistance(id string, b bool) status.Reporter {
2934
args := m.Called(id, b)
3035
return args.Get(0).(status.Reporter)
@@ -40,6 +45,11 @@ func (m *MockController) Status() status.AgentStatus {
4045
return args.Get(0).(status.AgentStatus)
4146
}
4247

48+
func (m *MockController) LocalStatus() status.AgentStatus {
49+
args := m.Called()
50+
return args.Get(0).(status.AgentStatus)
51+
}
52+
4353
func (m *MockController) StatusCode() status.AgentStatusCode {
4454
args := m.Called()
4555
return args.Get(0).(status.AgentStatusCode)

0 commit comments

Comments
 (0)