Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revive fixes - part 2 #8835

Merged
merged 8 commits into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions agent/tick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/stretchr/testify/require"
)

var format = "2006-01-02T15:04:05.999Z07:00"

func TestAlignedTicker(t *testing.T) {
interval := 10 * time.Second
jitter := 0 * time.Second
Expand All @@ -32,7 +30,7 @@ func TestAlignedTicker(t *testing.T) {
time.Unix(60, 0).UTC(),
}

actual := []time.Time{}
var actual []time.Time
zak-pawel marked this conversation as resolved.
Show resolved Hide resolved

clock.Add(10 * time.Second)
for !clock.Now().After(until) {
Expand Down Expand Up @@ -112,7 +110,7 @@ func TestUnalignedTicker(t *testing.T) {
time.Unix(61, 0).UTC(),
}

actual := []time.Time{}
var actual []time.Time
for !clock.Now().After(until) {
select {
case tm := <-ticker.Elapsed():
Expand Down Expand Up @@ -147,7 +145,7 @@ func TestRollingTicker(t *testing.T) {
time.Unix(61, 0).UTC(),
}

actual := []time.Time{}
var actual []time.Time
for !clock.Now().After(until) {
select {
case tm := <-ticker.Elapsed():
Expand Down Expand Up @@ -249,7 +247,7 @@ func simulatedDist(ticker Ticker, clock *clock.Mock) Distribution {
for !clock.Now().After(until) {
select {
case tm := <-ticker.Elapsed():
dist.Buckets[tm.Second()] += 1
dist.Buckets[tm.Second()]++
dist.Count++
dist.Waittime += tm.Sub(last).Seconds()
last = tm
Expand Down
11 changes: 2 additions & 9 deletions internal/rotate/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,7 @@ func (w *FileWriter) openCurrent() (err error) {
w.bytesWritten = fileInfo.Size()
}

if err = w.rotateIfNeeded(); err != nil {
return err
}
return nil
return w.rotateIfNeeded()
}

func (w *FileWriter) rotateIfNeeded() error {
Expand All @@ -153,11 +150,7 @@ func (w *FileWriter) rotate() (err error) {
return err
}

if err = w.purgeArchivesIfNeeded(); err != nil {
return err
}

return nil
return w.purgeArchivesIfNeeded()
}

func (w *FileWriter) purgeArchivesIfNeeded() (err error) {
Expand Down
20 changes: 10 additions & 10 deletions internal/snmp/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@ type GosnmpWrapper struct {
}

// Host returns the value of GoSNMP.Target.
func (gsw GosnmpWrapper) Host() string {
return gsw.Target
func (gs GosnmpWrapper) Host() string {
return gs.Target
}

// Walk wraps GoSNMP.Walk() or GoSNMP.BulkWalk(), depending on whether the
// connection is using SNMPv1 or newer.
// Also, if any error is encountered, it will just once reconnect and try again.
func (gsw GosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error {
func (gs GosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error {
var err error
// On error, retry once.
// Unfortunately we can't distinguish between an error returned by gosnmp, and one returned by the walk function.
for i := 0; i < 2; i++ {
if gsw.Version == gosnmp.Version1 {
err = gsw.GoSNMP.Walk(oid, fn)
if gs.Version == gosnmp.Version1 {
err = gs.GoSNMP.Walk(oid, fn)
} else {
err = gsw.GoSNMP.BulkWalk(oid, fn)
err = gs.GoSNMP.BulkWalk(oid, fn)
}
if err == nil {
return nil
}
if err := gsw.GoSNMP.Connect(); err != nil {
if err := gs.GoSNMP.Connect(); err != nil {
return fmt.Errorf("reconnecting: %w", err)
}
}
Expand All @@ -44,15 +44,15 @@ func (gsw GosnmpWrapper) Walk(oid string, fn gosnmp.WalkFunc) error {

// Get wraps GoSNMP.GET().
// If any error is encountered, it will just once reconnect and try again.
func (gsw GosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) {
func (gs GosnmpWrapper) Get(oids []string) (*gosnmp.SnmpPacket, error) {
var err error
var pkt *gosnmp.SnmpPacket
for i := 0; i < 2; i++ {
pkt, err = gsw.GoSNMP.Get(oids)
pkt, err = gs.GoSNMP.Get(oids)
if err == nil {
return pkt, nil
}
if err := gsw.GoSNMP.Connect(); err != nil {
if err := gs.GoSNMP.Connect(); err != nil {
return nil, fmt.Errorf("reconnecting: %w", err)
}
}
Expand Down
50 changes: 25 additions & 25 deletions models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

const (
// Default size of metrics batch size.
DEFAULT_METRIC_BATCH_SIZE = 1000
DefaultMetricBatchSize = 1000

// Default number of metrics kept. It should be a multiple of batch size.
DEFAULT_METRIC_BUFFER_LIMIT = 10000
DefaultMetricBufferLimit = 10000
)

// OutputConfig containing name and filter
Expand Down Expand Up @@ -78,13 +78,13 @@ func NewRunningOutput(
bufferLimit = config.MetricBufferLimit
}
if bufferLimit == 0 {
bufferLimit = DEFAULT_METRIC_BUFFER_LIMIT
bufferLimit = DefaultMetricBufferLimit
}
if config.MetricBatchSize > 0 {
batchSize = config.MetricBatchSize
}
if batchSize == 0 {
batchSize = DEFAULT_METRIC_BATCH_SIZE
batchSize = DefaultMetricBatchSize
}

ro := &RunningOutput{
Expand All @@ -110,17 +110,17 @@ func NewRunningOutput(
return ro
}

func (r *RunningOutput) LogName() string {
return logName("outputs", r.Config.Name, r.Config.Alias)
func (ro *RunningOutput) LogName() string {
return logName("outputs", ro.Config.Name, ro.Config.Alias)
zak-pawel marked this conversation as resolved.
Show resolved Hide resolved
}

func (ro *RunningOutput) metricFiltered(metric telegraf.Metric) {
ro.MetricsFiltered.Incr(1)
metric.Drop()
}

func (r *RunningOutput) Init() error {
if p, ok := r.Output.(telegraf.Initializer); ok {
func (ro *RunningOutput) Init() error {
if p, ok := ro.Output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
Expand Down Expand Up @@ -228,40 +228,40 @@ func (ro *RunningOutput) WriteBatch() error {
}

// Close closes the output
func (r *RunningOutput) Close() {
err := r.Output.Close()
func (ro *RunningOutput) Close() {
err := ro.Output.Close()
if err != nil {
r.log.Errorf("Error closing output: %v", err)
ro.log.Errorf("Error closing output: %v", err)
}
}

func (r *RunningOutput) write(metrics []telegraf.Metric) error {
dropped := atomic.LoadInt64(&r.droppedMetrics)
func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
dropped := atomic.LoadInt64(&ro.droppedMetrics)
if dropped > 0 {
r.log.Warnf("Metric buffer overflow; %d metrics have been dropped", dropped)
atomic.StoreInt64(&r.droppedMetrics, 0)
ro.log.Warnf("Metric buffer overflow; %d metrics have been dropped", dropped)
atomic.StoreInt64(&ro.droppedMetrics, 0)
}

start := time.Now()
err := r.Output.Write(metrics)
err := ro.Output.Write(metrics)
elapsed := time.Since(start)
r.WriteTime.Incr(elapsed.Nanoseconds())
ro.WriteTime.Incr(elapsed.Nanoseconds())

if err == nil {
r.log.Debugf("Wrote batch of %d metrics in %s", len(metrics), elapsed)
ro.log.Debugf("Wrote batch of %d metrics in %s", len(metrics), elapsed)
}
return err
}

func (r *RunningOutput) LogBufferStatus() {
nBuffer := r.buffer.Len()
r.log.Debugf("Buffer fullness: %d / %d metrics", nBuffer, r.MetricBufferLimit)
func (ro *RunningOutput) LogBufferStatus() {
nBuffer := ro.buffer.Len()
ro.log.Debugf("Buffer fullness: %d / %d metrics", nBuffer, ro.MetricBufferLimit)
}

func (r *RunningOutput) Log() telegraf.Logger {
return r.log
func (ro *RunningOutput) Log() telegraf.Logger {
return ro.log
}

func (r *RunningOutput) BufferLength() int {
return r.buffer.Len()
func (ro *RunningOutput) BufferLength() int {
return ro.buffer.Len()
}
32 changes: 16 additions & 16 deletions models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (rp *RunningProcessor) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}

func (r *RunningProcessor) Init() error {
if p, ok := r.Processor.(telegraf.Initializer); ok {
func (rp *RunningProcessor) Init() error {
if p, ok := rp.Processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
Expand All @@ -62,39 +62,39 @@ func (r *RunningProcessor) Init() error {
return nil
}

func (r *RunningProcessor) Log() telegraf.Logger {
return r.log
func (rp *RunningProcessor) Log() telegraf.Logger {
return rp.log
}

func (r *RunningProcessor) LogName() string {
return logName("processors", r.Config.Name, r.Config.Alias)
func (rp *RunningProcessor) LogName() string {
return logName("processors", rp.Config.Name, rp.Config.Alias)
}

func (r *RunningProcessor) MakeMetric(metric telegraf.Metric) telegraf.Metric {
func (rp *RunningProcessor) MakeMetric(metric telegraf.Metric) telegraf.Metric {
return metric
}

func (r *RunningProcessor) Start(acc telegraf.Accumulator) error {
return r.Processor.Start(acc)
func (rp *RunningProcessor) Start(acc telegraf.Accumulator) error {
return rp.Processor.Start(acc)
}

func (r *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error {
if ok := r.Config.Filter.Select(m); !ok {
func (rp *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error {
if ok := rp.Config.Filter.Select(m); !ok {
// pass downstream
acc.AddMetric(m)
return nil
}

r.Config.Filter.Modify(m)
rp.Config.Filter.Modify(m)
if len(m.FieldList()) == 0 {
// drop metric
r.metricFiltered(m)
rp.metricFiltered(m)
return nil
}

return r.Processor.Add(m, acc)
return rp.Processor.Add(m, acc)
}

func (r *RunningProcessor) Stop() {
r.Processor.Stop()
func (rp *RunningProcessor) Stop() {
rp.Processor.Stop()
}
6 changes: 1 addition & 5 deletions plugins/common/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,5 @@ func (k *Config) SetConfig(config *sarama.Config) error {
config.Net.TLS.Enable = true
}

if err := k.SetSASLConfig(config); err != nil {
return err
}

return nil
return k.SetSASLConfig(config)
}
18 changes: 9 additions & 9 deletions plugins/common/shim/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,28 @@ type loadedConfig struct {

// LoadConfig Adds plugins to the shim
func (s *Shim) LoadConfig(filePath *string) error {
conf, err := LoadConfig(filePath)
conf, err := loadConfig(filePath)
if err != nil {
return err
}
if conf.Input != nil {
if err = s.AddInput(conf.Input); err != nil {
return fmt.Errorf("Failed to add Input: %w", err)
return fmt.Errorf("failed to add Input: %w", err)
}
} else if conf.Processor != nil {
if err = s.AddStreamingProcessor(conf.Processor); err != nil {
return fmt.Errorf("Failed to add Processor: %w", err)
return fmt.Errorf("failed to add Processor: %w", err)
}
} else if conf.Output != nil {
if err = s.AddOutput(conf.Output); err != nil {
return fmt.Errorf("Failed to add Output: %w", err)
return fmt.Errorf("failed to add Output: %w", err)
}
}
return nil
}

// LoadConfig loads the config and returns inputs that later need to be loaded.
func LoadConfig(filePath *string) (loaded loadedConfig, err error) {
// loadConfig loads the config and returns inputs that later need to be loaded.
func loadConfig(filePath *string) (loaded loadedConfig, err error) {
zak-pawel marked this conversation as resolved.
Show resolved Hide resolved
var data string
conf := config{}
if filePath != nil && *filePath != "" {
Expand All @@ -62,7 +62,7 @@ func LoadConfig(filePath *string) (loaded loadedConfig, err error) {
data = expandEnvVars(b)

} else {
conf, err = DefaultImportedPlugins()
conf, err = defaultImportedPlugins()
if err != nil {
return loadedConfig{}, err
}
Expand Down Expand Up @@ -147,10 +147,10 @@ func createPluginsWithTomlConfig(md toml.MetaData, conf config) (loadedConfig, e
return loadedConf, nil
}

// DefaultImportedPlugins defaults to whatever plugins happen to be loaded and
// defaultImportedPlugins defaults to whatever plugins happen to be loaded and
// have registered themselves with the registry. This makes loading plugins
// without having to define a config dead easy.
func DefaultImportedPlugins() (config, error) {
func defaultImportedPlugins() (config, error) {
conf := config{
Inputs: map[string][]toml.Primitive{},
Processors: map[string][]toml.Primitive{},
Expand Down
Loading