diff --git a/accumulator.go b/accumulator.go
index 4991752bb2d94..6d740fb529021 100644
--- a/accumulator.go
+++ b/accumulator.go
@@ -58,7 +58,11 @@ type Accumulator interface {
 type TrackingID uint64
 
 type TrackingData interface {
+	// ID is the TrackingID
 	ID() TrackingID
+
+	// RefCount is the number of tracking metrics still persistent and referencing this tracking ID
+	RefCount() int32
 }
 
 // DeliveryInfo provides the results of a delivered metric group.
diff --git a/metric/deserialize.go b/metric/deserialize.go
index a253406a3b50c..50910643f8e4d 100644
--- a/metric/deserialize.go
+++ b/metric/deserialize.go
@@ -12,7 +12,6 @@ import (
 
 // storage for tracking data that can't be serialized to disk
 var (
-	// todo need some way to empty this map out when done with a tracking ID.
 	// grouped tracking metrics means that ID->Data association is not one to one,
 	// many metrics could be associated with one tracking ID so we cannot just
 	// clear this every time in FromBytes.
@@ -67,11 +66,19 @@ func FromBytes(b []byte) (telegraf.Metric, error) {
 	if sm.TID != 0 {
 		mu.Lock()
 		td := trackingStore[sm.TID]
-		mu.Unlock()
-
 		if td == nil {
+			mu.Unlock()
 			return nil, ErrSkipTracking
 		}
+		rc := td.RefCount()
+		if rc <= 1 {
+			// only 1 metric left referencing this tracking ID, we can remove here since no subsequent metrics
+			// read can use this ID. If another metric in a metric group with this ID gets added later, it will
+			// simply be added back into the tracking store again.
+			trackingStore[sm.TID] = nil
+		}
+		mu.Unlock()
+
 		m = rebuildTrackingMetric(m, td)
 	}
 	return m, nil
diff --git a/metric/tracking.go b/metric/tracking.go
index aa37360e62816..954782ddfcca0 100644
--- a/metric/tracking.go
+++ b/metric/tracking.go
@@ -45,6 +45,10 @@ func (d *trackingData) incr() {
 	atomic.AddInt32(&d.Rc, 1)
 }
 
+func (d *trackingData) RefCount() int32 {
+	return d.Rc
+}
+
 func (d *trackingData) decr() int32 {
 	return atomic.AddInt32(&d.Rc, -1)
 }
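Not part of the patch: a minimal, self-contained sketch of the clear-on-last-reference pattern the FromBytes change above relies on. All names here (entry, store, lookup) are hypothetical; the real code keys the store by TrackingID and nils the entry rather than deleting it, and the RefCount() added above returns d.Rc directly, whereas this sketch reads the counter with atomic.LoadInt32 since incr()/decr() update it atomically.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// entry stands in for the tracking data kept in memory for one ID.
type entry struct {
	rc int32 // decremented elsewhere via atomic.AddInt32(&e.rc, -1)
}

var (
	mu    sync.Mutex
	store = map[uint64]*entry{}
)

// lookup returns the entry for id and drops it from the store when the
// last remaining reference is being read back, mirroring the rc <= 1 check
// in the FromBytes hunk above.
func lookup(id uint64) *entry {
	mu.Lock()
	defer mu.Unlock()

	e := store[id]
	if e == nil {
		return nil // unknown ID: the caller skips this metric
	}
	if atomic.LoadInt32(&e.rc) <= 1 {
		// Last reference: remove the entry so the map cannot grow without
		// bound. A later write for the same ID simply re-adds it.
		delete(store, id)
	}
	return e
}

func main() {
	store[42] = &entry{rc: 1}
	fmt.Println(lookup(42) != nil, len(store)) // true 0
}
```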
diff --git a/models/buffer_disk.go b/models/buffer_disk.go
index ef05791618247..be76019088b43 100644
--- a/models/buffer_disk.go
+++ b/models/buffer_disk.go
@@ -33,11 +33,15 @@ func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, er
 	if err != nil {
 		return nil, fmt.Errorf("failed to open wal file: %w", err)
 	}
-	return &DiskBuffer{
+	buf := &DiskBuffer{
 		BufferStats: stats,
 		file:        walFile,
 		path:        filePath,
-	}, nil
+	}
+	if buf.length() > 0 {
+		buf.originalEnd = buf.writeIndex()
+	}
+	return buf, nil
 }
 
 func (b *DiskBuffer) Len() int {
@@ -78,16 +82,15 @@ func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int {
 
 	dropped := 0
 	for _, m := range metrics {
-		if !b.addSingle(m) {
+		if !b.addSingleMetric(m) {
 			dropped++
 		}
 	}
 	b.BufferSize.Set(int64(b.length()))
 	return dropped
-	// todo implement batched writes
 }
 
-func (b *DiskBuffer) addSingle(m telegraf.Metric) bool {
+func (b *DiskBuffer) addSingleMetric(m telegraf.Metric) bool {
 	data, err := metric.ToBytes(m)
 	if err != nil {
 		panic(err)
@@ -100,26 +103,6 @@
 	return false
 }
 
-//nolint:unused // to be implemented in the future
-func (b *DiskBuffer) addBatch(metrics []telegraf.Metric) int {
-	written := 0
-	batch := new(wal.Batch)
-	for _, m := range metrics {
-		data, err := metric.ToBytes(m)
-		if err != nil {
-			panic(err)
-		}
-		batch.Write(b.writeIndex(), data)
-		b.metricAdded()
-		written++
-	}
-	err := b.file.WriteBatch(batch)
-	if err != nil {
-		return 0 // todo error handle, test if a partial write occur
-	}
-	return written
-}
-
 func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
 	b.Lock()
 	defer b.Unlock()
@@ -177,6 +160,7 @@
 		b.resetWalFile()
 	} else {
 		err := b.file.TruncateFront(b.batchFirst + uint64(len(batch)))
+		b.file.ClearCache()
 		if err != nil {
 			panic(err)
 		}
@@ -208,9 +192,10 @@ func (b *DiskBuffer) resetBatch() {
 	b.batchSize = 0
 }
 
-// todo This is very messy and not ideal, but serves as the only way I can find currently
-// todo to actually clear the walfile completely if needed, since Truncate() calls require
-// todo at least one entry remains in them otherwise they return an error.
+// This is very messy and not ideal, but serves as the only way I can find currently
+// to actually clear the walfile completely if needed, since Truncate() calls require
+// that at least one entry remains in them otherwise they return an error.
+// Related issue: https://github.com/tidwall/wal/issues/20
func (b *DiskBuffer) resetWalFile() {
	b.file.Close()
	os.Remove(b.path)
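Not part of the patch: a standalone sketch of the tidwall/wal calls the disk buffer builds on, illustrating why Accept() can trim delivered entries with TruncateFront but can never empty the log that way, which is the reason resetWalFile() above has to close and delete the file. The directory name and payloads are arbitrary.

```go
package main

import (
	"fmt"
	"log"

	"github.com/tidwall/wal"
)

func main() {
	w, err := wal.Open("example-wal", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	// Entries are appended at monotonically increasing indices, which is how
	// addSingleMetric stores one serialized metric per index.
	for i := uint64(1); i <= 3; i++ {
		if err := w.Write(i, []byte(fmt.Sprintf("entry %d", i))); err != nil {
			log.Fatal(err)
		}
	}

	// Accept() trims delivered entries from the front. TruncateFront keeps
	// the entry at the given index and removes everything before it, so the
	// log always retains at least one entry after a successful truncation.
	if err := w.TruncateFront(3); err != nil {
		log.Fatal(err)
	}

	first, _ := w.FirstIndex()
	last, _ := w.LastIndex()
	fmt.Println(first, last) // 3 3
}
```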
diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go
index a8eda7dfc7e08..d650471ad249d 100644
--- a/models/buffer_disk_test.go
+++ b/models/buffer_disk_test.go
@@ -1,16 +1,17 @@
 package models
 
 import (
-	"fmt"
-	"io"
 	"os"
 	"path/filepath"
 	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"github.com/tidwall/wal"
 
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/metric"
 	"github.com/influxdata/telegraf/testutil"
-	"github.com/stretchr/testify/require"
 )
 
 func newTestDiskBuffer(t testing.TB) Buffer {
@@ -41,55 +42,69 @@ func TestBuffer_RetainsTrackingInformation(t *testing.T) {
 	require.Equal(t, 1, delivered)
 }
 
-// WAL file tested here was written as:
-// 1: Metric()
-// 2: Metric()
-// 3: Metric()
-// 4: metric.WithTracking(Metric())
-// 5: Metric()
-//
-// Expected to drop the 4th metric, as tracking metrics from
-// previous instances are dropped when the wal file is reopened.
 func TestBuffer_TrackingDroppedFromOldWal(t *testing.T) {
-	// copy the testdata so we do not destroy the testdata wal file
 	path, err := os.MkdirTemp("", "*-buffer-test")
 	require.NoError(t, err)
-	f, err := os.Create(path + "/00000000000000000001")
-	require.NoError(t, err)
-	f1, err := os.Open("testdata/testwal/00000000000000000001")
-	require.NoError(t, err)
-	written, err := io.Copy(f, f1)
+	walfile, err := wal.Open(path, nil)
 	require.NoError(t, err)
-	fmt.Println(written)
+
+	tm, _ := metric.WithTracking(Metric(), func(_ telegraf.DeliveryInfo) {})
+
+	metrics := []telegraf.Metric{
+		// Basic metric with 1 field, 0 timestamp
+		Metric(),
+		// Basic metric with 1 field, different timestamp
+		metric.New(
+			"cpu",
+			map[string]string{},
+			map[string]interface{}{
+				"value": 20.0,
+			},
+			time.Now(),
+		),
+		// Metric with a field
+		metric.New(
+			"cpu",
+			map[string]string{
+				"x": "y",
+			},
+			map[string]interface{}{
+				"value": 18.0,
+			},
+			time.Now(),
+		),
+		// Tracking metric
+		tm,
+		// Metric with lots of tag types
+		metric.New(
+			"cpu",
+			map[string]string{},
+			map[string]interface{}{
+				"value_f64":        20.0,
+				"value_uint64":     uint64(10),
+				"value_int16":      int16(5),
+				"value_string":     "foo",
+				"value_boolean":    true,
+				"value_byte_array": []byte{1, 2, 3, 4, 5},
+			},
+			time.Now(),
+		),
+	}
+
+	// call manually so that we can properly use metric.ToBytes() without having initialized a buffer
+	registerGob()
+
+	for i, m := range metrics {
+		data, err := metric.ToBytes(m)
+		require.NoError(t, err)
+		require.NoError(t, walfile.Write(uint64(i+1), data))
+	}
 
 	b := newTestDiskBufferWithPath(t, filepath.Base(path), filepath.Dir(path))
 	batch := b.Batch(4)
+	// expected skips the tracking metric
 	expected := []telegraf.Metric{
-		Metric(), Metric(), Metric(), Metric(),
+		metrics[0], metrics[1], metrics[2], metrics[4],
 	}
 	testutil.RequireMetricsEqual(t, expected, batch)
 }
-
-/*
-// Function used to create the test data used in the test above
-func Test_CreateTestData(t *testing.T) {
-	metric.Init()
-	walfile, _ := wal.Open("testdata/testwal", nil)
-	data, err := metric.ToBytes(Metric())
-	require.NoError(t, err)
-	require.NoError(t, walfile.Write(1, data))
-	data, err = metric.ToBytes(Metric())
-	require.NoError(t, err)
-	require.NoError(t, walfile.Write(2, data))
-	data, err = metric.ToBytes(Metric())
-	require.NoError(t, err)
-	require.NoError(t, walfile.Write(3, data))
-	m, _ := metric.WithTracking(Metric(), func(di telegraf.DeliveryInfo) {})
-	data, err = metric.ToBytes(m)
-	require.NoError(t, err)
-	require.NoError(t, walfile.Write(4, data))
-	data, err = metric.ToBytes(Metric())
-	require.NoError(t, err)
-	require.NoError(t, walfile.Write(5, data))
-}
-*/
diff --git a/models/testdata/testwal/00000000000000000001 b/models/testdata/testwal/00000000000000000001
deleted file mode 100644
index a4d72e966edfb..0000000000000
Binary files a/models/testdata/testwal/00000000000000000001 and /dev/null differ
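Not part of the patch: a hedged sketch of the delivery callback that the disk buffer has to preserve across serialization, using the same metric.WithTracking API the test above exercises. It assumes the callback fires synchronously once the last metric of the tracked group is accepted; field values and the callback body are arbitrary.

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
)

func main() {
	m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Now())

	// WithTracking wraps the metric and registers a notify function that is
	// expected to run once the whole tracked group is accepted or rejected.
	tracked, id := metric.WithTracking(m, func(di telegraf.DeliveryInfo) {
		fmt.Println("delivered:", di.Delivered())
	})
	fmt.Println("tracking id:", id)

	// Accepting the only metric of the group marks it as delivered.
	tracked.Accept()
}
```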