Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support of aggregator as Starlark script #9419

Merged
merged 8 commits into from
Nov 18, 2021
Merged
1 change: 1 addition & 0 deletions plugins/aggregators/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/aggregators/merge"
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
_ "github.com/influxdata/telegraf/plugins/aggregators/quantile"
_ "github.com/influxdata/telegraf/plugins/aggregators/starlark"
_ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter"
)
103 changes: 103 additions & 0 deletions plugins/aggregators/starlark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Starlark Aggregator

The `starlark` aggregator allows to implement a custom aggregator plugin with a Starlark script. The Starlark
script needs to be composed of the three methods defined in the Aggregator plugin interface which are `add`, `push` and `reset`.

The Starlark Aggregator plugin calls the Starlark function `add` to add the metrics to the aggregator, then calls the Starlark function `push` to push the resulting metrics into the accumulator and finally calls the Starlark function `reset` to reset the entire state of the plugin.

The Starlark functions can use the global function `state` to keep temporary the metrics to aggregate.

The Starlark language is a dialect of Python, and will be familiar to those who
have experience with the Python language. However, there are major [differences](#python-differences).
Existing Python code is unlikely to work unmodified. The execution environment
is sandboxed, and it is not possible to do I/O operations such as reading from
files or sockets.

The **[Starlark specification][]** has details about the syntax and available
functions.

## Configuration

```toml
[[aggregators.starlark]]
## The Starlark source can be set as a string in this configuration file, or
## by referencing a file containing the script. Only one source or script
## should be set at once.
##
## Source of the Starlark script.
source = '''
state = {}

def add(metric):
state["last"] = metric

def push():
return state.get("last")

def reset():
state.clear()
'''

## File containing a Starlark script.
# script = "/usr/local/bin/myscript.star"

## The constants of the Starlark script.
# [aggregators.starlark.constants]
# max_size = 10
# threshold = 0.75
# default_name = "Julia"
# debug_mode = true
```

## Usage

The Starlark code should contain a function called `add` that takes a metric as argument.
The function will be called with each metric to add, and doesn't return anything.

```python
def add(metric):
state["last"] = metric
```

The Starlark code should also contain a function called `push` that doesn't take any argument.
The function will be called to compute the aggregation, and returns the metrics to push to the accumulator.

```python
def push():
return state.get("last")
```

The Starlark code should also contain a function called `reset` that doesn't take any argument.
The function will be called to reset the plugin, and doesn't return anything.

```python
def push():
state.clear()
```

For a list of available types and functions that can be used in the code, see
the [Starlark specification][].

## Python Differences

Refer to the section [Python Differences](plugins/processors/starlark/README.md#python-differences) of the documentation about the Starlark processor.

## Libraries available

Refer to the section [Libraries available](plugins/processors/starlark/README.md#libraries-available) of the documentation about the Starlark processor.

## Common Questions

Refer to the section [Common Questions](plugins/processors/starlark/README.md#common-questions) of the documentation about the Starlark processor.

## Examples

- [minmax](/plugins/aggregators/starlark/testdata/min_max.star) - A minmax aggregator implemented with a Starlark script.
- [merge](/plugins/aggregators/starlark/testdata/merge.star) - A merge aggregator implemented with a Starlark script.

[All examples](/plugins/aggregators/starlark/testdata) are in the testdata folder.

Open a Pull Request to add any other useful Starlark examples.

[Starlark specification]: https://github.com/google/starlark-go/blob/master/doc/spec.md
[dict]: https://github.com/google/starlark-go/blob/master/doc/spec.md#dictionaries
143 changes: 143 additions & 0 deletions plugins/aggregators/starlark/starlark.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package starlark //nolint - Needed to avoid getting import-shadowing: The name 'starlark' shadows an import name (revive)

import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
common "github.com/influxdata/telegraf/plugins/common/starlark"
"go.starlark.net/starlark"
)

const (
description = "Aggregate metrics using a Starlark script"
sampleConfig = `
## The Starlark source can be set as a string in this configuration file, or
## by referencing a file containing the script. Only one source or script
## should be set at once.
##
## Source of the Starlark script.
source = '''
state = {}

def add(metric):
state["last"] = metric

def push():
return state.get("last")

def reset():
state.clear()
'''
srebhan marked this conversation as resolved.
Show resolved Hide resolved

## File containing a Starlark script.
# script = "/usr/local/bin/myscript.star"

## The constants of the Starlark script.
# [aggregators.starlark.constants]
# max_size = 10
# threshold = 0.75
# default_name = "Julia"
# debug_mode = true
`
)

type Starlark struct {
common.StarlarkCommon
}

func (s *Starlark) Init() error {
// Execute source
err := s.StarlarkCommon.Init()
if err != nil {
return err
}

// The source should define an add function.
err = s.AddFunction("add", &common.Metric{})
if err != nil {
return err
}

// The source should define a push function.
err = s.AddFunction("push")
if err != nil {
return err
}

// The source should define a reset function.
err = s.AddFunction("reset")
if err != nil {
return err
}

return nil
}

func (s *Starlark) SampleConfig() string {
return sampleConfig
}

func (s *Starlark) Description() string {
return description
}

func (s *Starlark) Add(metric telegraf.Metric) {
parameters, found := s.GetParameters("add")
if !found {
s.Log.Errorf("The parameters of the add function could not be found")
return
}
parameters[0].(*common.Metric).Wrap(metric)

_, err := s.Call("add")
if err != nil {
s.LogError(err)
}
}

func (s *Starlark) Push(acc telegraf.Accumulator) {
rv, err := s.Call("push")
if err != nil {
s.LogError(err)
essobedo marked this conversation as resolved.
Show resolved Hide resolved
return
}

switch rv := rv.(type) {
case *starlark.List:
iter := rv.Iterate()
defer iter.Done()
var v starlark.Value
for iter.Next(&v) {
switch v := v.(type) {
case *common.Metric:
m := v.Unwrap()
acc.AddMetric(m)
default:
s.Log.Errorf("Invalid type returned in list: %s", v.Type())
}
}
case *common.Metric:
m := rv.Unwrap()
acc.AddMetric(m)
case starlark.NoneType:
default:
s.Log.Errorf("Invalid type returned: %T", rv)
}
srebhan marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *Starlark) Reset() {
_, err := s.Call("reset")
if err != nil {
s.LogError(err)
}
}

// init initializes starlark aggregator plugin
func init() {
aggregators.Add("starlark", func() telegraf.Aggregator {
return &Starlark{
StarlarkCommon: common.StarlarkCommon{
StarlarkLoadFunc: common.LoadFunc,
},
}
})
}
Loading