5 changes: 4 additions & 1 deletion services/datamanager/builtin/builtin.go
@@ -204,7 +204,10 @@ func (b *builtIn) Reconfigure(ctx context.Context, deps resource.Dependencies, c
// These Reconfigure calls are the only methods in builtin.Reconfigure which create / destroy resources.
// It is important that no errors happen for a given Reconfigure call after we begin calling Reconfigure on capture & sync
// or we could leak goroutines, wasting resources and causing bugs due to duplicate work.
b.diskSummaryTracker.reconfigure(syncConfig.SyncPaths())
shouldSync := func(ctx context.Context) bool {
return syncConfig.SchedulerEnabled() && datasync.ReadyToSyncDirectories(ctx, syncConfig, b.logger)
}
b.diskSummaryTracker.reconfigure(syncConfig.SyncPaths(), syncConfig.SyncIntervalMins, shouldSync)
b.capture.Reconfigure(ctx, collectorConfigsByResource, captureConfig)
b.sync.Reconfigure(ctx, syncConfig, cloudConnSvc)

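The tracker now receives the sync interval and a `shouldSync` predicate instead of inspecting sync state itself. A minimal standalone sketch of that wiring pattern, with plain booleans standing in for `SchedulerEnabled()` and `ReadyToSyncDirectories()` (illustration only, not rdk code):

```go
package main

import (
	"context"
	"fmt"
)

// tracker mirrors the shape of diskSummaryTracker for illustration only:
// it holds a predicate instead of depending on the sync package's Config.
type tracker struct {
	syncIntervalMins float64
	shouldSync       func(context.Context) bool
}

func main() {
	schedulerEnabled := true  // stand-in for syncConfig.SchedulerEnabled()
	sensorAllowsSync := false // stand-in for datasync.ReadyToSyncDirectories(...)

	trk := tracker{
		syncIntervalMins: 1.0,
		shouldSync: func(ctx context.Context) bool {
			// Same composition as builtin.Reconfigure: both conditions must hold.
			return schedulerEnabled && sensorAllowsSync
		},
	}

	// The tracker later calls the predicate to decide between WARN and DEBUG.
	fmt.Println(trk.shouldSync(context.Background())) // false: sensor has paused sync
}
```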
63 changes: 61 additions & 2 deletions services/datamanager/builtin/disk_summary_tracker.go
@@ -7,6 +7,7 @@ import (

goutils "go.viam.com/utils"

"go.viam.com/rdk/data"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/utils/diskusage"
)
@@ -18,6 +19,11 @@ type diskSummaryTracker struct {
logger logging.Logger
mu sync.Mutex
worker *goutils.StoppableWorkers

// Sync config fields used for stale data warnings.
syncIntervalMins float64
shouldSync func(context.Context) bool
lastStaleWarning time.Time
}

type diskSummary struct {
@@ -37,19 +43,29 @@ type syncPathsSummary struct {
TotalSizeBytes int64
}

const diskSummaryTrackerInterval = 1 * time.Minute
const (
diskSummaryTrackerInterval = 1 * time.Minute
// minStaleThreshold is the minimum age of the oldest file before we consider data stale.
minStaleThreshold = 3 * time.Minute
// staleWarningInterval is the minimum time between consecutive stale data warnings.
staleWarningInterval = 5 * time.Minute
)

func newDiskSummaryTracker(logger logging.Logger) *diskSummaryTracker {
return &diskSummaryTracker{
logger: logger,
}
}

func (poller *diskSummaryTracker) reconfigure(dirs []string) {
func (poller *diskSummaryTracker) reconfigure(dirs []string, syncIntervalMins float64, shouldSync func(context.Context) bool) {
if poller.worker != nil {
poller.worker.Stop()
}

poller.syncIntervalMins = syncIntervalMins
poller.shouldSync = shouldSync
poller.lastStaleWarning = time.Time{}

poller.logger.Debug("datamanager disk state summary tracker running...")
// Calculate and set the initial summary.
poller.calculateAndSetSummary(context.Background(), dirs)
@@ -117,9 +133,52 @@ func (poller *diskSummaryTracker) calculateAndSetSummary(ctx context.Context, di
diskSummary.SyncPaths.TotalSizeBytes = totalBytes
diskSummary.OldestCaptureFileTime = earliestTime

poller.checkAndLogStaleData(ctx, earliestTime, totalFiles, totalBytes)
poller.setSummary(diskSummary)
}

// checkAndLogStaleData logs a rate-limited message if the oldest file in the capture directory
// is significantly older than expected given the sync interval. Logs at WARN if sync should be
// actively happening (scheduler enabled and sync sensor allows it), or at DEBUG if sync is
// paused (e.g. selective sync sensor returned false).
func (poller *diskSummaryTracker) checkAndLogStaleData(ctx context.Context, earliestTime *time.Time, totalFiles, totalBytes int64) {
if earliestTime == nil || poller.shouldSync == nil {
return
}

staleThreshold := time.Duration(10 * poller.syncIntervalMins * float64(time.Minute))
if staleThreshold < minStaleThreshold {
staleThreshold = minStaleThreshold
}

age := time.Since(*earliestTime)
if age <= staleThreshold {
return
}

now := time.Now()
if !poller.lastStaleWarning.IsZero() && now.Sub(poller.lastStaleWarning) < staleWarningInterval {
return
}
poller.lastStaleWarning = now

msg := "Capture data may not be syncing: oldest file is %s old, expected less than %s. " +
"There are %d files (%s) waiting to sync. " +
"Data may be generating faster than it can be uploaded, or uploads may be failing."

if poller.shouldSync(ctx) {
poller.logger.Warnf(msg,
age.Round(time.Second), staleThreshold.Round(time.Second),
totalFiles, data.FormatBytesI64(totalBytes),
)
} else {
poller.logger.Debugf(msg,
age.Round(time.Second), staleThreshold.Round(time.Second),
totalFiles, data.FormatBytesI64(totalBytes),
)
}
}

func (poller *diskSummaryTracker) getSummary() diskSummary {
poller.mu.Lock()
defer poller.mu.Unlock()
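Restating the staleness rule from `checkAndLogStaleData` as a standalone sketch (constants copied from the diff; not the rdk implementation): the threshold is 10× the configured sync interval, floored at `minStaleThreshold`, and the message is then rate-limited to once per `staleWarningInterval`.

```go
package main

import (
	"fmt"
	"time"
)

const minStaleThreshold = 3 * time.Minute

// staleThreshold reproduces the threshold computation: 10x the configured
// sync interval, but never less than minStaleThreshold.
func staleThreshold(syncIntervalMins float64) time.Duration {
	threshold := time.Duration(10 * syncIntervalMins * float64(time.Minute))
	if threshold < minStaleThreshold {
		threshold = minStaleThreshold
	}
	return threshold
}

func main() {
	for _, mins := range []float64{0.1, 1, 2} {
		fmt.Printf("sync interval %.1f min -> stale after %s\n", mins, staleThreshold(mins))
	}
	// Output:
	// sync interval 0.1 min -> stale after 3m0s
	// sync interval 1.0 min -> stale after 10m0s
	// sync interval 2.0 min -> stale after 20m0s
}
```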
161 changes: 161 additions & 0 deletions services/datamanager/builtin/disk_summary_tracker_test.go
@@ -0,0 +1,161 @@
package builtin

import (
"context"
"testing"
"time"

"go.viam.com/test"

"go.viam.com/rdk/logging"
)

func alwaysSync(_ context.Context) bool { return true }
func neverSync(_ context.Context) bool { return false }

func TestCheckAndLogStaleData(t *testing.T) {
ctx := context.Background()

t.Run("no warning when earliestTime is nil", func(t *testing.T) {
logger, logs := logging.NewObservedTestLogger(t)
tracker := &diskSummaryTracker{
logger: logger,
shouldSync: alwaysSync,
syncIntervalMins: 0.1,
}
tracker.checkAndLogStaleData(ctx, nil, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 0)
})

t.Run("no warning when shouldSync is nil", func(t *testing.T) {
logger, logs := logging.NewObservedTestLogger(t)
tracker := &diskSummaryTracker{
logger: logger,
syncIntervalMins: 0.1,
}
oldTime := time.Now().Add(-1 * time.Hour)
tracker.checkAndLogStaleData(ctx, &oldTime, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 0)
})

t.Run("no warning when data is fresh", func(t *testing.T) {
logger, logs := logging.NewObservedTestLogger(t)
tracker := &diskSummaryTracker{
logger: logger,
shouldSync: alwaysSync,
syncIntervalMins: 0.1,
}
recentTime := time.Now().Add(-30 * time.Second)
tracker.checkAndLogStaleData(ctx, &recentTime, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 0)
})

t.Run("WARN when data is stale and shouldSync returns true", func(t *testing.T) {
logger, logs := logging.NewObservedTestLogger(t)
tracker := &diskSummaryTracker{
logger: logger,
shouldSync: alwaysSync,
syncIntervalMins: 0.1, // threshold = max(60s, 3min) = 3min
}
staleTime := time.Now().Add(-5 * time.Minute)
tracker.checkAndLogStaleData(ctx, &staleTime, 42, 5*1024*1024)
warnLogs := logs.FilterMessageSnippet("Capture data may not be syncing")
test.That(t, warnLogs.Len(), test.ShouldEqual, 1)
test.That(t, warnLogs.All()[0].Level.String(), test.ShouldEqual, "warn")
test.That(t, warnLogs.All()[0].Message, test.ShouldContainSubstring, "42 files")
})

t.Run("DEBUG when data is stale and shouldSync returns false", func(t *testing.T) {
logger, logs := logging.NewObservedTestLogger(t)
tracker := &diskSummaryTracker{
logger: logger,
shouldSync: neverSync,
syncIntervalMins: 0.1,
}
staleTime := time.Now().Add(-5 * time.Minute)
tracker.checkAndLogStaleData(ctx, &staleTime, 10, 1024)
staleLogs := logs.FilterMessageSnippet("Capture data may not be syncing")
test.That(t, staleLogs.Len(), test.ShouldEqual, 1)
test.That(t, staleLogs.All()[0].Level.String(), test.ShouldEqual, "debug")
})

t.Run("warning respects stale threshold based on sync interval", func(t *testing.T) {
logger, logs := logging.NewObservedTestLogger(t)
// With 2 min sync interval, threshold = 10 * 2 = 20 minutes.
tracker := &diskSummaryTracker{
logger: logger,
shouldSync: alwaysSync,
syncIntervalMins: 2.0,
}

// 15 min old - under 20 min threshold, no warning.
justUnder := time.Now().Add(-15 * time.Minute)
tracker.checkAndLogStaleData(ctx, &justUnder, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 0)

// 25 min old - over 20 min threshold, warning.
over := time.Now().Add(-25 * time.Minute)
tracker.checkAndLogStaleData(ctx, &over, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 1)
})

t.Run("warning is rate-limited", func(t *testing.T) {
logger, logs := logging.NewObservedTestLogger(t)
tracker := &diskSummaryTracker{
logger: logger,
shouldSync: alwaysSync,
syncIntervalMins: 0.1,
}
staleTime := time.Now().Add(-5 * time.Minute)

// First call should log.
tracker.checkAndLogStaleData(ctx, &staleTime, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 1)

// Second call immediately after should be rate-limited.
tracker.checkAndLogStaleData(ctx, &staleTime, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 1)
})

t.Run("warning fires again after rate limit expires", func(t *testing.T) {
logger, logs := logging.NewObservedTestLogger(t)
tracker := &diskSummaryTracker{
logger: logger,
shouldSync: alwaysSync,
syncIntervalMins: 0.1,
}
staleTime := time.Now().Add(-5 * time.Minute)

// First call should log.
tracker.checkAndLogStaleData(ctx, &staleTime, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 1)

// Simulate that staleWarningInterval has passed.
tracker.lastStaleWarning = time.Now().Add(-staleWarningInterval - time.Second)

// Should log again.
tracker.checkAndLogStaleData(ctx, &staleTime, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 2)
})

t.Run("minStaleThreshold applies for short sync intervals", func(t *testing.T) {
logger, logs := logging.NewObservedTestLogger(t)
// With 0.1 min (6s) sync interval, 10 * 0.1 = 1 min.
// But minStaleThreshold is 3 min, so threshold should be 3 min.
tracker := &diskSummaryTracker{
logger: logger,
shouldSync: alwaysSync,
syncIntervalMins: 0.1,
}

// 2 min old - over 1 min but under 3 min minimum, no warning.
twoMinOld := time.Now().Add(-2 * time.Minute)
tracker.checkAndLogStaleData(ctx, &twoMinOld, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 0)

// 4 min old - over 3 min minimum, warning.
fourMinOld := time.Now().Add(-4 * time.Minute)
tracker.checkAndLogStaleData(ctx, &fourMinOld, 10, 1024)
test.That(t, logs.FilterMessageSnippet("Capture data may not be syncing").Len(), test.ShouldEqual, 1)
})
}
5 changes: 4 additions & 1 deletion services/datamanager/builtin/sync/config.go
@@ -97,7 +97,10 @@ type Config struct {
SelectiveSyncSensor sensor.Sensor
}

func (c Config) schedulerEnabled() bool {
// SchedulerEnabled returns true if the sync scheduler should be running.
// It is false when sync is explicitly disabled, or when a selective sync sensor
// is configured but could not be found.
func (c Config) SchedulerEnabled() bool {
configDisabled := c.ScheduledSyncDisabled
selectiveSyncerInvalid := c.SelectiveSyncSensorEnabled && c.SelectiveSyncSensor == nil
return !configDisabled && !selectiveSyncerInvalid
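Exporting `SchedulerEnabled` lets builtin.go reuse the same gate the sync scheduler uses. A small standalone restatement of the rule with plain booleans, where `sensorResolved` stands in for `SelectiveSyncSensor != nil` (illustration only):

```go
package main

import "fmt"

// schedulerEnabled restates the exported Config.SchedulerEnabled logic.
func schedulerEnabled(scheduledSyncDisabled, selectiveSensorEnabled, sensorResolved bool) bool {
	selectiveSyncerInvalid := selectiveSensorEnabled && !sensorResolved
	return !scheduledSyncDisabled && !selectiveSyncerInvalid
}

func main() {
	fmt.Println(schedulerEnabled(false, false, false)) // true: default config
	fmt.Println(schedulerEnabled(true, false, false))  // false: scheduled sync disabled
	fmt.Println(schedulerEnabled(false, true, false))  // false: sensor configured but not found
	fmt.Println(schedulerEnabled(false, true, true))   // true: sensor configured and resolved
}
```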
16 changes: 8 additions & 8 deletions services/datamanager/builtin/sync/config_test.go
@@ -211,28 +211,28 @@ func TestConfig(t *testing.T) {
}
})

t.Run("schedulerEnabled()", func(t *testing.T) {
t.Run("SchedulerEnabled()", func(t *testing.T) {
t.Run("true by default", func(t *testing.T) {
test.That(t, Config{}.schedulerEnabled(), test.ShouldBeTrue)
test.That(t, Config{}.SchedulerEnabled(), test.ShouldBeTrue)
})

t.Run("false if ScheduledSyncDisabled", func(t *testing.T) {
test.That(t, Config{ScheduledSyncDisabled: true}.schedulerEnabled(), test.ShouldBeFalse)
test.That(t, Config{ScheduledSyncDisabled: true, SyncIntervalMins: 1.0}.schedulerEnabled(), test.ShouldBeFalse)
test.That(t, Config{ScheduledSyncDisabled: true}.SchedulerEnabled(), test.ShouldBeFalse)
test.That(t, Config{ScheduledSyncDisabled: true, SyncIntervalMins: 1.0}.SchedulerEnabled(), test.ShouldBeFalse)
})

t.Run("false if SelectiveSyncSensorEnabled is true and SelectiveSyncSensor is nil", func(t *testing.T) {
test.That(t, Config{SelectiveSyncSensorEnabled: true}.schedulerEnabled(), test.ShouldBeFalse)
test.That(t, Config{SelectiveSyncSensorEnabled: true, SyncIntervalMins: 1.0}.schedulerEnabled(), test.ShouldBeFalse)
test.That(t, Config{SelectiveSyncSensorEnabled: true}.SchedulerEnabled(), test.ShouldBeFalse)
test.That(t, Config{SelectiveSyncSensorEnabled: true, SyncIntervalMins: 1.0}.SchedulerEnabled(), test.ShouldBeFalse)
})

t.Run("true otherwise", func(t *testing.T) {
test.That(t, Config{SyncIntervalMins: 1.0}.schedulerEnabled(), test.ShouldBeTrue)
test.That(t, Config{SyncIntervalMins: 1.0}.SchedulerEnabled(), test.ShouldBeTrue)
test.That(t, Config{
SyncIntervalMins: 1.0,
SelectiveSyncSensorEnabled: true,
SelectiveSyncSensor: &inject.Sensor{},
}.schedulerEnabled(), test.ShouldBeTrue)
}.SchedulerEnabled(), test.ShouldBeTrue)
})
})

12 changes: 6 additions & 6 deletions services/datamanager/builtin/sync/sync.go
@@ -173,7 +173,7 @@ func (s *Sync) Reconfigure(_ context.Context, config Config, cloudConnSvc cloud.
// config changed... stop workers
s.config.logDiff(config, s.logger)

if s.config.schedulerEnabled() && !s.config.Equal(Config{}) {
if s.config.SchedulerEnabled() && !s.config.Equal(Config{}) {
// only log if the pool was previously started
s.logger.Info("stopping sync worker pool")
}
@@ -193,7 +193,7 @@ func (s *Sync) Reconfigure(_ context.Context, config Config, cloudConnSvc cloud.

// start workers
s.startWorkers(config)
if config.schedulerEnabled() {
if config.SchedulerEnabled() {
// time.Duration loses precision at low floating point values, so turn intervalMins to milliseconds.
intervalMillis := 60000.0 * config.SyncIntervalMins
// The ticker must be created before uploadData returns to prevent race conditions between clock.Ticker and
@@ -646,7 +646,7 @@ func (s *Sync) runScheduler(ctx context.Context, tkr *clock.Ticker, config Confi
case <-ctx.Done():
return
case <-tkr.C:
shouldSync := readyToSyncDirectories(ctx, config, s.logger)
shouldSync := ReadyToSyncDirectories(ctx, config, s.logger)
state := s.cloudConn.conn.GetState()
online := state == connectivity.Ready
if !online {
@@ -758,9 +758,9 @@ func (s *Sync) sendToSync(ctx context.Context, path string) {
}
}

// readyToSyncDirectories is a method for getting the bool reading from the selective sync sensor
// for determining whether the key is present and what its value is.
func readyToSyncDirectories(ctx context.Context, config Config, logger logging.Logger) bool {
// ReadyToSyncDirectories checks the selective sync sensor to determine if we should sync.
// Returns true if no selective sync sensor is configured, or if the sensor indicates syncing should occur.
func ReadyToSyncDirectories(ctx context.Context, config Config, logger logging.Logger) bool {
// If selective sync is disabled, sync. If it is enabled, check the condition below.
if !config.SelectiveSyncSensorEnabled {
return true
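For reference, a hedged sketch of the selective-sync decision the newly exported `ReadyToSyncDirectories` encapsulates, with a generic readings map standing in for the real sensor API and an assumed key name (both are assumptions, not taken from the rdk source):

```go
package main

import "fmt"

// readyToSync mimics the decision shape: no selective sync sensor configured
// means always sync; otherwise sync only when the sensor's boolean reading
// says so. The key name "should_sync" is illustrative only.
func readyToSync(sensorEnabled bool, readings map[string]interface{}) bool {
	if !sensorEnabled {
		return true
	}
	v, ok := readings["should_sync"]
	if !ok {
		return false
	}
	b, ok := v.(bool)
	return ok && b
}

func main() {
	fmt.Println(readyToSync(false, nil))                                         // true: no selective sync sensor
	fmt.Println(readyToSync(true, map[string]interface{}{"should_sync": true}))  // true: sensor says sync
	fmt.Println(readyToSync(true, map[string]interface{}{}))                     // false: reading missing
}
```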