diff --git a/internal/pkg/ntp/internal/spike/spike.go b/internal/pkg/ntp/internal/spike/spike.go
new file mode 100644
index 0000000000..1347df45a8
--- /dev/null
+++ b/internal/pkg/ntp/internal/spike/spike.go
@@ -0,0 +1,109 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+// Package spike provides a spike detector for NTP responses.
+package spike
+
+import (
+	"math"
+
+	"github.com/beevik/ntp"
+)
+
+const defaultCapacity = 8 // number of slots IsSpike allocates for the Detector's sample buffer
+
+// Sample is a single NTP response sample: a clock offset and the RTT it was measured with.
+type Sample struct {
+	Offset, RTT float64 // in seconds
+}
+
+// SampleFromNTPResponse converts an NTP response to a Sample, expressing ClockOffset and RTT in seconds.
+func SampleFromNTPResponse(resp *ntp.Response) Sample {
+	return Sample{
+		Offset: resp.ClockOffset.Seconds(),
+		RTT:    resp.RTT.Seconds(),
+	}
+}
+
+// Detector detects spikes in NTP response samples.
+//
+// Zero value is ready to use: IsSpike allocates the sample buffer on first use.
+type Detector struct {
+	packetCount   int64    // number of samples passed to IsSpike so far
+	samples       []Sample // fixed-size buffer used as a ring; slots never written keep zero values
+	samplesIdx    int      // slot written by the most recent storing IsSpike call (starts at 0)
+	samplesJitter float64  // jitter computed by the most recent storing IsSpike call, in seconds
+}
+
+// IsSpike returns true if the given sample is a spike.
+//
+// The sample is added to the detector's internal state.
+func (d *Detector) IsSpike(sample Sample) bool {
+	if d.samples == nil { // lazy allocation is what makes the zero-value Detector usable
+		d.samples = make([]Sample, defaultCapacity)
+	}
+
+	d.packetCount++
+
+	if d.packetCount == 1 {
+		// ignore first packet: it is never reported as a spike and is not stored in the buffer
+		return false
+	}
+
+	var currentIndex int // slot that was current before this call (the previously stored sample, if any)
+
+	currentIndex, d.samplesIdx = d.samplesIdx, (d.samplesIdx+1)%len(d.samples) // advance to the next slot, wrapping around
+
+	d.samples[d.samplesIdx] = sample
+
+	jitter := d.samplesJitter // jitter from the previous call; the final check uses this, not the value updated below
+
+	indexMin := currentIndex // candidate for the lowest-RTT slot; the search starts from the previous slot
+
+	for i := range d.samples {
+		if d.samples[i].RTT == 0 { // skip slots with zero RTT (e.g. slots that were never written)
+			continue
+		}
+
+		if d.samples[i].RTT < d.samples[indexMin].RTT {
+			indexMin = i
+		}
+	}
+
+	var j float64 // sum of squared offset differences relative to the indexMin sample
+
+	for i := range d.samples { // note: covers every slot, including ones that still hold the zero Sample
+		j += math.Pow(d.samples[i].Offset-d.samples[indexMin].Offset, 2)
+	}
+
+	d.samplesJitter = math.Sqrt(j / (float64(len(d.samples)) - 1)) // square root of the sum divided by (slot count - 1)
+
+	if math.Abs(sample.Offset) > sample.RTT {
+		// always accept clock offset if that is larger than rtt
+		return false
+	}
+
+	if d.packetCount < 4 {
+		// need more samples to make a decision (the first three packets are never reported as spikes)
+		return false
+	}
+
+	// This check was specifically removed (while it exists in systemd-timesync),
+	// as I don't understand why it's needed (@smira).
+	// It seems to give false positives when the RTT and Offset are close to each other,
+	// e.g. when NTP server is on the same LAN.
+	//
+	// if math.Abs(sample.Offset) > d.samples[indexMin].RTT {
+	// 	// do not accept anything worse than the maximum possible error of the best sample
+	// 	return true
+	// }
+
+	// check that diff to the last offset is not more than 3*(observed jitter), using the jitter from before this sample
+	return math.Abs(sample.Offset-d.samples[currentIndex].Offset) > 3*jitter
+}
+
+// Jitter returns the current jitter in seconds, i.e. the value last computed by IsSpike (zero until then).
+func (d *Detector) Jitter() float64 {
+	return d.samplesJitter // read-only accessor; the value is written by IsSpike
+}
diff --git a/internal/pkg/ntp/internal/spike/spike_test.go b/internal/pkg/ntp/internal/spike/spike_test.go
new file mode 100644
index 0000000000..878d967ab5
--- /dev/null
+++ b/internal/pkg/ntp/internal/spike/spike_test.go
@@ -0,0 +1,135 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+package spike_test
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/siderolabs/talos/internal/pkg/ntp/internal/spike"
+)
+
+func TestSpikeDetector(t *testing.T) { // table-driven: each case replays its samples, in order, through a fresh Detector
+	for _, test := range []struct {
+		name    string
+		samples []spike.Sample // fed to IsSpike one by one
+
+		expectedSpikes []bool // expected IsSpike result for the sample at the same index
+	}{
+		{
+			name: "no spikes",
+
+			samples: []spike.Sample{
+				{Offset: 0.01, RTT: 0.01},
+				{Offset: 0.05, RTT: 0.01},
+				{Offset: 0.03, RTT: 0.01},
+				{Offset: 0.01, RTT: 0.01},
+				{Offset: -0.01, RTT: 0.01},
+				{Offset: -0.02, RTT: 0.03}, // not a spike, just a large RTT
+			},
+
+			expectedSpikes: []bool{
+				false,
+				false,
+				false,
+				false,
+				false,
+				false,
+			},
+		},
+		{
+			name: "offset spike",
+
+			samples: []spike.Sample{
+				{Offset: 0.01, RTT: 0.01},
+				{Offset: 0.05, RTT: 0.01},
+				{Offset: 0.03, RTT: 0.01},
+				{Offset: 0.01, RTT: 0.01},
+				{Offset: 0.01, RTT: 0.01},
+				{Offset: 0.01, RTT: 0.01},
+				{Offset: -0.01, RTT: 0.01},
+				{Offset: -0.5, RTT: 0.7}, // spike
+			},
+
+			expectedSpikes: []bool{
+				false,
+				false,
+				false,
+				false,
+				false,
+				false,
+				false,
+				true,
+			},
+		},
+		{
+			name: "adjusting to higher RTT",
+
+			samples: []spike.Sample{
+				{Offset: 0.01, RTT: 0.01},
+				{Offset: 0.05, RTT: 0.01},
+				{Offset: 0.03, RTT: 0.01},
+				{Offset: 0.01, RTT: 0.01},
+				{Offset: -0.01, RTT: 0.01},
+				{Offset: 0.01, RTT: 0.01},
+				{Offset: -0.01, RTT: 0.01},
+				{Offset: -0.5, RTT: 0.7}, // spike
+				{Offset: 0.5, RTT: 0.7},  // spike
+				{Offset: -0.5, RTT: 0.7}, // spike
+				{Offset: 0.5, RTT: 0.7},  // not a spike anymore, filter adjusted itself
+				{Offset: -0.5, RTT: 0.7},
+				{Offset: 0.01, RTT: 0.01},
+			},
+
+			expectedSpikes: []bool{
+				false,
+				false,
+				false,
+				false,
+				false,
+				false,
+				false,
+				true,
+				true,
+				true,
+				false,
+				false,
+				false,
+			},
+		},
+		{
+			name: "initial ignore",
+
+			samples: []spike.Sample{
+				{Offset: 5, RTT: 0.01}, // initial packet is ignored completely
+				{Offset: 0.05, RTT: 0.05},
+				{Offset: 0.5, RTT: 0.5}, // 3rd packet, not judged yet: spike detection only kicks in from the 4th packet
+				{Offset: 0.01, RTT: 0.01},
+				{Offset: -0.01, RTT: 0.01},
+				{Offset: 0.01, RTT: 0.01},
+			},
+
+			expectedSpikes: []bool{
+				false,
+				false,
+				false,
+				false,
+				false,
+				false,
+			},
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			var detector spike.Detector // zero value is used directly; one detector per test case
+
+			for i, sample := range test.samples {
+				isSpike := detector.IsSpike(sample)
+
+				assert.Equal(t, test.expectedSpikes[i], isSpike, "unexpected spike: %v (position %d)", test.expectedSpikes[i], i)
+			}
+		})
+	}
+}
diff --git a/internal/pkg/ntp/ntp.go b/internal/pkg/ntp/ntp.go
index 617de92e31..c115d5bd15 100644
--- a/internal/pkg/ntp/ntp.go
+++ b/internal/pkg/ntp/ntp.go
@@ -9,7 +9,6 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"math"
 	"math/bits"
 	"net"
 	"reflect"
@@ -23,6 +22,7 @@ import (
 	"go.uber.org/zap/zapcore"
 	"golang.org/x/sys/unix"
 
+	"github.com/siderolabs/talos/internal/pkg/ntp/internal/spike"
 	"github.com/siderolabs/talos/internal/pkg/timex"
 )
 
@@ -42,10 +42,7 @@ type Syncer struct {
 
 	firstSync bool
 
-	packetCount   int64
-	samples       []sample
-	samplesIdx    int
-	samplesJitter float64
+	spikeDetector spike.Detector
 
 	MinPoll, MaxPoll, RetryPoll time.Duration
 
@@ -55,12 +52,6 @@ type Syncer struct {
 
 	AdjustTime AdjustTimeFunc
 }
 
-const sampleCount = 8
-
-type sample struct {
-	offset, rtt float64 // in seconds
-}
-
 // NewSyncer creates new Syncer with default configuration.
 func NewSyncer(logger *zap.Logger, timeServers []string) *Syncer {
 	syncer := &Syncer{
@@ -74,7 +65,7 @@ func NewSyncer(logger *zap.Logger, timeServers []string) *Syncer {
 
 		firstSync: true,
 
-		samples: make([]sample, sampleCount),
+		spikeDetector: spike.Detector{}, // zero value; the detector allocates its sample buffer lazily
 
 		MinPoll:   MinAllowablePoll,
 		MaxPoll:   MaxAllowablePoll,
@@ -149,59 +140,8 @@ func absDuration(d time.Duration) time.Duration {
 	return d
 }
 
-func (syncer *Syncer) spikeDetector(resp *ntp.Response) bool {
-	syncer.packetCount++
-
-	if syncer.packetCount == 1 {
-		// ignore first packet
-		return false
-	}
-
-	var currentIndex int
-
-	currentIndex, syncer.samplesIdx = syncer.samplesIdx, (syncer.samplesIdx+1)%sampleCount
-
-	syncer.samples[syncer.samplesIdx].offset = resp.ClockOffset.Seconds()
-	syncer.samples[syncer.samplesIdx].rtt = resp.RTT.Seconds()
-
-	jitter := syncer.samplesJitter
-
-	indexMin := currentIndex
-
-	for i := range syncer.samples {
-		if syncer.samples[i].rtt == 0 {
-			continue
-		}
-
-		if syncer.samples[i].rtt < syncer.samples[indexMin].rtt {
-			indexMin = i
-		}
-	}
-
-	var j float64
-
-	for i := range syncer.samples {
-		j += math.Pow(syncer.samples[i].offset-syncer.samples[indexMin].offset, 2)
-	}
-
-	syncer.samplesJitter = math.Sqrt(j / (sampleCount - 1))
-
-	if absDuration(resp.ClockOffset) > resp.RTT {
-		// always accept clock offset if that is larger than rtt
-		return false
-	}
-
-	if syncer.packetCount < 4 {
-		// need more samples to make a decision
-		return false
-	}
-
-	if absDuration(resp.ClockOffset).Seconds() > syncer.samples[indexMin].rtt {
-		// do not accept anything worse than the maximum possible error of the best sample
-		return true
-	}
-
-	return math.Abs(resp.ClockOffset.Seconds()-syncer.samples[currentIndex].offset) > 3*jitter
+func (syncer *Syncer) isSpike(resp *ntp.Response) bool { // adapter: converts resp to a spike.Sample and feeds it to the detector
+	return syncer.spikeDetector.IsSpike(spike.SampleFromNTPResponse(resp))
 }
 
 // Run runs the sync process.
@@ -231,7 +171,7 @@ func (syncer *Syncer) Run(ctx context.Context) { spike := false if resp != nil && resp.Validate() == nil { - spike = syncer.spikeDetector(resp) + spike = syncer.isSpike(resp) } switch { @@ -264,7 +204,7 @@ func (syncer *Syncer) Run(ctx context.Context) { } syncer.logger.Debug("sample stats", - zap.Duration("jitter", time.Duration(syncer.samplesJitter*float64(time.Second))), + zap.Duration("jitter", time.Duration(syncer.spikeDetector.Jitter()*float64(time.Second))), zap.Duration("poll_interval", pollInterval), zap.Bool("spike", spike), )