Skip to content

Commit

Permalink
Add better handling for Kubernetes Update and Delete watcher events (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored Jun 17, 2020
1 parent 8c23383 commit d573e8c
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix an issue where error messages are not accurate in mapstriface. {issue}18662[18662] {pull}18663[18663]
- Fix regression in `add_kubernetes_metadata`, so configured `indexers` and `matchers` are used if defaults are not disabled. {issue}18481[18481] {pull}18818[18818]
- Fix potential race condition in fingerprint processor. {pull}18738[18738]
- Add better handling for Kubernetes Update and Delete watcher events. {pull}18882[18882]
- Fix the `translate_sid` processor's handling of unconfigured target fields. {issue}18990[18990] {pull}18991[18991]
- Fixed a service restart failure under Windows. {issue}18914[18914] {pull}18916[18916]
- The `monitoring.elasticsearch.api_key` value is correctly base64-encoded before being sent to the monitoring Elasticsearch cluster. {issue}18939[18939] {pull}18945[18945]
Expand Down
4 changes: 3 additions & 1 deletion libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {

err = a.factory.CheckConfig(config)
if err != nil {
a.logger.Error(errors.Wrap(err, fmt.Sprintf("Auto discover config check failed for config '%s', won't start runner", common.DebugString(config, true))))
a.logger.Error(errors.Wrap(err, fmt.Sprintf(
"Auto discover config check failed for config '%s', won't start runner",
common.DebugString(config, true))))
continue
}

Expand Down
19 changes: 12 additions & 7 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
return p, nil
}

// OnAdd ensures processing of service objects that are newly added
// OnAdd ensures processing of pod objects that are newly added
func (p *pod) OnAdd(obj interface{}) {
p.logger.Debugf("Watcher Node add: %+v", obj)
p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
}

Expand All @@ -134,25 +134,30 @@ func (p *pod) OnAdd(obj interface{}) {
func (p *pod) OnUpdate(obj interface{}) {
pod := obj.(*kubernetes.Pod)

// If Pod is in a phase where all containers in the have terminated emit a stop event
if pod.Status.Phase == kubernetes.PodSucceeded || pod.Status.Phase == kubernetes.PodFailed {
p.logger.Debugf("Watcher Pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
switch pod.Status.Phase {
case kubernetes.PodSucceeded, kubernetes.PodFailed:
// If Pod is in a phase where all containers in the have terminated emit a stop event
p.logger.Debugf("Watcher Pod update (terminating): %+v", obj)

time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") })
return
case kubernetes.PodPending:
p.logger.Debugf("Watcher Pod update (pending): don't know what to do with this Pod yet, skipping for now: %+v", obj)
return
}

p.logger.Debugf("Watcher Pod update: %+v", obj)
p.emit(pod, "stop")
p.emit(pod, "start")
}

// GenerateHints creates hints needed for hints builder
// OnDelete stops pod objects that are deleted
func (p *pod) OnDelete(obj interface{}) {
p.logger.Debugf("Watcher Node delete: %+v", obj)
p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
}

// GenerateHints creates hints needed for hints builder
func (p *pod) GenerateHints(event bus.Event) bus.Event {
// Try to build a config with enabled builders. Send a provider agnostic payload.
// Builders are Beat specific.
Expand Down
13 changes: 9 additions & 4 deletions libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ type WatchOptions struct {
}

type item struct {
object interface{}
state string
object interface{}
objectRaw interface{}
state string
}

type watcher struct {
Expand Down Expand Up @@ -175,8 +176,7 @@ func (w *watcher) enqueue(obj interface{}, state string) {
if err != nil {
return
}

w.queue.Add(&item{key, state})
w.queue.Add(&item{key, obj, state})
}

// process gets the top of the work queue and processes the object that is received.
Expand Down Expand Up @@ -204,6 +204,11 @@ func (w *watcher) process(ctx context.Context) bool {
return nil
}
if !exists {
if entry.state == delete {
w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key)
// delete anyway in order to clean states
w.handler.OnDelete(entry.objectRaw)
}
return nil
}

Expand Down

0 comments on commit d573e8c

Please sign in to comment.