Skip to content

Commit

Permalink
Processor & Aggregator Contrib doc
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Oct 12, 2016
1 parent acfdd15 commit cace663
Showing 1 changed file with 185 additions and 5 deletions.
190 changes: 185 additions & 5 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

1. [Sign the CLA](http://influxdb.com/community/cla.html)
1. Make changes or write plugin (see below for details)
1. Add your plugin to `plugins/inputs/all/all.go` or `plugins/outputs/all/all.go`
1. Add your plugin to one of: `plugins/{inputs,outputs,aggregators,processors}/all/all.go`
1. If your plugin requires a new Go package,
[add it](https://github.com/influxdata/telegraf/blob/master/CONTRIBUTING.md#adding-a-dependency)
1. Write a README for your plugin, if it's an input plugin, it should be structured
Expand All @@ -16,8 +16,8 @@ for a good example.

## GoDoc

Public interfaces for inputs, outputs, metrics, and the accumulator can be found
on the GoDoc
Public interfaces for inputs, outputs, processors, aggregators, metrics,
and the accumulator can be found on the GoDoc

[![GoDoc](https://godoc.org/github.com/influxdata/telegraf?status.svg)](https://godoc.org/github.com/influxdata/telegraf)

Expand Down Expand Up @@ -46,7 +46,7 @@ and submit new inputs.

### Input Plugin Guidelines

* A plugin must conform to the `telegraf.Input` interface.
* A plugin must conform to the [`telegraf.Input`](https://godoc.org/github.com/influxdata/telegraf#Input) interface.
* Input Plugins should call `inputs.Add` in their `init` function to register themselves.
See below for a quick example.
* Input Plugins must be added to the
Expand Down Expand Up @@ -177,7 +177,7 @@ similar constructs.

### Output Plugin Guidelines

* An output must conform to the `outputs.Output` interface.
* An output must conform to the [`telegraf.Output`](https://godoc.org/github.com/influxdata/telegraf#Output) interface.
* Outputs should call `outputs.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the
Expand Down Expand Up @@ -275,6 +275,186 @@ and `Stop()` methods.
* Same as the `Output` guidelines, except that they must conform to the
`output.ServiceOutput` interface.

## Processor Plugins

This section is for developers who want to create a new processor plugin.

### Processor Plugin Guidelines

* A processor must conform to the [`telegraf.Processor`](https://godoc.org/github.com/influxdata/telegraf#Processor) interface.
* Processors should call `processors.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the
`github.com/influxdata/telegraf/plugins/processors/all/all.go` file.
* The `SampleConfig` function should return valid toml that describes how the
processor can be configured. This is include in `telegraf -sample-config`.
* The `Description` function should say in one line what this processor does.

### Processor Example

```go
package printer

// printer.go

import (
"fmt"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
)

type Printer struct {
}

var sampleConfig = `
`

func (p *Printer) SampleConfig() string {
return sampleConfig
}

func (p *Printer) Description() string {
return "Print all metrics that pass through this filter."
}

func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
fmt.Println(metric.String())
}
return in
}

func init() {
processors.Add("printer", func() telegraf.Processor {
return &Printer{}
})
}
```

## Aggregator Plugins

This section is for developers who want to create a new aggregator plugin.

### Aggregator Plugin Guidelines

* A aggregator must conform to the [`telegraf.Aggregator`](https://godoc.org/github.com/influxdata/telegraf#Aggregator) interface.
* Aggregators should call `aggregators.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the
`github.com/influxdata/telegraf/plugins/aggregators/all/all.go` file.
* The `SampleConfig` function should return valid toml that describes how the
aggregator can be configured. This is include in `telegraf -sample-config`.
* The `Description` function should say in one line what this aggregator does.
* The Aggregator plugin will need to keep caches of metrics that have passed
through it. This should be done using the builtin `HashID()` function of each
metric.
* When the `Reset()` function is called, all caches should be cleared.

### Aggregator Example

```go
package min

// min.go

import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
)

type Min struct {
// caches for metric fields, names, and tags
fieldCache map[uint64]map[string]float64
nameCache map[uint64]string
tagCache map[uint64]map[string]string
}

func NewMin() telegraf.Aggregator {
m := &Min{}
m.Reset()
return m
}

var sampleConfig = `
## period is the flush & clear interval of the aggregator.
period = "30s"
## If true drop_original will drop the original metrics and
## only send aggregates.
drop_original = false
`

func (m *Min) SampleConfig() string {
return sampleConfig
}

func (m *Min) Description() string {
return "Keep the aggregate min of each metric passing through."
}

func (m *Min) Add(in telegraf.Metric) {
id := in.HashID()
if _, ok := m.nameCache[id]; !ok {
// hit an uncached metric, create caches for first time:
m.nameCache[id] = in.Name()
m.tagCache[id] = in.Tags()
m.fieldCache[id] = make(map[string]float64)
for k, v := range in.Fields() {
if fv, ok := convert(v); ok {
m.fieldCache[id][k] = fv
}
}
} else {
for k, v := range in.Fields() {
if fv, ok := convert(v); ok {
if _, ok := m.fieldCache[id][k]; !ok {
// hit an uncached field of a cached metric
m.fieldCache[id][k] = fv
continue
}
if fv < m.fieldCache[id][k] {
// set new minimum
m.fieldCache[id][k] = fv
}
}
}
}
}

func (m *Min) Push(acc telegraf.Accumulator) {
for id, _ := range m.nameCache {
fields := map[string]interface{}{}
for k, v := range m.fieldCache[id] {
fields[k+"_min"] = v
}
acc.AddFields(m.nameCache[id], fields, m.tagCache[id])
}
}

func (m *Min) Reset() {
m.fieldCache = make(map[uint64]map[string]float64)
m.nameCache = make(map[uint64]string)
m.tagCache = make(map[uint64]map[string]string)
}

func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case float64:
return v, true
case int64:
return float64(v), true
default:
return 0, false
}
}

func init() {
aggregators.Add("min", func() telegraf.Aggregator {
return NewMin()
})
}
```

## Unit Tests

### Execute short tests
Expand Down

0 comments on commit cace663

Please sign in to comment.