diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index b1fa3637bd1db..f54b5266e0369 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -5,6 +5,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/selfstat" ) @@ -96,39 +97,37 @@ func (r *RunningAggregator) MakeMetric(metric telegraf.Metric) telegraf.Metric { return m } -func (r *RunningAggregator) metricFiltered(metric telegraf.Metric) { - r.MetricsFiltered.Incr(1) - metric.Accept() -} - func (r *RunningAggregator) metricDropped(metric telegraf.Metric) { r.MetricsDropped.Incr(1) - metric.Accept() } // Add a metric to the aggregator and return true if the original metric // should be dropped. -func (r *RunningAggregator) Add(metric telegraf.Metric) bool { - if ok := r.Config.Filter.Select(metric); !ok { +func (r *RunningAggregator) Add(m telegraf.Metric) bool { + if ok := r.Config.Filter.Select(m); !ok { return false } - metric = metric.Copy() + // Make a copy of the metric but don't retain tracking; it doesn't make + // sense to fail a metric's delivery due to the aggregation not being + // sent because we can't create aggregations of historical data. + m = metric.FromMetric(m) - r.Config.Filter.Modify(metric) - if len(metric.FieldList()) == 0 { + r.Config.Filter.Modify(m) + if len(m.FieldList()) == 0 { + r.metricDropped(m) return r.Config.DropOriginal } r.Lock() defer r.Unlock() - if r.periodStart.IsZero() || metric.Time().After(r.periodEnd) { - r.metricDropped(metric) + if r.periodStart.IsZero() || m.Time().After(r.periodEnd) { + r.metricDropped(m) return r.Config.DropOriginal } - r.Aggregator.Add(metric) + r.Aggregator.Add(m) return r.Config.DropOriginal } diff --git a/metric/metric.go b/metric/metric.go index f2a49957e71e9..de4af500b0b76 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -62,6 +62,28 @@ func New( return m, nil } +// FromMetric returns a deep copy of the metric with any tracking information +// removed. +func FromMetric(other telegraf.Metric) telegraf.Metric { + m := &metric{ + name: other.Name(), + tags: make([]*telegraf.Tag, len(other.TagList())), + fields: make([]*telegraf.Field, len(other.FieldList())), + tm: other.Time(), + tp: other.Type(), + aggregate: other.IsAggregate(), + } + + for i, tag := range other.TagList() { + m.tags[i] = &telegraf.Tag{Key: tag.Key, Value: tag.Value} + } + + for i, field := range other.FieldList() { + m.fields[i] = &telegraf.Field{Key: field.Key, Value: field.Value} + } + return m +} + func (m *metric) String() string { return fmt.Sprintf("%s %v %v %d", m.name, m.Tags(), m.Fields(), m.tm.UnixNano()) }