-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add add_docker_metadata
processor
#4352
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
package add_docker_metadata | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/elastic/beats/libbeat/logp" | ||
"github.com/elastic/beats/libbeat/processors" | ||
) | ||
|
||
func init() { | ||
processors.RegisterPlugin("add_docker_metadata", newDockerMetadataProcessor) | ||
} | ||
|
||
type addDockerMetadata struct { | ||
watcher Watcher | ||
fields []string | ||
} | ||
|
||
func newDockerMetadataProcessor(cfg common.Config) (processors.Processor, error) { | ||
return buildDockerMetadataProcessor(cfg, NewWatcher) | ||
} | ||
|
||
func buildDockerMetadataProcessor(cfg common.Config, watcherConstructor WatcherConstructor) (processors.Processor, error) { | ||
logp.Beta("The add_docker_metadata processor is beta") | ||
|
||
config := defaultConfig() | ||
|
||
err := cfg.Unpack(&config) | ||
if err != nil { | ||
return nil, fmt.Errorf("fail to unpack the add_docker_metadata configuration: %s", err) | ||
} | ||
|
||
watcher, err := watcherConstructor(config.Host, config.TLS) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if err = watcher.Start(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return &addDockerMetadata{ | ||
watcher: watcher, | ||
fields: config.Fields, | ||
}, nil | ||
} | ||
|
||
func (d *addDockerMetadata) Run(event common.MapStr) (common.MapStr, error) { | ||
var cid string | ||
for _, field := range d.fields { | ||
value, err := event.GetValue(field) | ||
if err != nil { | ||
continue | ||
} | ||
|
||
if strValue, ok := value.(string); ok { | ||
cid = strValue | ||
} | ||
} | ||
|
||
if cid == "" { | ||
return event, nil | ||
} | ||
|
||
container := d.watcher.Container(cid) | ||
if container != nil { | ||
meta := common.MapStr{} | ||
metaIface, ok := event["docker"] | ||
if ok { | ||
meta = metaIface.(common.MapStr) | ||
} | ||
|
||
if len(container.Labels) > 0 { | ||
labels := common.MapStr{} | ||
for k, v := range container.Labels { | ||
labels.Put(k, v) | ||
} | ||
meta.Put("container.labels", labels) | ||
} | ||
|
||
meta.Put("container.id", container.ID) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume this matches the "fields" with have in metricbeat for the docker containers? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, they do |
||
meta.Put("container.image", container.Image) | ||
meta.Put("container.name", container.Name) | ||
event["docker"] = meta | ||
} else { | ||
logp.Debug("docker", "Container not found: %s", cid) | ||
} | ||
|
||
return event, nil | ||
} | ||
|
||
func (d *addDockerMetadata) String() string { | ||
return "add_docker_metadata=[fields=" + strings.Join(d.fields, ", ") + "]" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
package add_docker_metadata | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestInitialization(t *testing.T) { | ||
var testConfig = common.NewConfig() | ||
|
||
p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil)) | ||
assert.NoError(t, err, "initializing add_docker_metadata processor") | ||
|
||
input := common.MapStr{} | ||
result, err := p.Run(input) | ||
assert.NoError(t, err, "processing an event") | ||
|
||
assert.Equal(t, common.MapStr{}, result) | ||
} | ||
|
||
func TestNoMatch(t *testing.T) { | ||
testConfig, err := common.NewConfigFrom(map[string]interface{}{ | ||
"match_fields": []string{"foo"}, | ||
}) | ||
assert.NoError(t, err) | ||
|
||
p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil)) | ||
assert.NoError(t, err, "initializing add_docker_metadata processor") | ||
|
||
input := common.MapStr{ | ||
"field": "value", | ||
} | ||
result, err := p.Run(input) | ||
assert.NoError(t, err, "processing an event") | ||
|
||
assert.Equal(t, common.MapStr{"field": "value"}, result) | ||
} | ||
|
||
func TestMatchNoContainer(t *testing.T) { | ||
testConfig, err := common.NewConfigFrom(map[string]interface{}{ | ||
"match_fields": []string{"foo"}, | ||
}) | ||
assert.NoError(t, err) | ||
|
||
p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory(nil)) | ||
assert.NoError(t, err, "initializing add_docker_metadata processor") | ||
|
||
input := common.MapStr{ | ||
"foo": "garbage", | ||
} | ||
result, err := p.Run(input) | ||
assert.NoError(t, err, "processing an event") | ||
|
||
assert.Equal(t, common.MapStr{"foo": "garbage"}, result) | ||
} | ||
|
||
func TestMatchContainer(t *testing.T) { | ||
testConfig, err := common.NewConfigFrom(map[string]interface{}{ | ||
"match_fields": []string{"foo"}, | ||
}) | ||
assert.NoError(t, err) | ||
|
||
p, err := buildDockerMetadataProcessor(*testConfig, MockWatcherFactory( | ||
map[string]*Container{ | ||
"container_id": &Container{ | ||
ID: "container_id", | ||
Image: "image", | ||
Name: "name", | ||
Labels: map[string]string{ | ||
"a": "1", | ||
"b": "2", | ||
}, | ||
}, | ||
})) | ||
assert.NoError(t, err, "initializing add_docker_metadata processor") | ||
|
||
input := common.MapStr{ | ||
"foo": "container_id", | ||
} | ||
result, err := p.Run(input) | ||
assert.NoError(t, err, "processing an event") | ||
|
||
assert.EqualValues(t, common.MapStr{ | ||
"docker": common.MapStr{ | ||
"container": common.MapStr{ | ||
"id": "container_id", | ||
"image": "image", | ||
"labels": common.MapStr{ | ||
"a": "1", | ||
"b": "2", | ||
}, | ||
"name": "name", | ||
}, | ||
}, | ||
"foo": "container_id", | ||
}, result) | ||
} | ||
|
||
// Mock container watcher | ||
|
||
func MockWatcherFactory(containers map[string]*Container) WatcherConstructor { | ||
if containers == nil { | ||
containers = make(map[string]*Container) | ||
} | ||
return func(host string, tls *TLSConfig) (Watcher, error) { | ||
return &mockWatcher{containers: containers}, nil | ||
} | ||
} | ||
|
||
type mockWatcher struct { | ||
containers map[string]*Container | ||
} | ||
|
||
func (m *mockWatcher) Start() error { | ||
return nil | ||
} | ||
|
||
func (m *mockWatcher) Container(ID string) *Container { | ||
return m.containers[ID] | ||
} | ||
|
||
func (m *mockWatcher) Containers() map[string]*Container { | ||
return m.containers | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package add_docker_metadata | ||
|
||
// Config for docker processor | ||
type Config struct { | ||
Host string `config:"host"` | ||
TLS *TLSConfig `config:"ssl"` | ||
Fields []string `config:"match_fields"` | ||
} | ||
|
||
// TLSConfig for docker socket connection | ||
type TLSConfig struct { | ||
CA string `config:"certificate_authority"` | ||
Certificate string `config:"certificate"` | ||
Key string `config:"key"` | ||
} | ||
|
||
func defaultConfig() Config { | ||
return Config{ | ||
Host: "unix:///var/run/docker.sock", | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was first confused by this and had to read the code to understand what it does. It takes this field and checks if the value of this field matches to any container id. If yes, it will add the container meta data to the event.
In the code, it can currently only match 1 field I think, but here it is plural and you use an array. How is this going to work exactly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW I'm not too sure about the example used here. In case someone uses cgroups, doesn't that kind of mean he doesn't want to use docker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It checks all the fields until one of them matches
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for the example, you may want to have docker info on top of the cgroup id, so you can filter by, for instance, docker image
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the config option is kind of
field_that_matches_container_id: ...
. What is the use case of having multiple fields defined?For the cgroup docker part: I see that this is what it's doing, but why would someone then not just use the docker module instead as he will rely on the docker api.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have several metrics matching cgroup id in system (maybe something we have to look into): system.process.cgroup.cpu.id, system.process.cgroup.memory.id, system.process.cgroup.blkio.id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I assume per config you only want to match to one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to match all of them, perhaps in the future it would be better to merge those fields into a global
system.cgroup.id
, with different meanings depending on the metricset, but as for now I would need to match any possible field there.what do you think @andrewkroh, does this make any sense?