Timeouts on calls to docker api #12310


Merged
merged 4 commits into from
Jun 4, 2019
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -207,6 +207,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]
- New module `panw` for Palo Alto Networks PAN-OS logs. {pull}11999[11999]
- Add RabbitMQ module. {pull}12032[12032]
- Add new `container` input. {pull}12162[12162]
- Add timeouts on communication with docker daemon. {pull}12310[12310]
- `container` and `docker` inputs now support reading of labels and env vars written by docker JSON file logging driver. {issue}8358[8358]
- Add specific date processor to convert timezones so same pipeline can be used when convert_timezone is enabled or disabled. {pull}12253[12253]
- Add MSSQL module {pull}12079[12079]
68 changes: 49 additions & 19 deletions libbeat/common/docker/watcher.go
@@ -35,7 +35,11 @@ import (

// Select Docker API version
const (
shortIDLen = 12
shortIDLen = 12
dockerRequestTimeout = 10 * time.Second
dockerWatchRequestTimeout = 60 * time.Minute
dockerEventsWatchPityTimerInterval = 10 * time.Second
dockerEventsWatchPityTimerTimeout = 10 * time.Minute
)

// Watcher reads docker events and keeps a list of known containers
@@ -68,16 +72,17 @@ type TLSConfig struct {

type watcher struct {
sync.RWMutex
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
lastValidTimestamp int64
stopped sync.WaitGroup
bus bus.Bus
shortID bool // whether to store short ID in "containers" too
client Client
ctx context.Context
stop context.CancelFunc
containers map[string]*Container
deleted map[string]time.Time // deleted annotations key -> last access time
cleanupTimeout time.Duration
lastValidTimestamp int64
lastWatchReceivedEventTime time.Time
stopped sync.WaitGroup
bus bus.Bus
shortID bool // whether to store short ID in "containers" too
}

// Container info retrieved by the watcher
@@ -224,20 +229,30 @@ func (w *watcher) watch() {
filter := filters.NewArgs()
filter.Add("type", "container")

options := types.EventsOptions{
Since: fmt.Sprintf("%d", w.lastValidTimestamp),
Filters: filter,
}

for {
events, errors := w.client.Events(w.ctx, options)
options := types.EventsOptions{
Since: fmt.Sprintf("%d", w.lastValidTimestamp),
Filters: filter,
}

logp.Debug("docker", "Fetching events since %s", options.Since)
ctx, cancel := context.WithTimeout(w.ctx, dockerWatchRequestTimeout)
defer cancel()

events, errors := w.client.Events(ctx, options)

// ticker for timeout to restart the watcher when no events are received
w.lastWatchReceivedEventTime = time.Now()
tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval)
defer tickChan.Stop()

WATCH:
for {
select {
case event := <-events:
logp.Debug("docker", "Got a new docker event: %v", event)
w.lastValidTimestamp = event.Time
w.lastWatchReceivedEventTime = time.Now()

// Add / update
if event.Action == "start" || event.Action == "update" {
@@ -289,17 +304,29 @@ func (w *watcher) watch() {
time.Sleep(1 * time.Second)
break WATCH

case <-tickChan.C:
if time.Since(w.lastWatchReceivedEventTime) > dockerEventsWatchPityTimerTimeout {
logp.Info("No events received withing %s, restarting watch call", dockerEventsWatchPityTimerTimeout)
time.Sleep(1 * time.Second)
break WATCH
}

case <-w.ctx.Done():
logp.Debug("docker", "Watcher stopped")
w.stopped.Done()
return
}
}

}
}

func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Container, error) {
containers, err := w.client.ContainerList(w.ctx, options)
logp.Debug("docker", "List containers")
ctx, cancel := context.WithTimeout(w.ctx, dockerRequestTimeout)
defer cancel()

containers, err := w.client.ContainerList(ctx, options)
if err != nil {
return nil, err
}
@@ -316,7 +343,10 @@ func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Container, error) {
// If there are no network interfaces, assume that the container is on host network
// Inspect the container directly and use the hostname as the IP address in order
Contributor Author

This assumption is actually incorrect. Docker can run containers in several known network modes: default (bridge), host, and container (the latter is commonly used on cloud platforms such as Kubernetes).

It can be determined by checking the .HostConfig.NetworkMode property:

$ sudo curl -s --fail -m3 -XGET --unix-socket /var/run/docker.sock http://localhost/containers/json | jq -r .[].HostConfig.NetworkMode
container:b79efb8d2710b3fb2ae6b9fff5e1cf6f8e841301df9a355371c197156d72c3d4
container:a7fb40e3f13684e91f26c1ed26096e0d39df5d7860db31bab5f924cec5838366
default
default
default
host
container:9f187b5c30cb2ceb43907a2513e093f1039be42c230c33d70a601e9c3173dc5d
default
default

If the network mode is container:<ID>, that <ID> should be inspected in turn and the addresses fetched from its standard .NetworkSettings.Networks config, as above.

But this is a subject for a separate PR; a rough sketch follows at the end of this thread.

Contributor

This sounds about right! Are you willing to open a new PR? 😇

Contributor Author

I will try to take a shot at this next week.
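A minimal sketch of what such a follow-up could look like, assuming the standard github.com/docker/docker client; resolveIPAddresses is a hypothetical helper name and this is not code from this PR:

package docker

import (
	"context"
	"time"

	"github.com/docker/docker/client"
)

// resolveIPAddresses is a hypothetical helper: it inspects a container and,
// when the container runs in "container:<ID>" network mode, follows that
// reference and reads the addresses from .NetworkSettings.Networks of the
// container that owns the network namespace.
func resolveIPAddresses(cli *client.Client, id string) ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	info, err := cli.ContainerInspect(ctx, id)
	if err != nil {
		return nil, err
	}

	// Follow container:<ID> network mode to the referenced container.
	if mode := info.HostConfig.NetworkMode; mode.IsContainer() {
		if info, err = cli.ContainerInspect(ctx, mode.ConnectedContainer()); err != nil {
			return nil, err
		}
	}

	// Host network mode: fall back to the hostname, as the current code does.
	if info.HostConfig.NetworkMode.IsHost() {
		return []string{info.Config.Hostname}, nil
	}

	var addrs []string
	for _, settings := range info.NetworkSettings.Networks {
		if settings.IPAddress != "" {
			addrs = append(addrs, settings.IPAddress)
		}
	}
	return addrs, nil
}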

if len(ipaddresses) == 0 {
info, err := w.client.ContainerInspect(w.ctx, c.ID)
logp.Debug("docker", "Inspect container %s", c.ID)
ctx, cancel := context.WithTimeout(w.ctx, dockerRequestTimeout)
defer cancel()
info, err := w.client.ContainerInspect(ctx, c.ID)
if err == nil {
ipaddresses = append(ipaddresses, info.Config.Hostname)
} else {
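For reference, a minimal standalone sketch of the timeout pattern this PR introduces: every request to the docker daemon is bounded by a context.WithTimeout derived from the watcher's context, so a hung daemon cannot block the call forever. The function name and the 10-second value here are illustrative only (the PR uses the dockerRequestTimeout constant shown above):

package docker

import (
	"context"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
)

// listWithTimeout illustrates the pattern: derive a context with a deadline
// from the parent so a single unresponsive docker daemon request fails fast
// instead of blocking indefinitely.
func listWithTimeout(parent context.Context, cli *client.Client) ([]types.Container, error) {
	ctx, cancel := context.WithTimeout(parent, 10*time.Second)
	defer cancel() // release the timer as soon as the call returns

	return cli.ContainerList(ctx, types.ContainerListOptions{})
}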