Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processors.starlark): Allow persistence of global state #15170

Merged
merged 2 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,6 @@ func (a *Agent) Run(ctx context.Context) error {
time.Duration(a.Config.Agent.Interval), a.Config.Agent.Quiet,
a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval))

log.Printf("D! [agent] Initializing plugins")
if err := a.initPlugins(); err != nil {
return err
}

if a.Config.Persister != nil {
log.Printf("D! [agent] Initializing plugin states")
if err := a.initPersister(); err != nil {
Expand All @@ -124,6 +119,11 @@ func (a *Agent) Run(ctx context.Context) error {
}
}

log.Printf("D! [agent] Initializing plugins")
if err := a.initPlugins(); err != nil {
return err
}

startTime := time.Now()

log.Printf("D! [agent] Connecting outputs")
Expand Down
109 changes: 102 additions & 7 deletions plugins/common/starlark/starlark.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package starlark

import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"strings"
Expand All @@ -26,6 +28,77 @@ type Common struct {
globals starlark.StringDict
functions map[string]*starlark.Function
parameters map[string]starlark.Tuple
state *starlark.Dict
}

func (s *Common) GetState() interface{} {
// Return the actual byte-type instead of nil allowing the persister
// to guess instantiate variable of the appropriate type
if s.state == nil {
return []byte{}
}

// Convert the starlark dict into a golang dictionary for serialization
state := make(map[string]interface{}, s.state.Len())
items := s.state.Items()
for _, item := range items {
if len(item) != 2 {
// We do expect key-value pairs in the state so there should be
// two items.
s.Log.Errorf("state item %+v does not contain a key-value pair", item)
continue
}
k, ok := item.Index(0).(starlark.String)
if !ok {
s.Log.Errorf("state item %+v has invalid key type %T", item, item.Index(0))
continue
}
v, err := asGoValue(item.Index(1))
if err != nil {
s.Log.Errorf("state item %+v value cannot be converted: %v", item, err)
continue
}
state[k.GoString()] = v
}

// Do a binary GOB encoding to preserve types
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(state); err != nil {
s.Log.Errorf("encoding state failed: %v", err)
return []byte{}
}

return buf.Bytes()
}

func (s *Common) SetState(state interface{}) error {
data, ok := state.([]byte)
if !ok {
return fmt.Errorf("unexpected type %T for state", state)
}
if len(data) == 0 {
return nil
}

// Decode the binary GOB encoding
var dict map[string]interface{}
if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(&dict); err != nil {
return fmt.Errorf("decoding state failed: %w", err)
}

// Convert the golang dict back to starlark types
s.state = starlark.NewDict(len(dict))
for k, v := range dict {
sv, err := asStarlarkValue(v)
if err != nil {
return fmt.Errorf("value %v of state item %q cannot be set: %w", v, k, err)
}
if err := s.state.SetKey(starlark.String(k), sv); err != nil {
return fmt.Errorf("state item %q cannot be set: %w", k, err)
}
}

return nil
}

func (s *Common) Init() error {
Expand All @@ -47,27 +120,46 @@ func (s *Common) Init() error {
builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric)
builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy)
builtins["catch"] = starlark.NewBuiltin("catch", catch)
err := s.addConstants(&builtins)
if err != nil {

if err := s.addConstants(&builtins); err != nil {
return err
}

// Insert the persisted state if any
if s.state != nil {
builtins["state"] = s.state
}

// Load the program. In case of an error we can try to insert the state
// which can be used implicitly e.g. when persisting states
program, err := s.sourceProgram(builtins)
if err != nil {
return err
// Try again with a declared state. This might be necessary for
// state persistence.
s.state = starlark.NewDict(0)
builtins["state"] = s.state
p, serr := s.sourceProgram(builtins)
if serr != nil {
return err
}
program = p
}

// Execute source
globals, err := program.Init(s.thread, builtins)
if err != nil {
return err
}
// Make available a shared state to the apply function
globals["state"] = starlark.NewDict(0)

// Freeze the global state. This prevents modifications to the processor
// In case the program declares a global "state" we should insert it to
// avoid warnings about inserting into a frozen variable
if _, found := globals["state"]; found {
globals["state"] = starlark.NewDict(0)
}

// Freeze the global state. This prevents modifications to the processor
// state and prevents scripts from containing errors storing tracking
// metrics. Tasks that require global state will not be possible due to
// metrics. Tasks that require global state will not be possible due to
// this, so maybe we should relax this in the future.
globals.Freeze()

Expand Down Expand Up @@ -107,6 +199,9 @@ func (s *Common) AddFunction(name string, params ...starlark.Value) error {
// Add all the constants defined in the plugin as constants of the script
func (s *Common) addConstants(builtins *starlark.StringDict) error {
for key, val := range s.Constants {
if key == "state" {
return errors.New("'state' constant uses reserved name")
}
sVal, err := asStarlarkValue(val)
if err != nil {
return fmt.Errorf("converting type %T failed: %w", val, err)
Expand Down
7 changes: 3 additions & 4 deletions plugins/processors/starlark/starlark.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ func (*Starlark) SampleConfig() string {
}

func (s *Starlark) Init() error {
err := s.Common.Init()
if err != nil {
if err := s.Common.Init(); err != nil {
return err
}

// The source should define an apply function.
err = s.AddFunction("apply", &common.Metric{})
if err != nil {
if err := s.AddFunction("apply", &common.Metric{}); err != nil {
return err
}

Expand Down Expand Up @@ -118,6 +116,7 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err
default:
return fmt.Errorf("invalid type returned: %T", rv)
}

return nil
}

Expand Down
Loading
Loading